Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/07 22:30:22 UTC

svn commit: r441210 - in /lucene/hadoop/trunk: ./ src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/io/compress/

Author: cutting
Date: Thu Sep  7 13:30:21 2006
New Revision: 441210

URL: http://svn.apache.org/viewvc?view=rev&rev=441210
Log:
HADOOP-441.  Add a compression codec API and extend SequenceFile to use it.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=441210&r1=441209&r2=441210
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Sep  7 13:30:21 2006
@@ -148,6 +148,10 @@
     advertised by datanodes and tasktrackers.
     (Lorenzo Thione via cutting)
 
+37. HADOOP-441.  Add a compression codec API and extend SequenceFile
+    to use it.  This will permit the use of alternate compression
+    codecs in SequenceFile.  (Arun C Murthy via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

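The headline addition is a createWriter() overload that takes an explicit
codec. A minimal sketch of driving it (the filesystem, path, and key/value
choices below are illustrative, not part of this commit); DefaultCodec
stands in for any alternate CompressionCodec:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.DefaultCodec;

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);        // assumes a configured filesystem
    Path name = new Path("/tmp/example.seq");    // hypothetical scratch path
    SequenceFile.Writer writer =
      SequenceFile.createWriter(fs, conf, name,
          IntWritable.class, IntWritable.class,
          CompressionType.RECORD, new DefaultCodec());
    writer.append(new IntWritable(0), new IntWritable(1));
    writer.close();
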
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java?view=diff&rev=441210&r1=441209&r2=441210
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java Thu Sep  7 13:30:21 2006
@@ -41,6 +41,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Progressable;
 
 /**
  * This program uses map/reduce to just run a distributed job where there is
@@ -189,7 +190,7 @@
                                 jobConf, file,
                                 IntWritable.class, IntWritable.class,
                                 CompressionType.NONE,
-                                null);
+                                (Progressable)null);
       writer.append(new IntWritable(0), new IntWritable(filesPerMap));
       writer.close();
     }

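The (Progressable)null cast above is not cosmetic: this patch adds a second
seven-argument createWriter() overload whose last parameter is a
CompressionCodec, so a bare null no longer selects a unique method and javac
rejects the call as ambiguous. A sketch of the distinction (variable names
illustrative):

    // createWriter(fs, conf, file, k, v, type, null);  // now ambiguous, won't compile
    createWriter(fs, conf, file, k, v, type, (Progressable) null);      // progress variant
    createWriter(fs, conf, file, k, v, type, (CompressionCodec) null);  // codec variant
                                             // (a real codec is expected in practice)
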
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=441210&r1=441209&r2=441210
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Thu Sep  7 13:30:21 2006
@@ -18,15 +18,19 @@
 
 import java.io.*;
 import java.util.*;
-import java.util.zip.*;
 import java.net.InetAddress;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
 import org.apache.lucene.util.PriorityQueue;
 import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** Support for flat files of binary key/value pairs. */
 public class SequenceFile {
@@ -36,8 +40,9 @@
   private SequenceFile() {}                         // no public ctor
 
   private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
+  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
   private static byte[] VERSION = new byte[] {
-    (byte)'S', (byte)'E', (byte)'Q', BLOCK_COMPRESS_VERSION
+    (byte)'S', (byte)'E', (byte)'Q', CUSTOM_COMPRESS_VERSION
   };
 
   private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
@@ -79,9 +84,11 @@
     if (compressionType == CompressionType.NONE) {
       writer = new Writer(fs, conf, name, keyClass, valClass);
     } else if (compressionType == CompressionType.RECORD) {
-      writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass);
+      writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, 
+          new DefaultCodec());
     } else if (compressionType == CompressionType.BLOCK){
-      writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass);
+      writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass, 
+          new DefaultCodec());
     }
     
     return writer;
@@ -106,13 +113,78 @@
     Writer writer = null;
     
     if (compressionType == CompressionType.NONE) {
+      writer = new Writer(fs, conf, name, keyClass, valClass, progress); 
+    } else if (compressionType == CompressionType.RECORD) {
+      writer = new RecordCompressWriter(fs, conf, name, 
+          keyClass, valClass, new DefaultCodec(), progress);
+    } else if (compressionType == CompressionType.BLOCK){
+      writer = new BlockCompressWriter(fs, conf, name, 
+          keyClass, valClass, new DefaultCodec(), progress);
+    }
+    
+    return writer;
+  }
+
+  /**
+   * Construct the preferred type of SequenceFile Writer.
+   * @param fs The configured filesystem. 
+   * @param conf The configuration.
+   * @param name The name of the file. 
+   * @param keyClass The 'key' type.
+   * @param valClass The 'value' type.
+   * @param compressionType The compression type.
+   * @param codec The compression codec.
+   * @return Returns the handle to the constructed SequenceFile Writer.
+   * @throws IOException
+   */
+  public static Writer 
+  createWriter(FileSystem fs, Configuration conf, Path name, 
+      Class keyClass, Class valClass, 
+      CompressionType compressionType, CompressionCodec codec) 
+  throws IOException {
+    Writer writer = null;
+    
+    if (compressionType == CompressionType.NONE) {
+      writer = new Writer(fs, conf, name, keyClass, valClass); 
+    } else if (compressionType == CompressionType.RECORD) {
+      writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, 
+          codec);
+    } else if (compressionType == CompressionType.BLOCK){
+      writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass, 
+          codec);
+    }
+    
+    return writer;
+  }
+  
+  /**
+   * Construct the preferred type of SequenceFile Writer.
+   * @param fs The configured filesystem. 
+   * @param conf The configuration.
+   * @param name The name of the file. 
+   * @param keyClass The 'key' type.
+   * @param valClass The 'value' type.
+   * @param compressionType The compression type.
+   * @param codec The compression codec.
+   * @param progress The Progressable object to track progress.
+   * @return Returns the handle to the constructed SequenceFile Writer.
+   * @throws IOException
+   */
+  public static Writer
+  createWriter(FileSystem fs, Configuration conf, Path name, 
+      Class keyClass, Class valClass, 
+      CompressionType compressionType, CompressionCodec codec,
+      Progressable progress) throws IOException {
+    Writer writer = null;
+    
+    if (compressionType == CompressionType.NONE) {
       writer = new Writer(fs, conf, name, keyClass, valClass, progress);
     } else if (compressionType == CompressionType.RECORD) {
       writer = new RecordCompressWriter(fs, conf, name, 
-          keyClass, valClass, progress);
+          keyClass, valClass, codec, progress);
     } else if (compressionType == CompressionType.BLOCK){
       writer = new BlockCompressWriter(fs, conf, name, 
-          keyClass, valClass, progress);
+          keyClass, valClass, codec, progress);
     }
     
     return writer;
@@ -130,16 +202,17 @@
    */
   private static Writer
   createWriter(FSDataOutputStream out, 
-      Class keyClass, Class valClass, boolean compress, boolean blockCompress)
+      Class keyClass, Class valClass, boolean compress, boolean blockCompress,
+      CompressionCodec codec)
   throws IOException {
     Writer writer = null;
 
     if (!compress) {
       writer = new Writer(out, keyClass, valClass);
     } else if (compress && !blockCompress) {
-      writer = new RecordCompressWriter(out, keyClass, valClass);
+      writer = new RecordCompressWriter(out, keyClass, valClass, codec);
     } else {
-      writer = new BlockCompressWriter(out, keyClass, valClass);
+      writer = new BlockCompressWriter(out, keyClass, valClass, codec);
     }
     
     return writer;
@@ -200,11 +273,14 @@
   private static class CompressedBytes implements ValueBytes {
     private int dataSize;
     private byte[] data;
-    private Inflater zlibInflater = null;
+    DataInputBuffer rawData = null;
+    CompressionCodec codec = null;
+    CompressionInputStream decompressedStream = null;
 
-    private CompressedBytes() {
+    private CompressedBytes(CompressionCodec codec) {
       data = null;
       dataSize = 0;
+      this.codec = codec;
     }
 
     private void reset(DataInputStream in, int length) throws IOException {
@@ -221,21 +297,18 @@
     
     public void writeUncompressedBytes(DataOutputStream outStream)
     throws IOException {
-      if (zlibInflater == null) {
-        zlibInflater = new Inflater();
+      if (decompressedStream == null) {
+        rawData = new DataInputBuffer();
+        decompressedStream = codec.createInputStream(rawData);
       } else {
-        zlibInflater.reset();
+        decompressedStream.resetState();
       }
-      zlibInflater.setInput(data, 0, dataSize);
+      rawData.reset(data, 0, dataSize);
 
       byte[] buffer = new byte[8192];
-      while (!zlibInflater.finished()) {
-        try {
-          int noDecompressedBytes = zlibInflater.inflate(buffer);
-          outStream.write(buffer, 0, noDecompressedBytes);
-        } catch (DataFormatException e) {
-          throw new IOException (e.toString());
-        }
+      int bytesRead = 0;
+      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
+        outStream.write(buffer, 0, bytesRead);
       }
     }
 
@@ -256,11 +329,9 @@
     Class valClass;
 
     private boolean compress;
-    Deflater deflater = new Deflater(Deflater.BEST_SPEED);
-    DeflaterOutputStream deflateFilter =
-      new DeflaterOutputStream(buffer, deflater);
-    DataOutputStream deflateOut =
-      new DataOutputStream(new BufferedOutputStream(deflateFilter));
+    CompressionCodec codec = null;
+    CompressionOutputStream deflateFilter = null;
+    DataOutputStream deflateOut = null;
 
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
@@ -309,7 +380,7 @@
     public Writer(FileSystem fs, Path name,
                   Class keyClass, Class valClass, boolean compress)
       throws IOException {
-      init(name, fs.create(name), keyClass, valClass, compress);
+      init(name, fs.create(name), keyClass, valClass, compress, null); 
 
       initializeFileHeader();
       writeFileHeader();
@@ -324,7 +395,8 @@
                   Class keyClass, Class valClass, boolean compress,
                   Progressable progress)
       throws IOException {
-      init(name, fs.create(name, progress), keyClass, valClass, compress);
+      init(name, fs.create(name, progress), keyClass, valClass, 
+          compress, null);
       
       initializeFileHeader();
       writeFileHeader();
@@ -347,8 +419,8 @@
 
     /** Write to an arbitrary stream using a specified buffer size. */
     private Writer(FSDataOutputStream out, Class keyClass, Class valClass)
-      throws IOException {
-      init(null, out, keyClass, valClass, false);
+    throws IOException {
+      init(null, out, keyClass, valClass, false, null);
       
       initializeFileHeader();
       writeFileHeader();
@@ -379,18 +451,28 @@
       
       out.writeBoolean(this.isCompressed());
       out.writeBoolean(this.isBlockCompressed());
+      
+      if(this.isCompressed()) {
+        Text.writeString(out, (codec.getClass()).getName());
+      }
     }
 
     /** Initialize. */
     void init(Path name, FSDataOutputStream out,
                       Class keyClass, Class valClass,
-                      boolean compress) 
+                      boolean compress, CompressionCodec codec) 
     throws IOException {
       this.target = name;
       this.out = out;
       this.keyClass = keyClass;
       this.valClass = valClass;
       this.compress = compress;
+      this.codec = codec;
+      if(this.codec != null) {
+        this.deflateFilter = this.codec.createOutputStream(buffer);
+        this.deflateOut = 
+          new DataOutputStream(new BufferedOutputStream(deflateFilter));
+      }
     }
     
     /** Returns the class of keys in this file. */
@@ -399,6 +481,9 @@
     /** Returns the class of values in this file. */
     public Class getValueClass() { return valClass; }
 
+    /** Returns the compression codec of data in this file. */
+    public CompressionCodec getCompressionCodec() { return codec; }
+
     /** Close the file. */
     public synchronized void close() throws IOException {
       if (out != null) {
@@ -435,7 +520,7 @@
 
       // Append the 'value'
       if (compress) {
-        deflater.reset();
+        deflateFilter.resetState();
         val.write(deflateOut);
         deflateOut.flush();
         deflateFilter.finish();
@@ -495,8 +580,9 @@
     
     /** Create the named file. */
     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass) throws IOException {
-      super.init(name, fs.create(name), keyClass, valClass, true);
+        Class keyClass, Class valClass, CompressionCodec codec) 
+    throws IOException {
+      super.init(name, fs.create(name), keyClass, valClass, true, codec);
       
       initializeFileHeader();
       writeFileHeader();
@@ -505,9 +591,11 @@
     
     /** Create the named file with write-progress reporter. */
     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass, Progressable progress)
+        Class keyClass, Class valClass, CompressionCodec codec,
+        Progressable progress)
     throws IOException {
-      super.init(name, fs.create(name, progress), keyClass, valClass, true);
+      super.init(name, fs.create(name, progress), 
+          keyClass, valClass, true, codec);
       
       initializeFileHeader();
       writeFileHeader();
@@ -516,9 +604,9 @@
     
     /** Write to an arbitrary stream using a specified buffer size. */
     private RecordCompressWriter(FSDataOutputStream out,
-                   Class keyClass, Class valClass)
+                   Class keyClass, Class valClass, CompressionCodec codec)
       throws IOException {
-      super.init(null, out, keyClass, valClass, true);
+      super.init(null, out, keyClass, valClass, true, codec);
       
       initializeFileHeader();
       writeFileHeader();
@@ -546,7 +634,7 @@
         throw new IOException("zero length keys not allowed: " + key);
 
       // Compress 'value' and append it
-      deflater.reset();
+      deflateFilter.resetState();
       val.write(deflateOut);
       deflateOut.flush();
       deflateFilter.finish();
@@ -590,19 +678,13 @@
     private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
     private DataOutputBuffer valBuffer = new DataOutputBuffer();
 
-    private DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
-    private Deflater deflater = new Deflater(Deflater.BEST_SPEED);
-    private DeflaterOutputStream deflateFilter = 
-      new DeflaterOutputStream(compressedDataBuffer, deflater);
-    private DataOutputStream deflateOut = 
-      new DataOutputStream(new BufferedOutputStream(deflateFilter));
-
     private int compressionBlockSize;
     
     /** Create the named file. */
     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass) throws IOException {
-      super.init(name, fs.create(name), keyClass, valClass, true);
+        Class keyClass, Class valClass, CompressionCodec codec) 
+    throws IOException {
+      super.init(name, fs.create(name), keyClass, valClass, true, codec);
       init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
       
       initializeFileHeader();
@@ -612,9 +694,11 @@
     
     /** Create the named file with write-progress reporter. */
     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
-        Class keyClass, Class valClass, Progressable progress)
+        Class keyClass, Class valClass, CompressionCodec codec,
+        Progressable progress)
     throws IOException {
-      super.init(name, fs.create(name, progress), keyClass, valClass, true);
+      super.init(name, fs.create(name, progress), keyClass, valClass, 
+          true, codec);
       init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
       
       initializeFileHeader();
@@ -624,9 +708,9 @@
     
     /** Write to an arbitrary stream using a specified buffer size. */
     private BlockCompressWriter(FSDataOutputStream out,
-                   Class keyClass, Class valClass)
+                   Class keyClass, Class valClass, CompressionCodec codec)
       throws IOException {
-      super.init(null, out, keyClass, valClass, true);
+      super.init(null, out, keyClass, valClass, true, codec);
       init(1000000);
       
       initializeFileHeader();
@@ -644,17 +728,17 @@
     
     /** Workhorse to check and write out compressed data/lengths */
     private synchronized 
-    void writeBuffer(DataOutputBuffer buffer) 
+    void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
     throws IOException {
-      deflater.reset();
-      compressedDataBuffer.reset();
-      deflateOut.write(buffer.getData(), 0, buffer.getLength());
+      deflateFilter.resetState();
+      buffer.reset();
+      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
+          uncompressedDataBuffer.getLength());
       deflateOut.flush();
       deflateFilter.finish();
       
-      WritableUtils.writeVInt(out, compressedDataBuffer.getLength());
-      out.write(compressedDataBuffer.getData(), 
-          0, compressedDataBuffer.getLength());
+      WritableUtils.writeVInt(out, buffer.getLength());
+      out.write(buffer.getData(), 0, buffer.getLength());
     }
     
     /** Compress and flush contents to dfs */
@@ -771,6 +855,8 @@
     private Class keyClass;
     private Class valClass;
 
+    private CompressionCodec codec = null;
+    
     private byte[] sync = new byte[SYNC_HASH_SIZE];
     private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
     private boolean syncSeen;
@@ -791,17 +877,17 @@
     private int noBufferedValues = 0;
     
     private DataInputBuffer keyLenBuffer = null;
-    private Inflater keyLenInflater = null;
+    private CompressionInputStream keyLenInFilter = null;
     private DataInputStream keyLenIn = null;
     private DataInputBuffer keyBuffer = null;
-    private Inflater keyInflater = null;
+    private CompressionInputStream keyInFilter = null;
     private DataInputStream keyIn = null;
 
     private DataInputBuffer valLenBuffer = null;
-    private Inflater valLenInflater = null;
+    private CompressionInputStream valLenInFilter = null;
     private DataInputStream valLenIn = null;
     private DataInputBuffer valBuffer = null;
-    private Inflater valInflater = null;
+    private CompressionInputStream valInFilter = null;
     private DataInputStream valIn = null;
 
     /** @deprecated Call {@link #SequenceFile.Reader(FileSystem,Path,Configuration)}.*/
@@ -870,6 +956,19 @@
         this.blockCompressed = in.readBoolean();  // is block-compressed?
       }
       
+      // if version >= 5
+      // setup the compression codec
+      if (version >= CUSTOM_COMPRESS_VERSION && this.decompress) {    
+        try {
+          this.codec = (CompressionCodec)
+          ReflectionUtils.newInstance(conf.getClassByName(Text.readString(in)),
+              conf);
+        } catch (ClassNotFoundException cnfe) {
+          cnfe.printStackTrace();
+          throw new IllegalArgumentException("Unknown codec: " + cnfe);
+        }
+      }
+      
       if (version > 1) {                          // if version > 1
         in.readFully(sync);                       // read sync bytes
       }
@@ -877,10 +976,8 @@
       // Initialize
       valBuffer = new DataInputBuffer();
       if (decompress) {
-        valInflater = new Inflater();
-        valIn = new DataInputStream(new BufferedInputStream(
-            new InflaterInputStream(valBuffer, valInflater))
-        );
+        valInFilter = this.codec.createInputStream(valBuffer);
+        valIn = new DataInputStream(new BufferedInputStream(valInFilter));
       } else {
         valIn = new DataInputStream(new BufferedInputStream(valBuffer));
       }
@@ -890,19 +987,14 @@
         keyBuffer = new DataInputBuffer();
         valLenBuffer = new DataInputBuffer();
         
-        keyLenInflater = new Inflater();
-        keyLenIn = new DataInputStream(new BufferedInputStream(
-            new InflaterInputStream(keyLenBuffer, keyLenInflater))
-        );
-        
-        keyInflater = new Inflater();
-        keyIn = new DataInputStream(new BufferedInputStream(
-            new InflaterInputStream(keyBuffer, keyInflater)));
-        
-        valLenInflater = new Inflater();
-        valLenIn = new DataInputStream(new BufferedInputStream(
-            new InflaterInputStream(valLenBuffer, valLenInflater))
-        );
+        keyLenInFilter = this.codec.createInputStream(keyLenBuffer);
+        keyLenIn = new DataInputStream(new BufferedInputStream(keyLenInFilter));
+
+        keyInFilter = this.codec.createInputStream(keyBuffer);
+        keyIn = new DataInputStream(new BufferedInputStream(keyInFilter));
+
+        valLenInFilter = this.codec.createInputStream(valLenBuffer);
+        valLenIn = new DataInputStream(new BufferedInputStream(valLenInFilter));
       }
       
 
@@ -926,18 +1018,20 @@
     /** Returns true if records are block-compressed. */
     public boolean isBlockCompressed() { return blockCompressed; }
     
+    /** Returns the compression codec of data in this file. */
+    public CompressionCodec getCompressionCodec() { return codec; }
+
     /** Read a compressed buffer */
-    private synchronized void readBuffer(
-        DataInputBuffer buffer, Inflater inflater, boolean castAway
-        ) throws IOException {
+    private synchronized void readBuffer(DataInputBuffer buffer, 
+        CompressionInputStream filter, boolean castAway) throws IOException {
       // Read data into a temporary buffer
       DataOutputBuffer dataBuffer = new DataOutputBuffer();
       int dataBufferLength = WritableUtils.readVInt(in);
       dataBuffer.write(in, dataBufferLength);
       
       if (false == castAway) {
-        // Reset the inflater
-        inflater.reset();
+        // Reset the codec
+        filter.resetState();
         
         // Set up 'buffer' connected to the input-stream
         buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
@@ -970,14 +1064,14 @@
       noBufferedRecords = WritableUtils.readVInt(in);
       
       // Read key lengths and keys
-      readBuffer(keyLenBuffer, keyLenInflater, false);
-      readBuffer(keyBuffer, keyInflater, false);
+      readBuffer(keyLenBuffer, keyLenInFilter, false);
+      readBuffer(keyBuffer, keyInFilter, false);
       noBufferedKeys = noBufferedRecords;
       
       // Read value lengths and values
       if (!lazyDecompress) {
-        readBuffer(valLenBuffer, valLenInflater, false);
-        readBuffer(valBuffer, valInflater, false);
+        readBuffer(valLenBuffer, valLenInFilter, false);
+        readBuffer(valBuffer, valInFilter, false);
         noBufferedValues = noBufferedRecords;
         valuesDecompressed = true;
       }
@@ -990,14 +1084,14 @@
     private synchronized void seekToCurrentValue() throws IOException {
       if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
         if (decompress) {
-          valInflater.reset();
+          valInFilter.resetState();
         }
       } else {
         // Check if this is the first value in the 'block' to be read
         if (lazyDecompress && !valuesDecompressed) {
           // Read the value lengths and values
-          readBuffer(valLenBuffer, valLenInflater, false);
-          readBuffer(valBuffer, valInflater, false);
+          readBuffer(valLenBuffer, valLenInFilter, false);
+          readBuffer(valBuffer, valInFilter, false);
           noBufferedValues = noBufferedRecords;
           valuesDecompressed = true;
         }
@@ -1168,7 +1262,7 @@
       if (!decompress || blockCompressed) {
         val = new UncompressedBytes();
       } else {
-        val = new CompressedBytes();
+        val = new CompressedBytes(codec);
       }
       return val;
     }
@@ -1418,6 +1512,7 @@
         boolean atEof = (currentFile >= inFiles.length);
         boolean isCompressed = false;
         boolean isBlockCompressed = false;
+        CompressionCodec codec = null;
         segmentLengths.clear();
         if (atEof) {
           return 0;
@@ -1427,6 +1522,8 @@
         in = new Reader(fs, inFiles[currentFile], conf);
         isCompressed = in.isCompressed();
         isBlockCompressed = in.isBlockCompressed();
+        codec = in.getCompressionCodec();
+        
         for (int i=0; i < rawValues.length; ++i) {
           rawValues[i] = null;
         }
@@ -1479,7 +1576,7 @@
           LOG.debug("flushing segment " + segments);
           rawBuffer = rawKeys.getData();
           sort(count);
-          flush(count, bytesProcessed, isCompressed, isBlockCompressed, 
+          flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec, 
               segments==0 && atEof);
           segments++;
         }
@@ -1523,7 +1620,8 @@
       }
 
       private void flush(int count, int bytesProcessed, boolean isCompressed, 
-          boolean isBlockCompressed, boolean done) throws IOException {
+          boolean isBlockCompressed, CompressionCodec codec, boolean done) 
+      throws IOException {
         if (out == null) {
           outName = done ? outFile : outFile.suffix(".0");
           out = fs.create(outName);
@@ -1534,7 +1632,7 @@
 
         long segmentStart = out.getPos();
         Writer writer = createWriter(out, keyClass, valClass, 
-            isCompressed, isBlockCompressed);
+            isCompressed, isBlockCompressed, codec);
         
         if (!done) {
           writer.sync = null;                     // disable sync on temp files
@@ -1757,11 +1855,13 @@
       private boolean done;
       private boolean compress;
       private boolean blockCompress;
+      private CompressionCodec codec = null;
 
       public void put(MergeStream stream) throws IOException {
         if (size() == 0) {
           compress = stream.in.isCompressed();
           blockCompress = stream.in.isBlockCompressed();
+          codec = stream.in.getCompressionCodec();
         } else if (compress != stream.in.isCompressed() || 
             blockCompress != stream.in.isBlockCompressed()) {
           throw new IOException("All merged files must be compressed or not.");
@@ -1790,7 +1890,7 @@
 
       public void merge() throws IOException {
         Writer writer = createWriter(out, keyClass, valClass, 
-            compress, blockCompress);
+            compress, blockCompress, codec);
         if (!done) {
           writer.sync = null;                     // disable sync on temp files
         }

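On the read side, version-5 files record the codec's class name in the header
(written with Text.writeString() above) and Reader reconstructs it via
ReflectionUtils. A short sketch, reusing the fs/name/conf fragment from
earlier:

    SequenceFile.Reader reader = new SequenceFile.Reader(fs, name, conf);
    if (reader.isCompressed()) {
      // Accessor added by this patch: the codec recorded in the file header.
      CompressionCodec codec = reader.getCompressionCodec();
      System.out.println("codec: " + codec.getClass().getName());
    }
    reader.close();
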
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java?view=auto&rev=441210
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java Thu Sep  7 13:30:21 2006
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This interface encapsulates a streaming compression/decompression pair.
+ * @author Owen O'Malley
+ */
+public interface CompressionCodec {
+
+  /**
+   * Create a stream compressor that will write to the given output stream.
+   * @param out the location for the final output stream
+   * @return a stream the user can write uncompressed data to
+   */
+  CompressionOutputStream createOutputStream(OutputStream out) 
+  throws IOException;
+  
+  /**
+   * Create a stream decompressor that will read from the given input stream.
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   */
+  CompressionInputStream createInputStream(InputStream in) throws IOException;
+  
+  /**
+   * Get the default filename extension for this kind of compression.
+   * @return the extension including the '.'
+   */
+  String getDefaultExtension();
+}

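Callers drive every codec the same way; a minimal compression helper under
that contract (the helper itself is illustrative and assumes
java.io.ByteArrayOutputStream is imported):

    /** Sketch: compress a byte[] through any CompressionCodec. */
    public static byte[] compress(CompressionCodec codec, byte[] data)
    throws IOException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      CompressionOutputStream out = codec.createOutputStream(bytes);
      out.write(data, 0, data.length);
      out.finish();       // complete the compressed stream without closing it
      return bytes.toByteArray();
    }
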
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java?view=auto&rev=441210
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java Thu Sep  7 13:30:21 2006
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A compression input stream.
+ * @author Arun C Murthy
+ */
+public abstract class CompressionInputStream extends InputStream {
+  /**
+   * The input stream supplying the compressed bytes.
+   */
+  protected final InputStream in;
+
+  /**
+   * Create a compression input stream that reads
+   * the compressed bytes from the given stream.
+   * @param in the input stream of compressed bytes
+   */
+  protected CompressionInputStream(InputStream in) {
+    this.in = in;
+  }
+
+  public void close() throws IOException {
+    in.close();
+  }
+  
+  /**
+   * Read bytes from the stream.
+   * Made abstract so reads cannot fall through to the underlying stream.
+   */
+  public abstract int read(byte[] b, int off, int len) throws IOException;
+
+  /**
+   * Reset the compression to the initial state. Does not reset the underlying
+   * stream.
+   */
+  public abstract void resetState() throws IOException;
+  
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionOutputStream.java?view=auto&rev=441210
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionOutputStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionOutputStream.java Thu Sep  7 13:30:21 2006
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A compression output stream.
+ * @author Arun C Murthy
+ */
+public abstract class CompressionOutputStream extends OutputStream {
+  /**
+   * The output stream receiving the compressed bytes.
+   */
+  protected final OutputStream out;
+  
+  /**
+   * Create a compression output stream that writes
+   * the compressed bytes to the given stream.
+   * @param out the stream the compressed bytes are written to
+   */
+  protected CompressionOutputStream(OutputStream out) {
+    this.out = out;
+  }
+  
+  public void close() throws IOException {
+    finish();
+    out.close();
+  }
+  
+  public void flush() throws IOException {
+    out.flush();
+  }
+  
+  /**
+   * Write compressed bytes to the stream.
+   * Made abstract so writes cannot fall through to the underlying stream.
+   */
+  public abstract void write(byte[] b, int off, int len) throws IOException;
+
+  /**
+   * Finishes writing compressed data to the output stream 
+   * without closing the underlying stream.
+   */
+  public abstract void finish() throws IOException;
+  
+  /**
+   * Reset the compression to the initial state. 
+   * Does not reset the underlying stream.
+   */
+  public abstract void resetState() throws IOException;
+
+}

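The finish()/resetState() pair is what lets SequenceFile's BlockCompressWriter
place several independently compressed blocks on a single stream. A sketch of
that calling pattern (writeBlocks is a hypothetical helper, not part of the
patch):

    private static void writeBlocks(CompressionOutputStream out, byte[][] blocks)
    throws IOException {
      for (int i = 0; i < blocks.length; ++i) {
        out.resetState();    // fresh compressor state for each block
        out.write(blocks[i], 0, blocks[i].length);
        out.flush();
        out.finish();        // end this block; the underlying stream stays open
      }
    }
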
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java?view=auto&rev=441210
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java Thu Sep  7 13:30:21 2006
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.*;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+public class DefaultCodec implements CompressionCodec {
+
+  /**
+   * A bridge that wraps around a DeflaterOutputStream to make it 
+   * a CompressionOutputStream.
+   * @author Owen O'Malley
+   */
+  protected static class DefaultCompressionOutputStream 
+  extends CompressionOutputStream {
+
+    /**
+     * A DeflaterOutputStream that provides a mechanism to 
+     * reset the compressor.
+     * @author Owen O'Malley
+     */
+    private static class ResetableDeflaterOutputStream 
+    extends DeflaterOutputStream {
+      
+      public ResetableDeflaterOutputStream(OutputStream out) {
+        super(out);
+      }
+      
+      public void resetState() throws IOException {
+        def.reset();
+      }
+    }
+    
+    public DefaultCompressionOutputStream(OutputStream out) {
+      super(new ResetableDeflaterOutputStream(out));
+    }
+    
+    /**
+     * Allow subclasses to supply a different stream type (namely gzip).
+     * @param out the Deflater stream to use
+     */
+    protected DefaultCompressionOutputStream(DeflaterOutputStream out) {
+      super(out);
+    }
+    
+    public void close() throws IOException {
+      out.close();
+    }
+    
+    public void flush() throws IOException {
+      out.flush();
+    }
+    
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+    
+    public void write(byte[] data, int offset, int length) 
+    throws IOException {
+      out.write(data, offset, length);
+    }
+    
+    public void finish() throws IOException {
+      ((DeflaterOutputStream) out).finish();
+    }
+    
+    public void resetState() throws IOException {
+      ((ResetableDeflaterOutputStream) out).resetState();
+    }
+  }
+  
+  protected static class DefaultCompressionInputStream 
+  extends CompressionInputStream {
+    
+    /**
+     * An InflaterInputStream that provides a mechanism to reset the decompressor.
+     * @author Owen O'Malley
+     */
+    private static class ResetableInflaterInputStream 
+    extends InflaterInputStream {
+      public ResetableInflaterInputStream(InputStream in) {
+        super(in);
+      }
+      
+      public void resetState() throws IOException {
+        inf.reset();
+      }
+    }
+    
+    public DefaultCompressionInputStream(InputStream in) {
+      super(new ResetableInflaterInputStream(in));
+    }
+    
+    /**
+     * Allow subclasses to directly set the inflater stream
+     */
+    protected DefaultCompressionInputStream(InflaterInputStream in) {
+      super(in);
+    }
+    
+    public int available() throws IOException {
+      return in.available(); 
+    }
+
+    public void close() throws IOException {
+      in.close();
+    }
+
+    public int read() throws IOException {
+      return in.read();
+    }
+    
+    public int read(byte[] data, int offset, int len) throws IOException {
+      return in.read(data, offset, len);
+    }
+    
+    public long skip(long offset) throws IOException {
+      return in.skip(offset);
+    }
+    
+    public void resetState() throws IOException {
+      ((ResetableInflaterInputStream) in).resetState();
+    }
+    
+  }
+  
+  /**
+   * Create a stream compressor that will write to the given output stream.
+   * @param out the location for the final output stream
+   * @return a stream the user can write uncompressed data to
+   */
+  public CompressionOutputStream createOutputStream(OutputStream out) 
+  throws IOException {
+    return new DefaultCompressionOutputStream(out);
+  }
+  
+  /**
+   * Create a stream decompressor that will read from the given input stream.
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   */
+  public CompressionInputStream createInputStream(InputStream in) 
+  throws IOException {
+    return new DefaultCompressionInputStream(in);
+  }
+  
+  /**
+   * Get the default filename extension for this kind of compression.
+   * @return the extension including the '.'
+   */
+  public String getDefaultExtension() {
+    return ".deflate";
+  }
+
+}

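A decompression counterpart, mirroring the 8K read loop that
CompressedBytes.writeUncompressedBytes() uses above (again a hypothetical
helper; assumes java.io.ByteArrayInputStream and ByteArrayOutputStream):

    /** Sketch: decompress bytes produced by any CompressionCodec. */
    public static byte[] decompress(CompressionCodec codec, byte[] compressed)
    throws IOException {
      CompressionInputStream in =
        codec.createInputStream(new ByteArrayInputStream(compressed));
      ByteArrayOutputStream result = new ByteArrayOutputStream();
      byte[] buffer = new byte[8192];
      int bytesRead;
      while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) {
        result.write(buffer, 0, bytesRead);
      }
      return result.toByteArray();
    }
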
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=441210&r1=441209&r2=441210
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Sep  7 13:30:21 2006
@@ -19,6 +19,7 @@
 import java.io.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -123,15 +124,20 @@
     final SequenceFile.Writer[] outs = new SequenceFile.Writer[partitions];
     try {
       FileSystem localFs = FileSystem.getNamed("local", job);
+      /** TODO: Figure out a way to deprecate 'mapred.compress.map.output' */
       boolean compressTemps = job.getBoolean("mapred.compress.map.output", 
                                              false);
       for (int i = 0; i < partitions; i++) {
         outs[i] =
-          new SequenceFile.Writer(localFs,
+          SequenceFile.createWriter(localFs, job,
                                   this.mapOutputFile.getOutputFile(getTaskId(), i),
                                   job.getMapOutputKeyClass(),
                                   job.getMapOutputValueClass(),
-                                  compressTemps);
+                                  compressTemps ? CompressionType.RECORD : 
+                                    CompressionType.valueOf(
+                                        job.get("mapred.seqfile.compression.type", 
+                                            "NONE"))
+                                  );
         LOG.info("opened "+this.mapOutputFile.getOutputFile(getTaskId(), i).getName());
       }
 

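With this change a job can steer map-output compression through the
SequenceFile property; note that the old boolean, when true, still forces
RECORD compression regardless of the property. A configuration sketch using
the property names from the diff above (job is a JobConf):

    // Old switch, still honored: forces RECORD compression of map outputs.
    job.set("mapred.compress.map.output", "true");

    // New path, consulted only when the boolean above is false:
    // NONE, RECORD, or BLOCK, parsed via CompressionType.valueOf(...).
    job.set("mapred.seqfile.compression.type", "BLOCK");
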
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java?view=auto&rev=441210
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java Thu Sep  7 13:30:21 2006
@@ -0,0 +1,144 @@
+package org.apache.hadoop.io.compress;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+
+public class TestCodec extends TestCase {
+
+  private static final Log LOG= 
+    LogFactory.getLog("org.apache.hadoop.io.compress.TestCodec");
+  
+  public void testCodec() throws IOException {
+    int count = 10000;
+    int seed = new Random().nextInt();
+    
+    codecTest(seed, count, "org.apache.hadoop.io.compress.DefaultCodec");
+  }
+  
+  private static void codecTest(int seed, int count, String codecClass) 
+  throws IOException {
+    
+    // Create the codec
+    Configuration conf = new Configuration();
+    CompressionCodec codec = null;
+    try {
+      codec = (CompressionCodec)
+        ReflectionUtils.newInstance(conf.getClassByName(codecClass), conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Illegal codec!");
+    }
+    LOG.debug("Created a Codec object of type: " + codecClass);
+
+    // Generate data
+    DataOutputBuffer data = new DataOutputBuffer();
+    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+    for(int i=0; i < count; ++i) {
+      generator.next();
+      RandomDatum key = generator.getKey();
+      RandomDatum value = generator.getValue();
+      
+      key.write(data);
+      value.write(data);
+    }
+    DataInputBuffer originalData = new DataInputBuffer();
+    DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData));
+    originalData.reset(data.getData(), 0, data.getLength());
+    
+    LOG.debug("Generated " + count + " records");
+    
+    // Compress data
+    DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
+    CompressionOutputStream deflateFilter = 
+      codec.createOutputStream(compressedDataBuffer);
+    DataOutputStream deflateOut = 
+      new DataOutputStream(new BufferedOutputStream(deflateFilter));
+    
+    deflateFilter.resetState();
+    compressedDataBuffer.reset();
+    deflateOut.write(data.getData(), 0, data.getLength());
+    deflateOut.flush();
+    deflateFilter.finish();
+    LOG.debug("Finished compressing data");
+    
+    // De-compress data
+    DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
+    CompressionInputStream inflateFilter = 
+      codec.createInputStream(deCompressedDataBuffer);
+    DataInputStream inflateIn = 
+      new DataInputStream(new BufferedInputStream(inflateFilter));
+    
+    inflateFilter.resetState();
+    deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, 
+        compressedDataBuffer.getLength());
+
+    // Check that each record survives the compress/decompress round-trip
+    for(int i=0; i < count; ++i) {
+      RandomDatum k1 = new RandomDatum();
+      RandomDatum v1 = new RandomDatum();
+      k1.readFields(originalIn);
+      v1.readFields(originalIn);
+      
+      RandomDatum k2 = new RandomDatum();
+      RandomDatum v2 = new RandomDatum();
+      k2.readFields(inflateIn);
+      v2.readFields(inflateIn);
+      
+      // The readFields() calls alone verify nothing; compare the records.
+      assertTrue("original and decompressed keys must match",
+          k1.compareTo(k2) == 0);
+      assertTrue("original and decompressed values must match",
+          v1.compareTo(v2) == 0);
+    }
+    LOG.debug("SUCCESS! Completed checking " + count + " records");
+  }
+  
+  public static void main(String[] args) {
+    int count = 10000;
+    String codecClass = "org.apache.hadoop.io.compress.DefaultCodec";
+
+    String usage = "TestCodec [-count N] [-codec <codec class>]";
+    if (args.length == 0) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+
+    try {
+      for (int i=0; i < args.length; ++i) {     // parse command line
+        if (args[i] == null) {
+          continue;
+        } else if (args[i].equals("-count")) {
+          count = Integer.parseInt(args[++i]);
+        } else if (args[i].equals("-codec")) {
+          codecClass = args[++i];
+        }
+      }
+
+      int seed = 0;
+      codecTest(seed, count, codecClass);
+    } catch (Exception e) {
+      System.err.println("Caught: " + e);
+      e.printStackTrace();
+    }
+    
+  }
+
+  public TestCodec(String name) {
+    super(name);
+  }
+
+}
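
TestCodec's main() can also be driven directly from the command line; an
invocation sketch (the classpath placeholder is illustrative):

    java -cp <hadoop-classes-and-libs> org.apache.hadoop.io.compress.TestCodec \
        -count 10000 -codec org.apache.hadoop.io.compress.DefaultCodec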