Posted to common-commits@hadoop.apache.org by om...@apache.org on 2010/12/04 08:13:12 UTC

svn commit: r1042107 [2/6] - in /hadoop/common/branches/HADOOP-6685: ./ ivy/ src/java/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/file/tfile/ src/java/org/apache/hadoop/io/serial/ src/java/org/apache/had...

Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SequenceFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SequenceFile.java Sat Dec  4 07:13:10 2010
@@ -34,12 +34,14 @@ import org.apache.hadoop.io.compress.Dec
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.serial.Serialization;
+import org.apache.hadoop.io.serial.SerializationFactory;
+import org.apache.hadoop.io.serial.TypedSerialization;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -93,10 +95,16 @@ import org.apache.hadoop.util.PriorityQu
  *             version number (e.g. SEQ4 or SEQ6)
  *   </li>
  *   <li>
- *   keyClassName -key class
+ *   key serialization name 
  *   </li>
  *   <li>
- *   valueClassName - value class
+ *   key serialization configuration
+ *   </li>
+ *   <li>
+ *   value serialization name
+ *   </li>
+ *   <li>
+ *   value serialization configuration
  *   </li>
  *   <li>
  *   compression - A boolean which specifies if compression is turned on for 
@@ -134,7 +142,7 @@ import org.apache.hadoop.util.PriorityQu
  *   </ul>
  * </li>
  * <li>
- * A sync-marker every few <code>100</code> bytes or so.
+ * A sync-marker every <code>2000</code> bytes or so.
  * </li>
  * </ul>
  *
@@ -153,7 +161,7 @@ import org.apache.hadoop.util.PriorityQu
  *   </ul>
  * </li>
  * <li>
- * A sync-marker every few <code>100</code> bytes or so.
+ * A sync-marker every <code>2000</code> bytes or so.
  * </li>
  * </ul>
  * 
@@ -165,6 +173,7 @@ import org.apache.hadoop.util.PriorityQu
  * <li>
  * Record <i>Block</i>
  *   <ul>
+ *     <li>sync-marker</li>
  *     <li>Compressed key-lengths block-size</li>
  *     <li>Compressed key-lengths block</li>
  *     <li>Compressed keys block-size</li>
@@ -175,9 +184,6 @@ import org.apache.hadoop.util.PriorityQu
  *     <li>Compressed values block</li>
  *   </ul>
  * </li>
- * <li>
- * A sync-marker every few <code>100</code> bytes or so.
- * </li>
  * </ul>
  * 
  * <p>The compressed blocks of key lengths and value lengths consist of the 
@@ -196,8 +202,9 @@ public class SequenceFile {
   private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
   private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
   private static final byte VERSION_WITH_METADATA = (byte)6;
+  private static final byte SERIALIZATION_VERSION = (byte) 7;
   private static byte[] VERSION = new byte[] {
-    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
+    (byte)'S', (byte)'E', (byte)'Q', SERIALIZATION_VERSION
   };
 
   private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
@@ -285,6 +292,7 @@ public class SequenceFile {
    *     instead.
    */
   @Deprecated
+  @SuppressWarnings("unchecked")
   public static Writer 
     createWriter(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass) throws IOException {
@@ -306,6 +314,7 @@ public class SequenceFile {
    *     instead.
    */
   @Deprecated
+  @SuppressWarnings("unchecked")
   public static Writer 
     createWriter(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass, 
@@ -330,6 +339,7 @@ public class SequenceFile {
    *     instead.
    */
   @Deprecated
+  @SuppressWarnings("unchecked")
   public static Writer
     createWriter(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass, CompressionType compressionType,
@@ -355,6 +365,7 @@ public class SequenceFile {
    *     instead.
    */
   @Deprecated
+  @SuppressWarnings("unchecked")
   public static Writer 
     createWriter(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass, CompressionType compressionType, 
@@ -381,6 +392,7 @@ public class SequenceFile {
    *     instead.
    */
   @Deprecated
+  @SuppressWarnings("unchecked")
   public static Writer
     createWriter(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass, 
@@ -413,6 +425,7 @@ public class SequenceFile {
    *     instead.
    */
   @Deprecated
+  @SuppressWarnings("unchecked")
   public static Writer
     createWriter(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass, int bufferSize,
@@ -444,6 +457,7 @@ public class SequenceFile {
    * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
    *     instead.
    */
+  @SuppressWarnings("unchecked")
   @Deprecated
   public static Writer
     createWriter(FileSystem fs, Configuration conf, Path name, 
@@ -471,6 +485,7 @@ public class SequenceFile {
    *     instead.
    */
   @Deprecated
+  @SuppressWarnings("unchecked")
   public static Writer
     createWriter(Configuration conf, FSDataOutputStream out, 
                  Class keyClass, Class valClass,
@@ -495,6 +510,7 @@ public class SequenceFile {
    * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
    *     instead.
    */
+  @SuppressWarnings("unchecked")
   @Deprecated
   public static Writer
     createWriter(Configuration conf, FSDataOutputStream out, 
@@ -527,18 +543,44 @@ public class SequenceFile {
      * Size of stored data.
      */
     public int getSize();
+    
+  }
+
+  /**
+   * Make an InputStream from a ValueBytes.
+   * @param bytes the bytes to provide as input
+   * @return a new input stream with the bytes
+   * @throws IOException
+   */
+  private static InputStream readUncompressedBytes(ValueBytes bytes
+                                                  ) throws IOException {
+    DataInputBuffer result = new DataInputBuffer();
+    if (bytes instanceof UncompressedBytes) {
+      MutableValueBytes concrete = (MutableValueBytes) bytes;
+      result.reset(concrete.data, concrete.dataSize);
+    } else {
+      DataOutputBuffer outBuf = new DataOutputBuffer();
+      bytes.writeUncompressedBytes(outBuf);
+      result.reset(outBuf.getData(), outBuf.getLength());
+    }
+    return result;
   }
+
   
-  private static class UncompressedBytes implements ValueBytes {
-    private int dataSize;
-    private byte[] data;
-    
-    private UncompressedBytes() {
+  private static abstract class MutableValueBytes implements ValueBytes {
+    protected byte[] data;
+    protected int dataSize;
+
+    MutableValueBytes() {
       data = null;
       dataSize = 0;
     }
+
+    public int getSize() {
+      return dataSize;
+    }
     
-    private void reset(DataInputStream in, int length) throws IOException {
+    void reset(DataInputStream in, int length) throws IOException {
       if (data == null) {
         data = new byte[length];
       } else if (length > data.length) {
@@ -548,10 +590,14 @@ public class SequenceFile {
       in.readFully(data, 0, length);
       dataSize = length;
     }
-    
-    public int getSize() {
-      return dataSize;
+
+    void set(MutableValueBytes other) {
+      data = other.data;
+      dataSize = other.dataSize;
     }
+  }
+
+  private static class UncompressedBytes extends MutableValueBytes {
     
     public void writeUncompressedBytes(DataOutputStream outStream)
       throws IOException {
@@ -566,34 +612,15 @@ public class SequenceFile {
 
   } // UncompressedBytes
   
-  private static class CompressedBytes implements ValueBytes {
-    private int dataSize;
-    private byte[] data;
+  private static class CompressedBytes extends MutableValueBytes {
     DataInputBuffer rawData = null;
     CompressionCodec codec = null;
     CompressionInputStream decompressedStream = null;
 
     private CompressedBytes(CompressionCodec codec) {
-      data = null;
-      dataSize = 0;
       this.codec = codec;
     }
 
-    private void reset(DataInputStream in, int length) throws IOException {
-      if (data == null) {
-        data = new byte[length];
-      } else if (length > data.length) {
-        data = new byte[Math.max(length, data.length * 2)];
-      } 
-      dataSize = -1;
-      in.readFully(data, 0, length);
-      dataSize = length;
-    }
-    
-    public int getSize() {
-      return dataSize;
-    }
-    
     public void writeUncompressedBytes(DataOutputStream outStream)
       throws IOException {
       if (decompressedStream == null) {
@@ -738,9 +765,6 @@ public class SequenceFile {
     boolean ownOutputStream = true;
     DataOutputBuffer buffer = new DataOutputBuffer();
 
-    Class keyClass;
-    Class valClass;
-
     private final CompressionType compress;
     CompressionCodec codec = null;
     CompressionOutputStream deflateFilter = null;
@@ -748,9 +772,8 @@ public class SequenceFile {
     Metadata metadata = null;
     Compressor compressor = null;
     
-    protected Serializer keySerializer;
-    protected Serializer uncompressedValSerializer;
-    protected Serializer compressedValSerializer;
+    protected Serialization<Object> keySerialization;
+    protected Serialization<Object> valueSerialization;
     
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
@@ -817,6 +840,20 @@ public class SequenceFile {
       }
     }
 
+    static class KeySerialization extends Options.SerializationOption 
+                                  implements Option {
+      KeySerialization(Serialization<?> value) {
+        super(value);
+      }
+    }
+
+    static class ValueSerialization extends Options.SerializationOption 
+                                    implements Option {
+      ValueSerialization(Serialization<?> value) {
+        super(value);
+      }
+    }
+
     static class MetadataOption implements Option {
       private final Metadata value;
       MetadataOption(Metadata value) {
@@ -878,6 +915,14 @@ public class SequenceFile {
       return new ProgressableOption(value);
     }
 
+    public static Option keySerialization(Serialization<?> value) {
+      return new KeySerialization(value);
+    }
+ 
+    public static Option valueSerialization(Serialization<?> value) {
+      return new ValueSerialization(value);
+    }
+ 
     public static Option keyClass(Class<?> value) {
       return new KeyClassOption(value);
     }
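
A minimal usage sketch for the two new options, assuming the existing Writer.file(Path) option and an illustrative /tmp path; the constructor (next hunk) requires exactly one of the serialization or class options for each of key and value:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serial.Serialization;
    import org.apache.hadoop.io.serial.SerializationFactory;

    public class WriterOptionSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        SerializationFactory factory = SerializationFactory.getInstance(conf);
        // Resolve serializations up front instead of passing keyClass/valueClass.
        Serialization<?> keySerial = factory.getSerializationByType(Text.class);
        Serialization<?> valueSerial =
          factory.getSerializationByType(LongWritable.class);

        SequenceFile.Writer writer =
          SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(new Path("/tmp/demo.seq")),
            SequenceFile.Writer.keySerialization(keySerial),
            SequenceFile.Writer.valueSerialization(valueSerial));
        try {
          writer.append(new Text("answer"), new LongWritable(42));
        } finally {
          writer.close();
        }
      }
    }
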
@@ -905,6 +950,7 @@ public class SequenceFile {
      * @param options the options used when creating the writer
      * @throws IOException if it fails
      */
+    @SuppressWarnings("unchecked")
     Writer(Configuration conf, 
            Option... opts) throws IOException {
       BlockSizeOption blockSizeOption = 
@@ -917,6 +963,10 @@ public class SequenceFile {
         Options.getOption(ProgressableOption.class, opts);
       FileOption fileOption = Options.getOption(FileOption.class, opts);
       StreamOption streamOption = Options.getOption(StreamOption.class, opts);
+      KeySerialization keySerializationOption = 
+        Options.getOption(KeySerialization.class, opts);
+      ValueSerialization valueSerializationOption = 
+        Options.getOption(ValueSerialization.class, opts);
       KeyClassOption keyClassOption = 
         Options.getOption(KeyClassOption.class, opts);
       ValueClassOption valueClassOption = 
@@ -936,6 +986,15 @@ public class SequenceFile {
         throw new IllegalArgumentException("file modifier options not " +
                                            "compatible with stream");
       }
+      // exactly one of serialization or class must be set.
+      if ((keySerializationOption == null) == (keyClassOption == null)) {
+        throw new IllegalArgumentException("Either keySerialization or " +
+                                           " keyClass must be set.");
+      }
+      if ((valueSerializationOption == null) == (valueClassOption == null)) {
+        throw new IllegalArgumentException("Either valueSerialization or " +
+                                           " valueClass must be set.");
+      }
 
       FSDataOutputStream out;
       boolean ownStream = fileOption != null;
@@ -955,10 +1014,31 @@ public class SequenceFile {
       } else {
         out = streamOption.getValue();
       }
-      Class<?> keyClass = keyClassOption == null ?
-          Object.class : keyClassOption.getValue();
-      Class<?> valueClass = valueClassOption == null ?
-          Object.class : valueClassOption.getValue();
+      
+      // find the key serialization by parameter or by key type
+      Serialization<Object> keySerialization;
+      if (keyClassOption != null) {
+        Class<?> keyClass = keyClassOption.getValue();
+        SerializationFactory factory = SerializationFactory.getInstance(conf);
+        keySerialization = 
+          (Serialization<Object>) factory.getSerializationByType(keyClass);
+      } else {
+        keySerialization = 
+          (Serialization<Object>) keySerializationOption.getValue();
+      }
+
+      // find the value serialization by parameter or by value type
+      Serialization<Object> valueSerialization;
+      if (valueClassOption != null) {
+        Class<?> valueClass = valueClassOption.getValue();
+        SerializationFactory factory = SerializationFactory.getInstance(conf);
+        valueSerialization = 
+          (Serialization<Object>) factory.getSerializationByType(valueClass);
+      } else {
+        valueSerialization = 
+          (Serialization<Object>) valueSerializationOption.getValue();
+      }
+
       Metadata metadata = metadataOption == null ?
           new Metadata() : metadataOption.getValue();
       this.compress = compressionTypeOption.getValue();
@@ -971,7 +1051,8 @@ public class SequenceFile {
                                            "GzipCodec without native-hadoop " +
                                            "code!");
       }
-      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
+      init(conf, out, ownStream, keySerialization, valueSerialization, 
+           codec, metadata);
     }
 
     /** Create the named file.
@@ -979,11 +1060,15 @@ public class SequenceFile {
      *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
      *   instead.
      */
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(FileSystem fs, Configuration conf, Path name, 
                   Class keyClass, Class valClass) throws IOException {
       this.compress = CompressionType.NONE;
-      init(conf, fs.create(name), true, keyClass, valClass, null, 
+      SerializationFactory factory = SerializationFactory.getInstance(conf);
+      init(conf, fs.create(name), true, 
+           factory.getSerializationByType(keyClass), 
+           factory.getSerializationByType(valClass), null, 
            new Metadata());
     }
     
@@ -992,12 +1077,16 @@ public class SequenceFile {
      *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
      *   instead.
      */
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(FileSystem fs, Configuration conf, Path name, 
                   Class keyClass, Class valClass,
                   Progressable progress, Metadata metadata) throws IOException {
       this.compress = CompressionType.NONE;
-      init(conf, fs.create(name, progress), true, keyClass, valClass,
+      SerializationFactory factory = SerializationFactory.getInstance(conf);
+      init(conf, fs.create(name, progress), true, 
+           factory.getSerializationByType(keyClass), 
+           factory.getSerializationByType(valClass),
            null, metadata);
     }
     
@@ -1006,15 +1095,18 @@ public class SequenceFile {
      *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
      *   instead.
      */
+    @SuppressWarnings("unchecked")
     @Deprecated
     public Writer(FileSystem fs, Configuration conf, Path name,
                   Class keyClass, Class valClass,
                   int bufferSize, short replication, long blockSize,
                   Progressable progress, Metadata metadata) throws IOException {
       this.compress = CompressionType.NONE;
+      SerializationFactory factory = SerializationFactory.getInstance(conf);
       init(conf,
            fs.create(name, true, bufferSize, replication, blockSize, progress),
-           true, keyClass, valClass, null, metadata);
+           true, factory.getSerializationByType(keyClass), 
+           factory.getSerializationByType(valClass), null, metadata);
     }
 
     boolean isCompressed() { return compress != CompressionType.NONE; }
@@ -1024,8 +1116,20 @@ public class SequenceFile {
     private void writeFileHeader() 
       throws IOException {
       out.write(VERSION);
-      Text.writeString(out, keyClass.getName());
-      Text.writeString(out, valClass.getName());
+      
+      // write out key serialization
+      Text.writeString(out, keySerialization.getName());
+      buffer.reset();
+      keySerialization.serializeSelf(buffer);
+      WritableUtils.writeVInt(out, buffer.getLength());
+      out.write(buffer.getData(), 0, buffer.getLength());
+      
+      // write out value serialization
+      Text.writeString(out, valueSerialization.getName());
+      buffer.reset();
+      valueSerialization.serializeSelf(buffer);
+      WritableUtils.writeVInt(out, buffer.getLength());
+      out.write(buffer.getData(), 0, buffer.getLength());
       
       out.writeBoolean(this.isCompressed());
       out.writeBoolean(this.isBlockCompressed());
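
The version-7 header written above is self-describing: after the 4-byte SEQ magic, each serialization is stored as its name (a Hadoop string) followed by a vint-length-prefixed blob produced by serializeSelf(). A hedged sketch that sniffs those fields back out of a file without interpreting the blobs (the path is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;

    public class HeaderSniff {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path("/tmp/demo.seq"));
        try {
          byte[] magic = new byte[4];
          in.readFully(magic);                       // 'S' 'E' 'Q' <version>
          System.out.println("version = " + magic[3]);
          if (magic[3] >= 7) {                       // SERIALIZATION_VERSION
            // key serialization: name, then vint-prefixed self-description
            System.out.println("key serialization = " + Text.readString(in));
            byte[] keyBlob = new byte[WritableUtils.readVInt(in)];
            in.readFully(keyBlob);
            // value serialization: same layout
            System.out.println("value serialization = " + Text.readString(in));
            byte[] valueBlob = new byte[WritableUtils.readVInt(in)];
            in.readFully(valueBlob);
          }
        } finally {
          in.close();
        }
      }
    }
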
@@ -1039,40 +1143,74 @@ public class SequenceFile {
     }
     
     /** Initialize. */
-    @SuppressWarnings("unchecked")
     void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
-              Class keyClass, Class valClass,
+              Serialization<Object> keySerialization, 
+              Serialization<Object> valueSerialization,
               CompressionCodec codec, Metadata metadata) 
       throws IOException {
       this.conf = conf;
       this.out = out;
       this.ownOutputStream = ownStream;
-      this.keyClass = keyClass;
-      this.valClass = valClass;
+      this.keySerialization = keySerialization;
+      this.valueSerialization = valueSerialization;
       this.codec = codec;
       this.metadata = metadata;
-      SerializationFactory serializationFactory = new SerializationFactory(conf);
-      this.keySerializer = serializationFactory.getSerializer(keyClass);
-      this.keySerializer.open(buffer);
-      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
-      this.uncompressedValSerializer.open(buffer);
       if (this.codec != null) {
         ReflectionUtils.setConf(this.codec, this.conf);
         this.compressor = CodecPool.getCompressor(this.codec);
         this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
         this.deflateOut = 
           new DataOutputStream(new BufferedOutputStream(deflateFilter));
-        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
-        this.compressedValSerializer.open(deflateOut);
       }
       writeFileHeader();
     }
     
-    /** Returns the class of keys in this file. */
-    public Class getKeyClass() { return keyClass; }
+    /** Returns the class of keys in this file. Only works
+     * if a TypedSerialization is used; otherwise Object is returned.
+     * @deprecated Use {@link #getKeySerialization} instead.
+     */
+    @SuppressWarnings("unchecked")
+    @Deprecated
+    public Class getKeyClass() { 
+      Class result = null;
+      if (keySerialization instanceof TypedSerialization<?>) {
+        TypedSerialization typed = (TypedSerialization) keySerialization;
+        result = typed.getSpecificType();
+      }
+      return result == null ? Object.class : result;
+    }
 
-    /** Returns the class of values in this file. */
-    public Class getValueClass() { return valClass; }
+    
+    /** Returns the class of values in this file. Only works
+     * if a TypedSerialization is used; otherwise Object is returned.
+     * @deprecated Use {@link #getValueSerialization} instead.
+     */
+    @SuppressWarnings("unchecked")
+    @Deprecated
+    public Class getValueClass() { 
+      Class result = null;
+      if (valueSerialization instanceof TypedSerialization<?>) {
+        TypedSerialization typed = (TypedSerialization) valueSerialization;
+        result = typed.getSpecificType();
+      }
+      return result == null ? Object.class : result;
+    }
+
+    /**
+     * Return the serialization that is used to serialize the keys.
+     * @return the key serialization
+     */
+    public Serialization<?> getKeySerialization() {
+      return keySerialization;
+    }
+    
+    /**
+     * Return the serialization that is used to serialize the values.
+     * @return the value serialization
+     */
+    public Serialization<?> getValueSerialization() {
+      return valueSerialization;
+    }
 
     /** Returns the compression codec of data in this file. */
     public CompressionCodec getCompressionCodec() { return codec; }
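
Callers migrating off the deprecated accessors can still recover a concrete key type, but only when the serialization is typed; a hedged helper sketch mirroring getKeyClass() above:

    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.serial.Serialization;
    import org.apache.hadoop.io.serial.TypedSerialization;

    class KeyTypeSketch {
      // Only a TypedSerialization carries a specific Java type; any other
      // serialization degrades to Object, exactly as getKeyClass() does.
      static Class<?> specificKeyType(SequenceFile.Writer writer) {
        Serialization<?> keySerial = writer.getKeySerialization();
        if (keySerial instanceof TypedSerialization<?>) {
          Class<?> type = ((TypedSerialization<?>) keySerial).getSpecificType();
          return type != null ? type : Object.class;
        }
        return Object.class;
      }
    }
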
@@ -1091,12 +1229,6 @@ public class SequenceFile {
     
     /** Close the file. */
     public synchronized void close() throws IOException {
-      keySerializer.close();
-      uncompressedValSerializer.close();
-      if (compressedValSerializer != null) {
-        compressedValSerializer.close();
-      }
-
       CodecPool.returnCompressor(compressor);
       compressor = null;
       
@@ -1119,27 +1251,20 @@ public class SequenceFile {
       }
     }
 
-    /** Append a key/value pair. */
-    public void append(Writable key, Writable val)
-      throws IOException {
+    /** Append a key/value pair. 
+     */
+    public void append(Writable key, Writable val) throws IOException {
       append((Object) key, (Object) val);
     }
 
     /** Append a key/value pair. */
-    @SuppressWarnings("unchecked")
     public synchronized void append(Object key, Object val)
       throws IOException {
-      if (key.getClass() != keyClass)
-        throw new IOException("wrong key class: "+key.getClass().getName()
-                              +" is not "+keyClass);
-      if (val.getClass() != valClass)
-        throw new IOException("wrong value class: "+val.getClass().getName()
-                              +" is not "+valClass);
 
       buffer.reset();
 
       // Append the 'key'
-      keySerializer.serialize(key);
+      keySerialization.serialize(buffer, key);
       int keyLength = buffer.getLength();
       if (keyLength < 0)
         throw new IOException("negative length keys not allowed: " + key);
@@ -1147,11 +1272,11 @@ public class SequenceFile {
       // Append the 'value'
       if (compress == CompressionType.RECORD) {
         deflateFilter.resetState();
-        compressedValSerializer.serialize(val);
+        valueSerialization.serialize(deflateFilter, val);
         deflateOut.flush();
         deflateFilter.finish();
       } else {
-        uncompressedValSerializer.serialize(val);
+        valueSerialization.serialize(buffer, val);
       }
 
       // Write the record out
@@ -1200,27 +1325,18 @@ public class SequenceFile {
     }
 
     /** Append a key/value pair. */
-    @SuppressWarnings("unchecked")
-    public synchronized void append(Object key, Object val)
-      throws IOException {
-      if (key.getClass() != keyClass)
-        throw new IOException("wrong key class: "+key.getClass().getName()
-                              +" is not "+keyClass);
-      if (val.getClass() != valClass)
-        throw new IOException("wrong value class: "+val.getClass().getName()
-                              +" is not "+valClass);
-
-      buffer.reset();
+    public synchronized void append(Object key, Object val) throws IOException {
 
       // Append the 'key'
-      keySerializer.serialize(key);
+      buffer.reset();
+      keySerialization.serialize(buffer, key);
       int keyLength = buffer.getLength();
       if (keyLength < 0)
         throw new IOException("negative length keys not allowed: " + key);
 
       // Compress 'value' and append it
       deflateFilter.resetState();
-      compressedValSerializer.serialize(val);
+      valueSerialization.serialize(deflateFilter, val);
       deflateOut.flush();
       deflateFilter.finish();
 
@@ -1267,10 +1383,6 @@ public class SequenceFile {
       super(conf, options);
       compressionBlockSize = 
         conf.getInt("io.seqfile.compress.blocksize", 1000000);
-      keySerializer.close();
-      keySerializer.open(keyBuffer);
-      uncompressedValSerializer.close();
-      uncompressedValSerializer.open(valBuffer);
     }
 
     /** Workhorse to check and write out compressed data/lengths */
@@ -1326,24 +1438,18 @@ public class SequenceFile {
     }
 
     /** Append a key/value pair. */
-    @SuppressWarnings("unchecked")
-    public synchronized void append(Object key, Object val)
-      throws IOException {
-      if (key.getClass() != keyClass)
-        throw new IOException("wrong key class: "+key+" is not "+keyClass);
-      if (val.getClass() != valClass)
-        throw new IOException("wrong value class: "+val+" is not "+valClass);
+    public synchronized void append(Object key, Object val) throws IOException {
 
       // Save key/value into respective buffers 
       int oldKeyLength = keyBuffer.getLength();
-      keySerializer.serialize(key);
+      keySerialization.serialize(keyBuffer, key);
       int keyLength = keyBuffer.getLength() - oldKeyLength;
       if (keyLength < 0)
         throw new IOException("negative length keys not allowed: " + key);
       WritableUtils.writeVInt(keyLenBuffer, keyLength);
 
       int oldValLength = valBuffer.getLength();
-      uncompressedValSerializer.serialize(val);
+      valueSerialization.serialize(valBuffer, val);
       int valLength = valBuffer.getLength() - oldValLength;
       WritableUtils.writeVInt(valLenBuffer, valLength);
       
@@ -1393,15 +1499,9 @@ public class SequenceFile {
   public static class Reader implements java.io.Closeable {
     private String filename;
     private FSDataInputStream in;
-    private DataOutputBuffer outBuf = new DataOutputBuffer();
 
     private byte version;
 
-    private String keyClassName;
-    private String valClassName;
-    private Class keyClass;
-    private Class valClass;
-
     private CompressionCodec codec = null;
     private Metadata metadata = null;
     
@@ -1411,8 +1511,6 @@ public class SequenceFile {
 
     private long headerEnd;
     private long end;
-    private int keyLength;
-    private int recordLength;
 
     private boolean decompress;
     private boolean blockCompressed;
@@ -1420,17 +1518,12 @@ public class SequenceFile {
     private Configuration conf;
 
     private int noBufferedRecords = 0;
-    private boolean lazyDecompress = true;
-    private boolean valuesDecompressed = true;
-    
-    private int noBufferedKeys = 0;
-    private int noBufferedValues = 0;
     
     private DataInputBuffer keyLenBuffer = null;
     private CompressionInputStream keyLenInFilter = null;
     private DataInputStream keyLenIn = null;
     private Decompressor keyLenDecompressor = null;
-    private DataInputBuffer keyBuffer = null;
+    private DataInputBuffer keyBlockBuffer = null;
     private CompressionInputStream keyInFilter = null;
     private DataInputStream keyIn = null;
     private Decompressor keyDecompressor = null;
@@ -1444,8 +1537,13 @@ public class SequenceFile {
     private DataInputStream valIn = null;
     private Decompressor valDecompressor = null;
     
-    private Deserializer keyDeserializer;
-    private Deserializer valDeserializer;
+    // used for object serialization
+    private DataOutputBuffer keyBuffer;
+    private MutableValueBytes valueBytes;
+    private DataInputBuffer serialBuffer;
+
+    private Serialization<Object> keySerialization;
+    private Serialization<Object> valueSerialization;
 
     /**
      * A tag interface for all of the Reader options
@@ -1471,6 +1569,24 @@ public class SequenceFile {
     }
     
     /**
+     * Create an option to specify the required key serialization.
+     * @param value the serialization to deserialize the key with
+     * @return a new option
+     */
+    public static Option keySerialization(Serialization<?> value) {
+      return new KeySerializationOption(value);
+    }
+    
+    /**
+     * Create an option to specify the required value serialization.
+     * @param value the serialization to deserialize the value with
+     * @return a new option
+     */
+    public static Option valueSerialization(Serialization<?> value) {
+      return new ValueSerializationOption(value);
+    }
+
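
A hedged sketch of opening a reader with an explicit serialization override; the override must name the same serialization the file was written with, or initialization fails (Reader.file(Path) is assumed from the existing option set, and the path and Text type are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serial.Serialization;
    import org.apache.hadoop.io.serial.SerializationFactory;

    public class ReaderOverrideSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        SerializationFactory factory = SerializationFactory.getInstance(conf);
        Serialization<?> keySerial = factory.getSerializationByType(Text.class);

        SequenceFile.Reader reader =
          new SequenceFile.Reader(conf,
            SequenceFile.Reader.file(new Path("/tmp/demo.seq")),
            SequenceFile.Reader.keySerialization(keySerial));
        try {
          System.out.println("keys use " +
                             reader.getKeySerialization().getName());
        } finally {
          reader.close();
        }
      }
    }
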
+    /**
      * Create an option to specify the starting byte to read.
      * @param value the number of bytes to skip over
      * @return a new option
@@ -1541,6 +1657,22 @@ public class SequenceFile {
       }
     }
 
+    private static class KeySerializationOption 
+                         extends Options.SerializationOption
+                         implements Option {
+      private KeySerializationOption(Serialization<?> value) {
+        super(value);
+      }
+    }
+
+    private static class ValueSerializationOption 
+                         extends Options.SerializationOption
+                         implements Option {
+      private ValueSerializationOption(Serialization<?> value) {
+        super(value);
+      }
+    }
+
     public Reader(Configuration conf, Option... opts) throws IOException {
       // Look up the options, these are null if not set
       FileOption fileOpt = Options.getOption(FileOption.class, opts);
@@ -1551,6 +1683,11 @@ public class SequenceFile {
       BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
       OnlyHeaderOption headerOnly = 
         Options.getOption(OnlyHeaderOption.class, opts);
+      KeySerializationOption keyOpt = 
+        Options.getOption(KeySerializationOption.class, opts);
+      ValueSerializationOption valueOpt = 
+        Options.getOption(ValueSerializationOption.class, opts);
+
       // check for consistency
       if ((fileOpt == null) == (streamOpt == null)) {
         throw new 
@@ -1560,6 +1697,7 @@ public class SequenceFile {
         throw new IllegalArgumentException("buffer size can only be set when" +
                                            " a file is specified.");
       }
+
       // figure out the real values
       Path filename = null;
       FSDataInputStream file;
@@ -1577,8 +1715,12 @@ public class SequenceFile {
         file = streamOpt.getValue();
       }
       long start = startOpt == null ? 0 : startOpt.getValue();
+
       // really set up
-      initialize(filename, file, start, len, conf, headerOnly != null);
+      initialize(filename, file, start, len, conf, 
+                 (keyOpt == null ? null : keyOpt.getValue()),
+                 (valueOpt == null ? null : valueOpt.getValue()),
+                 headerOnly != null);
     }
 
     /**
@@ -1614,6 +1756,8 @@ public class SequenceFile {
     /** Common work of the constructors. */
     private void initialize(Path filename, FSDataInputStream in,
                             long start, long length, Configuration conf,
+                            Serialization<?> keySerialization,
+                            Serialization<?> valueSerialization,
                             boolean tempReader) throws IOException {
       if (in == null) {
         throw new IllegalArgumentException("in == null");
@@ -1625,12 +1769,11 @@ public class SequenceFile {
       try {
         seek(start);
         this.end = this.in.getPos() + length;
-        System.out.println("Setting end to " + end);
         // if it wrapped around, use the max
         if (end < length) {
           end = Long.MAX_VALUE;
         }
-        init(tempReader);
+        init(tempReader, keySerialization, valueSerialization);
         succeeded = true;
       } finally {
         if (!succeeded) {
@@ -1654,7 +1797,34 @@ public class SequenceFile {
         int bufferSize, long length) throws IOException {
       return fs.open(file, bufferSize);
     }
-    
+
+    @SuppressWarnings("unchecked")
+    private 
+    Serialization<Object> readSerialization(SerializationFactory factory,
+                                            Serialization<?> override
+                                            ) throws IOException {
+      String serializationName = Text.readString(in);
+      Serialization<?> result;
+      if (override == null) {
+        result = factory.getSerialization(serializationName);
+      } else {
+        if (!serializationName.equals(override.getName())) {
+          throw new IllegalArgumentException("using serialization " +
+                                             override.getName() +
+                                             " instead of " + 
+                                             serializationName);
+        }
+        result = override;
+      }
+      int serializationLength = WritableUtils.readVInt(in);
+      DataInputBuffer buffer = new DataInputBuffer();
+      byte[] bytes = new byte[serializationLength];
+      in.readFully(bytes);
+      buffer.reset(bytes, serializationLength);
+      result.deserializeSelf(buffer, conf);
+      return (Serialization<Object>) result;
+    }
+
     /**
      * Initialize the {@link Reader}
      * @param tmpReader <code>true</code> if we are constructing a temporary
@@ -1663,7 +1833,10 @@ public class SequenceFile {
      *                  <code>false</code> otherwise.
      * @throws IOException
      */
-    private void init(boolean tempReader) throws IOException {
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    private void init(boolean tempReader,
+                      Serialization keySerialization,
+                      Serialization valueSerialization) throws IOException {
       byte[] versionBlock = new byte[VERSION.length];
       in.readFully(versionBlock);
 
@@ -1677,17 +1850,39 @@ public class SequenceFile {
       if (version > VERSION[3])
         throw new VersionMismatchException(VERSION[3], version);
 
-      if (version < BLOCK_COMPRESS_VERSION) {
-        UTF8 className = new UTF8();
+      SerializationFactory factory = SerializationFactory.getInstance(conf);
+      if (version < SERIALIZATION_VERSION) {
+        String keyClassName;
+        String valueClassName;
+        if (version < BLOCK_COMPRESS_VERSION) {
+          UTF8 className = new UTF8();
 
-        className.readFields(in);
-        keyClassName = className.toString(); // key class name
+          className.readFields(in);
+          keyClassName = className.toString(); // key class name
 
-        className.readFields(in);
-        valClassName = className.toString(); // val class name
+          className.readFields(in);
+          valueClassName = className.toString(); // val class name
+        } else {
+          keyClassName = Text.readString(in);
+          valueClassName = Text.readString(in);
+        }
+        try {
+          this.keySerialization = (Serialization<Object>)
+            factory.getSerializationByType(conf.getClassByName(keyClassName));
+        } catch (ClassNotFoundException cnf) {
+          throw new RuntimeException("key class " + keyClassName +
+                                     " not found");
+        }
+        try {
+          this.valueSerialization = (Serialization<Object>)
+            factory.getSerializationByType(conf.getClassByName(valueClassName));
+        } catch (ClassNotFoundException cnf) {
+          throw new RuntimeException("value class " + valueClassName +
+                                     " not found");
+        }
       } else {
-        keyClassName = Text.readString(in);
-        valClassName = Text.readString(in);
+        this.keySerialization = readSerialization(factory, keySerialization);
+        this.valueSerialization = readSerialization(factory,valueSerialization);
       }
 
       if (version > 2) {                          // if version > 2
@@ -1733,6 +1928,8 @@ public class SequenceFile {
       
       // Initialize... *not* if this we are constructing a temporary Reader
       if (!tempReader) {
+        keyBuffer = new DataOutputBuffer();
+        serialBuffer = new DataInputBuffer();
         valBuffer = new DataInputBuffer();
         if (decompress) {
           valDecompressor = CodecPool.getDecompressor(codec);
@@ -1744,7 +1941,7 @@ public class SequenceFile {
 
         if (blockCompressed) {
           keyLenBuffer = new DataInputBuffer();
-          keyBuffer = new DataInputBuffer();
+          keyBlockBuffer = new DataInputBuffer();
           valLenBuffer = new DataInputBuffer();
 
           keyLenDecompressor = CodecPool.getDecompressor(codec);
@@ -1753,7 +1950,8 @@ public class SequenceFile {
           keyLenIn = new DataInputStream(keyLenInFilter);
 
           keyDecompressor = CodecPool.getDecompressor(codec);
-          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
+          keyInFilter = codec.createInputStream(keyBlockBuffer, 
+                                                keyDecompressor);
           keyIn = new DataInputStream(keyInFilter);
 
           valLenDecompressor = CodecPool.getDecompressor(codec);
@@ -1761,27 +1959,10 @@ public class SequenceFile {
                                                    valLenDecompressor);
           valLenIn = new DataInputStream(valLenInFilter);
         }
-        
-        SerializationFactory serializationFactory =
-          new SerializationFactory(conf);
-        this.keyDeserializer =
-          getDeserializer(serializationFactory, getKeyClass());
-        if (!blockCompressed) {
-          this.keyDeserializer.open(valBuffer);
-        } else {
-          this.keyDeserializer.open(keyIn);
-        }
-        this.valDeserializer =
-          getDeserializer(serializationFactory, getValueClass());
-        this.valDeserializer.open(valIn);
+        valueBytes = (MutableValueBytes) createValueBytes();
       }
     }
     
-    @SuppressWarnings("unchecked")
-    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
-      return sf.getDeserializer(c);
-    }
-    
     /** Close the file. */
     public synchronized void close() throws IOException {
       // Return the decompressors to the pool
@@ -1792,49 +1973,80 @@ public class SequenceFile {
       keyLenDecompressor = keyDecompressor = null;
       valLenDecompressor = valDecompressor = null;
       
-      if (keyDeserializer != null) {
-    	keyDeserializer.close();
-      }
-      if (valDeserializer != null) {
-        valDeserializer.close();
-      }
-      
       // Close the input-stream
       in.close();
     }
 
-    /** Returns the name of the key class. */
+    /**
+     * Return the name of the key class. It only works for
+     * TypedSerializations and otherwise returns the name of Object.
+     * @return the key class name
+     * @deprecated Use {@link #getKeySerialization()} instead.
+     */
+    @Deprecated
     public String getKeyClassName() {
-      return keyClassName;
+      return getKeyClass().getName();
     }
 
-    /** Returns the class of keys in this file. */
+    /**
+     * Get the class of the keys in this file. It only works for
+     * TypedSerializations and otherwise returns Object.
+     * @return the class of the keys
+     * @deprecated Use {@link #getKeySerialization()} instead.
+     */
+    @Deprecated
+    @SuppressWarnings("unchecked")
     public synchronized Class<?> getKeyClass() {
-      if (null == keyClass) {
-        try {
-          keyClass = WritableName.getClass(getKeyClassName(), conf);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
+      Class result = null;
+      if (keySerialization instanceof TypedSerialization) {
+        TypedSerialization typed = (TypedSerialization) keySerialization;
+        result = typed.getSpecificType();
       }
-      return keyClass;
+      return result == null ? Object.class : result;
     }
 
-    /** Returns the name of the value class. */
+    /**
+     * Return the name of the value class. It only works for
+     * TypedSerializations and otherwise returns the name of Object.
+     * @return the value class name
+     * @deprecated Use {@link #getValueSerialization()} instead.
+     */
+    @Deprecated
     public String getValueClassName() {
-      return valClassName;
+      return getValueClass().getName();
     }
 
-    /** Returns the class of values in this file. */
+    /**
+     * Get the class of the values in this file. It only works for
+     * TypedSerializations and otherwise returns Object.
+     * @return the class of the values
+     * @deprecated Use {@link #getValueSerialization()} instead.
+     */
+    @Deprecated
+    @SuppressWarnings("unchecked")
     public synchronized Class<?> getValueClass() {
-      if (null == valClass) {
-        try {
-          valClass = WritableName.getClass(getValueClassName(), conf);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
+      Class result = null;
+      if (valueSerialization instanceof TypedSerialization) {
+        TypedSerialization typed = (TypedSerialization) valueSerialization;
+        result = typed.getSpecificType();
       }
-      return valClass;
+      return result == null ? Object.class : result;
+    }
+
+    /**
+     * Get the serialization for the key.
+     * @return the key serialization
+     */
+    public Serialization<?> getKeySerialization() {
+      return keySerialization;
+    }
+
+    /**
+     * Get the serialization for the value.
+     * @return the value serialization
+     */
+    public Serialization<?> getValueSerialization() {
+      return valueSerialization;
     }
 
     /** Returns true if values are compressed. */
@@ -1888,16 +2100,9 @@ public class SequenceFile {
     
     /** Read the next 'compressed' block */
     private synchronized void readBlock() throws IOException {
-      // Check if we need to throw away a whole block of 
-      // 'values' due to 'lazy decompression' 
-      if (lazyDecompress && !valuesDecompressed) {
-        in.seek(WritableUtils.readVInt(in)+in.getPos());
-        in.seek(WritableUtils.readVInt(in)+in.getPos());
-      }
       
       // Reset internal states
-      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
-      valuesDecompressed = false;
+      noBufferedRecords = 0;
 
       //Process sync
       if (sync != null) {
@@ -1913,55 +2118,11 @@ public class SequenceFile {
       
       // Read key lengths and keys
       readBuffer(keyLenBuffer, keyLenInFilter);
-      readBuffer(keyBuffer, keyInFilter);
-      noBufferedKeys = noBufferedRecords;
+      readBuffer(keyBlockBuffer, keyInFilter);
       
       // Read value lengths and values
-      if (!lazyDecompress) {
-        readBuffer(valLenBuffer, valLenInFilter);
-        readBuffer(valBuffer, valInFilter);
-        noBufferedValues = noBufferedRecords;
-        valuesDecompressed = true;
-      }
-    }
-
-    /** 
-     * Position valLenIn/valIn to the 'value' 
-     * corresponding to the 'current' key 
-     */
-    private synchronized void seekToCurrentValue() throws IOException {
-      if (!blockCompressed) {
-        if (decompress) {
-          valInFilter.resetState();
-        }
-        valBuffer.reset();
-      } else {
-        // Check if this is the first value in the 'block' to be read
-        if (lazyDecompress && !valuesDecompressed) {
-          // Read the value lengths and values
-          readBuffer(valLenBuffer, valLenInFilter);
-          readBuffer(valBuffer, valInFilter);
-          noBufferedValues = noBufferedRecords;
-          valuesDecompressed = true;
-        }
-        
-        // Calculate the no. of bytes to skip
-        // Note: 'current' key has already been read!
-        int skipValBytes = 0;
-        int currentKey = noBufferedKeys + 1;          
-        for (int i=noBufferedValues; i > currentKey; --i) {
-          skipValBytes += WritableUtils.readVInt(valLenIn);
-          --noBufferedValues;
-        }
-        
-        // Skip to the 'val' corresponding to 'current' key
-        if (skipValBytes > 0) {
-          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
-            throw new IOException("Failed to seek to " + currentKey + 
-                                  "(th) value!");
-          }
-        }
-      }
+      readBuffer(valLenBuffer, valLenInFilter);
+      readBuffer(valBuffer, valInFilter);
     }
 
     /**
@@ -1969,148 +2130,27 @@ public class SequenceFile {
      * @param val : The 'value' to be read.
      * @throws IOException
      */
-    public synchronized void getCurrentValue(Writable val) 
-      throws IOException {
-      if (val instanceof Configurable) {
-        ((Configurable) val).setConf(this.conf);
-      }
-
-      // Position stream to 'current' value
-      seekToCurrentValue();
-
-      if (!blockCompressed) {
-        val.readFields(valIn);
-        
-        if (valIn.read() > 0) {
-          LOG.info("available bytes: " + valIn.available());
-          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
-                                + " bytes, should read " +
-                                (valBuffer.getLength()-keyLength));
-        }
-      } else {
-        // Get the value
-        int valLength = WritableUtils.readVInt(valLenIn);
-        val.readFields(valIn);
-        
-        // Read another compressed 'value'
-        --noBufferedValues;
-        
-        // Sanity check
-        if ((valLength < 0) && LOG.isDebugEnabled()) {
-          LOG.debug(val + " is a zero-length value");
-        }
-      }
-
-    }
-    
-    /**
-     * Get the 'value' corresponding to the last read 'key'.
-     * @param val : The 'value' to be read.
-     * @throws IOException
-     */
-    public synchronized Object getCurrentValue(Object val) 
-      throws IOException {
-      if (val instanceof Configurable) {
-        ((Configurable) val).setConf(this.conf);
-      }
-
-      // Position stream to 'current' value
-      seekToCurrentValue();
-
-      if (!blockCompressed) {
-        val = deserializeValue(val);
-        
-        if (valIn.read() > 0) {
-          LOG.info("available bytes: " + valIn.available());
-          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
-                                + " bytes, should read " +
-                                (valBuffer.getLength()-keyLength));
-        }
-      } else {
-        // Get the value
-        int valLength = WritableUtils.readVInt(valLenIn);
-        val = deserializeValue(val);
-        
-        // Read another compressed 'value'
-        --noBufferedValues;
-        
-        // Sanity check
-        if ((valLength < 0) && LOG.isDebugEnabled()) {
-          LOG.debug(val + " is a zero-length value");
-        }
-      }
-      return val;
-
-    }
-
-    @SuppressWarnings("unchecked")
-    private Object deserializeValue(Object val) throws IOException {
-      return valDeserializer.deserialize(val);
-    }
-    
-    /** Read the next key in the file into <code>key</code>, skipping its
-     * value.  True if another entry exists, and false at end of file. */
-    public synchronized boolean next(Writable key) throws IOException {
-      if (key.getClass() != getKeyClass())
-        throw new IOException("wrong key class: "+key.getClass().getName()
-                              +" is not "+keyClass);
-
-      if (!blockCompressed) {
-        outBuf.reset();
-        
-        keyLength = next(outBuf);
-        if (keyLength < 0)
-          return false;
-        
-        valBuffer.reset(outBuf.getData(), outBuf.getLength());
-        
-        key.readFields(valBuffer);
-        valBuffer.mark(0);
-        if (valBuffer.getPosition() != keyLength)
-          throw new IOException(key + " read " + valBuffer.getPosition()
-                                + " bytes, should read " + keyLength);
-      } else {
-        //Reset syncSeen
-        syncSeen = false;
-        
-        if (noBufferedKeys == 0) {
-          try {
-            readBlock();
-          } catch (EOFException eof) {
-            return false;
-          }
-        }
-        
-        int keyLength = WritableUtils.readVInt(keyLenIn);
-        
-        // Sanity check
-        if (keyLength < 0) {
-          return false;
-        }
-        
-        //Read another compressed 'key'
-        key.readFields(keyIn);
-        --noBufferedKeys;
-      }
-
-      return true;
+    public synchronized Object getCurrentValue(Object val) throws IOException {
+      return valueSerialization.deserialize(readUncompressedBytes(valueBytes), 
+                                            val, conf);
     }
 
     /** Read the next key/value pair in the file into <code>key</code> and
      * <code>val</code>.  Returns true if such a pair exists and false when at
-     * end of file */
-    public synchronized boolean next(Writable key, Writable val)
-      throws IOException {
-      if (val.getClass() != getValueClass())
-        throw new IOException("wrong value class: "+val+" is not "+valClass);
+     * end of file
+     * @deprecated Use {@link #nextKey(Object)} and 
+     *     {@link #getCurrentValue(Object)} to iterate through keys and values.
+     */
+    @Deprecated
+    public synchronized boolean next(Writable key,
+                                     Writable val) throws IOException {
 
-      boolean more = next(key);
-      
-      if (more) {
+      if (nextKey(key) == null) {
+        return false;
+      } else {
         getCurrentValue(val);
+        return true;
       }
-
-      return more;
     }
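
With next(Writable, Writable) deprecated, iteration now pairs nextKey() with getCurrentValue(); a minimal read-loop sketch, assuming the file from the writer example above and reusable Text/LongWritable instances (Reader.file(Path) is assumed from the existing option set):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class ReadLoopSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        SequenceFile.Reader reader =
          new SequenceFile.Reader(conf,
            SequenceFile.Reader.file(new Path("/tmp/demo.seq")));
        try {
          Text key = new Text();
          LongWritable value = new LongWritable();
          Object k;
          // nextKey returns null at end of file; the passed object may be reused.
          while ((k = reader.nextKey(key)) != null) {
            Object v = reader.getCurrentValue(value);
            System.out.println(k + "\t" + v);
          }
        } finally {
          reader.close();
        }
      }
    }
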
     
     /**
@@ -2141,32 +2181,6 @@ public class SequenceFile {
       return length;
     }
     
-    /** Read the next key/value pair in the file into <code>buffer</code>.
-     * Returns the length of the key read, or -1 if at end of file.  The length
-     * of the value may be computed by calling buffer.getLength() before and
-     * after calls to this method. */
-    /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
-    @Deprecated
-    synchronized int next(DataOutputBuffer buffer) throws IOException {
-      // Unsupported for block-compressed sequence files
-      if (blockCompressed) {
-        throw new IOException("Unsupported call for block-compressed" +
-                              " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
-      }
-      try {
-        int length = readRecordLength();
-        if (length == -1) {
-          return -1;
-        }
-        int keyLength = in.readInt();
-        buffer.write(in, length);
-        return keyLength;
-      } catch (ChecksumException e) {             // checksum failure
-        handleChecksumException(e);
-        return next(buffer);
-      }
-    }
-
     public ValueBytes createValueBytes() {
       ValueBytes val = null;
       if (!decompress || blockCompressed) {
@@ -2178,14 +2192,15 @@ public class SequenceFile {
     }
 
     /**
-     * Read 'raw' records.
+     * Read 'raw' records. Doesn't reset the key buffer; the new key is
+     * appended to the buffer's current contents.
      * @param key - The buffer into which the key is read
-     * @param val - The 'raw' value
+     * @param value - The 'raw' value
      * @return Returns the total record length or -1 for end of file
      * @throws IOException
      */
-    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
-      throws IOException {
+    public synchronized int nextRaw(DataOutputBuffer key, 
+                                    ValueBytes value) throws IOException {
       if (!blockCompressed) {
         int length = readRecordLength();
         if (length == -1) {
@@ -2194,13 +2209,7 @@ public class SequenceFile {
         int keyLength = in.readInt();
         int valLength = length - keyLength;
         key.write(in, keyLength);
-        if (decompress) {
-          CompressedBytes value = (CompressedBytes)val;
-          value.reset(in, valLength);
-        } else {
-          UncompressedBytes value = (UncompressedBytes)val;
-          value.reset(in, valLength);
-        }
+        ((MutableValueBytes) value).reset(in, valLength);
         
         return length;
       } else {
@@ -2208,29 +2217,23 @@ public class SequenceFile {
         syncSeen = false;
         
         // Read 'key'
-        if (noBufferedKeys == 0) {
+        if (noBufferedRecords == 0) {
           if (in.getPos() >= end) 
             return -1;
 
-          try { 
-            readBlock();
-          } catch (EOFException eof) {
-            return -1;
-          }
+          readBlock();
         }
         int keyLength = WritableUtils.readVInt(keyLenIn);
         if (keyLength < 0) {
           throw new IOException("zero length key found!");
         }
         key.write(keyIn, keyLength);
-        --noBufferedKeys;
+        --noBufferedRecords;
         
         // Read raw 'value'
-        seekToCurrentValue();
         int valLength = WritableUtils.readVInt(valLenIn);
-        UncompressedBytes rawValue = (UncompressedBytes)val;
+        UncompressedBytes rawValue = (UncompressedBytes)value;
         rawValue.reset(valIn, valLength);
-        --noBufferedValues;
         
         return (keyLength+valLength);
       }
@@ -2243,95 +2246,47 @@ public class SequenceFile {
      * @return Returns the key length or -1 for end of file
      * @throws IOException
      */
-    public synchronized int nextRawKey(DataOutputBuffer key) 
-      throws IOException {
-      if (!blockCompressed) {
-        recordLength = readRecordLength();
-        if (recordLength == -1) {
-          return -1;
-        }
-        keyLength = in.readInt();
-        key.write(in, keyLength);
-        return keyLength;
-      } else {
-        //Reset syncSeen
-        syncSeen = false;
-        
-        // Read 'key'
-        if (noBufferedKeys == 0) {
-          if (in.getPos() >= end) 
-            return -1;
-
-          try { 
-            readBlock();
-          } catch (EOFException eof) {
-            return -1;
-          }
-        }
-        int keyLength = WritableUtils.readVInt(keyLenIn);
-        if (keyLength < 0) {
-          throw new IOException("zero length key found!");
-        }
-        key.write(keyIn, keyLength);
-        --noBufferedKeys;
-        
-        return keyLength;
-      }
-      
+    public synchronized int nextRawKey(DataOutputBuffer key) throws IOException{
+      key.reset();
+      int recordLength = nextRaw(key, valueBytes);
+      // the contract above promises the key length, not the record length
+      return recordLength < 0 ? -1 : key.getLength();
+    }
 
-    /** Read the next key in the file, skipping its
-     * value.  Return null at end of file. */
-    public synchronized Object next(Object key) throws IOException {
-      if (key != null && key.getClass() != getKeyClass()) {
-        throw new IOException("wrong key class: "+key.getClass().getName()
-                              +" is not "+keyClass);
-      }
-
-      if (!blockCompressed) {
-        outBuf.reset();
-        
-        keyLength = next(outBuf);
-        if (keyLength < 0)
-          return null;
-        
-        valBuffer.reset(outBuf.getData(), outBuf.getLength());
-        
-        key = deserializeKey(key);
-        valBuffer.mark(0);
-        if (valBuffer.getPosition() != keyLength)
-          throw new IOException(key + " read " + valBuffer.getPosition()
-                                + " bytes, should read " + keyLength);
-      } else {
-        //Reset syncSeen
-        syncSeen = false;
-        
-        if (noBufferedKeys == 0) {
-          try {
-            readBlock();
-          } catch (EOFException eof) {
-            return null;
-          }
-        }
-        
-        int keyLength = WritableUtils.readVInt(keyLenIn);
-        
-        // Sanity check
-        if (keyLength < 0) {
-          return null;
-        }
-        
-        //Read another compressed 'key'
-        key = deserializeKey(key);
-        --noBufferedKeys;
-      }
+    /**
+     * Read the next key in the file.
+     * The value is available via {@link #getCurrentValue}.
+     * @param key if not null, may be used to hold the next key
+     * @return true if a key was read, false if eof
+     * @throws IOException
+     * @deprecated Use {@link #nextKey} instead.
+     */
+    @Deprecated
+    public boolean next(Writable key) throws IOException {
+      return nextKey(key) != null;
+    }
 
-      return key;
+    /**
+     * Read the next key from the file.
+     * @param key if not null, may be used to hold the next key
+     * @return the key that was read
+     * @throws IOException
+     * @deprecated Use {@link #nextKey} instead.
+     */
+    @Deprecated
+    public Object next(Object key) throws IOException {
+      return nextKey(key);
     }
 
-    @SuppressWarnings("unchecked")
-    private Object deserializeKey(Object key) throws IOException {
-      return keyDeserializer.deserialize(key);
+    /** Read the next key in the file; returns null at end of file.
+     * The value is available via {@link #getCurrentValue}.
+     */
+    public synchronized Object nextKey(Object key) throws IOException {
+      keyBuffer.reset();
+      int recordLen = nextRaw(keyBuffer, valueBytes);
+      if (recordLen < 0) {
+        return null;
+      }
+      serialBuffer.reset(keyBuffer.getData(), keyBuffer.getLength());
+      return keySerialization.deserialize(serialBuffer, key, conf);
     }
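
A minimal sketch of how a caller might drive the key-first API above; the
getCurrentValue signature, imports, and the input path are assumptions that
are not shown in this hunk:

  Configuration conf = new Configuration();
  SequenceFile.Reader reader =
    new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path("data.seq")));
  try {
    Object key = null;
    while ((key = reader.nextKey(key)) != null) {     // null key means end of file
      Object value = reader.getCurrentValue(null);    // assumed companion accessor
      // process key and value here
    }
  } finally {
    reader.close();
  }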
 
     /**
@@ -2340,31 +2295,9 @@ public class SequenceFile {
      * @return Returns the value length
      * @throws IOException
      */
-    public synchronized int nextRawValue(ValueBytes val) 
-      throws IOException {
-      
-      // Position stream to current value
-      seekToCurrentValue();
- 
-      if (!blockCompressed) {
-        int valLength = recordLength - keyLength;
-        if (decompress) {
-          CompressedBytes value = (CompressedBytes)val;
-          value.reset(in, valLength);
-        } else {
-          UncompressedBytes value = (UncompressedBytes)val;
-          value.reset(in, valLength);
-        }
-         
-        return valLength;
-      } else {
-        int valLength = WritableUtils.readVInt(valLenIn);
-        UncompressedBytes rawValue = (UncompressedBytes)val;
-        rawValue.reset(valIn, valLength);
-        --noBufferedValues;
-        return valLength;
-      }
-      
+    public synchronized int nextRawValue(ValueBytes val) throws IOException {
+      ((MutableValueBytes) val).set(valueBytes);
+      return val.getSize();
     }
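
A sketch of how the raw record path might be driven from outside the Sorter,
reusing the buffers across records; the reader variable is illustrative and
createValueBytes() is assumed to be accessible to callers:

  DataOutputBuffer rawKey = new DataOutputBuffer();
  ValueBytes rawValue = reader.createValueBytes();
  while (reader.nextRawKey(rawKey) >= 0) {     // also buffers the matching value
    reader.nextRawValue(rawValue);             // copy out the buffered value bytes
    // rawKey now holds the serialized key; rawValue holds the raw value
  }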
 
     private void handleChecksumException(ChecksumException e)
@@ -2391,8 +2324,7 @@ public class SequenceFile {
     public synchronized void seek(long position) throws IOException {
       in.seek(position);
       if (blockCompressed) {                      // trigger block read
-        noBufferedKeys = 0;
-        valuesDecompressed = true;
+        noBufferedRecords = 0;
       }
     }
 
@@ -2447,58 +2379,172 @@ public class SequenceFile {
 
   }
 
-  /** Sorts key/value pairs in a sequence-format file.
+  /** Sorts key/value pairs in a sequence-format file. This class is no longer
+   * used by Hadoop and will be removed in a later release.
    *
-   * <p>For best performance, applications should make sure that the {@link
-   * Writable#readFields(DataInput)} implementation of their keys is
-   * very efficient.  In particular, it should avoid allocating memory.
+   * <p>For best performance, applications should make sure that the 
+   * {@link RawComparator} they use is efficient.
    */
+  @Deprecated
   public static class Sorter {
 
-    private RawComparator comparator;
+    private final RawComparator comparator;
+    private Writer.Option[] options;
+    private final Configuration conf;
+    private final FileContext fc;
+    private int memory; // bytes
+    private int factor; // merged per pass
+    private final Serialization<?> keySerialization;
+    private final Serialization<?> valueSerialization;
 
     private MergeSort mergeSort; //the implementation of merge sort
     
     private Path[] inFiles;                     // when merging or sorting
 
     private Path outFile;
+    
+    private CompressionType compressType;
+    private CompressionCodec compressCodec;
+    
+    /**
+     * Look at the first input file's header to figure out the compression for
+     * the output.
+     * @throws IOException
+     */
+    private void setCompressionType() throws IOException {
+      if (inFiles == null || inFiles.length == 0) {
+        return;
+      }
+      Reader reader = new Reader(conf, Reader.file(inFiles[0]),
+                                 new Reader.OnlyHeaderOption());
+      compressType = reader.getCompressionType();
+      compressCodec = reader.getCompressionCodec();
+      reader.close();
+    }
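
The same header-only probe can be used on its own to clone compression
settings from an existing file; a sketch, with the path name assumed:

  Reader header = new Reader(conf, Reader.file(new Path("input.seq")),
                             new Reader.OnlyHeaderOption());
  CompressionType type = header.getCompressionType();
  CompressionCodec codec = header.getCompressionCodec();
  header.close();
  // (type, codec) can then be handed to Writer.compression(type, codec)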
 
-    private int memory; // bytes
-    private int factor; // merged per pass
+    public static interface Option extends Writer.Option { }
+    
+    public static Option comparator(RawComparator value) {
+      return new ComparatorOption(value);
+    }
 
-    private FileSystem fs = null;
+    private static class ComparatorOption extends Options.ComparatorOption 
+                                          implements Option {
+      private ComparatorOption(RawComparator value) {
+        super(value);
+      }
+    }
 
-    private Class keyClass;
-    private Class valClass;
+    /**
+     * Create a Sorter.
+     * @param conf the configuration for the Sorter
+     * @param options the options controlling the sort, in particular the
+     *   comparator used to order the records and the options used to write
+     *   the output SequenceFiles. Because keys and values are copied as raw
+     *   bytes during the sort, the key and value serializations of the input
+     *   files must match the serializations chosen for the output.
+     */
+    public Sorter(Configuration conf, Writer.Option... options) {
+      this.options = options;
+      this.conf = conf;
+      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
+      this.factor = conf.getInt("io.sort.factor", 100);
+      try {
+        fc = FileContext.getFileContext(conf);
+      } catch (UnsupportedFileSystemException ex) {
+        throw new IllegalArgumentException("can't load default filesystem", ex);
+      }
+      ComparatorOption compareOpt = Options.getOption(ComparatorOption.class, 
+                                                      options);
+      keySerialization = getSerialization(Writer.KeySerialization.class,
+                                          Writer.KeyClassOption.class,
+                                          options);
+      valueSerialization = getSerialization(Writer.ValueSerialization.class,
+                                            Writer.ValueClassOption.class,
+                                            options);
+      if (compareOpt == null) {
+        comparator = keySerialization.getRawComparator();
+      } else {
+        comparator = compareOpt.getValue();
+      }
+    }
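
A sketch of building and running a Sorter through this option-based
constructor; the comparator, key/value classes, and paths are illustrative
and assume the configured serialization framework can handle them:

  SequenceFile.Sorter sorter =
    new SequenceFile.Sorter(conf,
                            SequenceFile.Sorter.comparator(myComparator),  // optional
                            SequenceFile.Writer.keyClass(Text.class),
                            SequenceFile.Writer.valueClass(LongWritable.class));
  sorter.sort(new Path[] { input0, input1 }, sortedOut, false);  // keep the inputs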
 
-    private Configuration conf;
-    private Metadata metadata;
-    
-    private Progressable progressable = null;
+    private 
+    Serialization<?> getSerialization(Class<? extends Writer.Option> serialOpt,
+                                      Class<? extends Writer.Option> classOpt,
+                                      Writer.Option[] options) {
+      Options.SerializationOption serialOption = (Options.SerializationOption)
+        Options.getOption(serialOpt, options);
+      if (serialOption != null) {
+        return serialOption.getValue();
+      } else {
+        Options.ClassOption classOption = (Options.ClassOption)
+          Options.getOption(classOpt, options);
+        if (classOption == null) {
+          throw new IllegalArgumentException("Must specify either a " +
+                                             "serialization or a class");
+        }
+        Class<?> cls = classOption.getValue();
+        return SerializationFactory.getInstance(conf).
+                 getSerializationByType(cls);
+      }      
+    }
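
The same lookup can be performed directly when only a type is known; a
sketch, with Text as an assumed key class:

  Serialization<?> keySerial = SerializationFactory.getInstance(conf)
                                   .getSerializationByType(Text.class);
  RawComparator defaultOrder = keySerial.getRawComparator();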
 
-    /** Sort and merge files containing the named classes. */
+    /**
+     * Check that the serialization of an input file matches the 
+     * serialization we are using for the output. If they do not match, the
+     * output would be corrupted, since keys and values are copied as raw bytes.
+     * @param reader the reader for the input file
+     * @param filename the filename of the file
+     * @throws IllegalArgumentException if the serialization is wrong
+     */
+    private void checkSerialization(Reader reader,
+                                    Path filename) {
+      if (!reader.getKeySerialization().equals(keySerialization)) {
+        throw new IllegalArgumentException("key serialization of " +
+                                           filename + 
+                                           " does not match output" +
+                                           " parameters");
+      }
+      if (!reader.getValueSerialization().equals(valueSerialization)) {
+        throw new IllegalArgumentException("value serialization of " +
+                                           filename + 
+                                           " does not match output" +
+                                           " parameters");
+      }
+    }
+
+    /** Sort and merge files containing the named classes. 
+     * @deprecated Use Sorter(Configuration, Option...) instead.
+     */
+    @SuppressWarnings("unchecked")
+    @Deprecated
     public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
                   Class valClass, Configuration conf)  {
-      this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
+      this(conf, Writer.keyClass(keyClass), Writer.valueClass(valClass));
     }
 
-    /** Sort and merge using an arbitrary {@link RawComparator}. */
+    /** Sort and merge using an arbitrary {@link RawComparator}. 
+     * @deprecated Use Sorter(Configuration, Option...) instead.
+     */
+    @SuppressWarnings("unchecked")
+    @Deprecated
     public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
                   Class valClass, Configuration conf) {
-      this(fs, comparator, keyClass, valClass, conf, new Metadata());
+      this(conf, comparator(comparator), Writer.keyClass(keyClass),
+           Writer.valueClass(valClass));
     }
 
-    /** Sort and merge using an arbitrary {@link RawComparator}. */
+    /** Sort and merge using an arbitrary {@link RawComparator}. 
+     * @deprecated Use Sorter(Configuration, Option...) instead.
+     */
+    @SuppressWarnings("unchecked")
+    @Deprecated
     public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
                   Class valClass, Configuration conf, Metadata metadata) {
-      this.fs = fs;
-      this.comparator = comparator;
-      this.keyClass = keyClass;
-      this.valClass = valClass;
-      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
-      this.factor = conf.getInt("io.sort.factor", 100);
-      this.conf = conf;
-      this.metadata = metadata;
+      this(conf, comparator(comparator), Writer.keyClass(keyClass),
+           Writer.valueClass(valClass), Writer.metadata(metadata));
     }
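
The deprecated constructors above map one-for-one onto the option form; for
example (comparator, classes, and metadata illustrative):

  // old:  new Sorter(fs, myComparator, Text.class, BytesWritable.class, conf, meta);
  // new:
  Sorter sorter = new Sorter(conf,
                             Sorter.comparator(myComparator),
                             Writer.keyClass(Text.class),
                             Writer.valueClass(BytesWritable.class),
                             Writer.metadata(meta));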
 
     /** Set the number of streams to merge at once.*/
@@ -2513,9 +2559,13 @@ public class SequenceFile {
     /** Get the total amount of buffer memory, in bytes.*/
     public int getMemory() { return memory; }
 
-    /** Set the progressable object in order to report progress. */
+    /** Set the progressable object in order to report progress. 
+     * @deprecated the progressable should be set when the Sorter is created.
+     */
+    @Deprecated
     public void setProgressable(Progressable progressable) {
-      this.progressable = progressable;
+      options = Options.prependOptions(options,
+                                       Writer.progressable(progressable));
     }
     
     /** 
@@ -2526,12 +2576,12 @@ public class SequenceFile {
      */
     public void sort(Path[] inFiles, Path outFile,
                      boolean deleteInput) throws IOException {
-      if (fs.exists(outFile)) {
+      if (fc.util().exists(outFile)) {
         throw new IOException("already exists: " + outFile);
       }
-
       this.inFiles = inFiles;
       this.outFile = outFile;
+      setCompressionType();
 
       int segments = sortPass(deleteInput);
       if (segments > 1) {
@@ -2549,10 +2599,12 @@ public class SequenceFile {
     public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
                                               boolean deleteInput) throws IOException {
       Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
-      if (fs.exists(outFile)) {
+      if (fc.util().exists(outFile)) {
         throw new IOException("already exists: " + outFile);
       }
       this.inFiles = inFiles;
+      setCompressionType();
+
       //outFile will basically be used as prefix for temp files in the cases
       //where sort outputs multiple sorted segments. For the single segment
       //case, the outputFile itself will contain the sorted data for that
@@ -2578,11 +2630,8 @@ public class SequenceFile {
     }
     
     private int sortPass(boolean deleteInput) throws IOException {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("running sort pass");
-      }
+      LOG.debug("running sort pass");
       SortPass sortPass = new SortPass();         // make the SortPass
-      sortPass.setProgressable(progressable);
       mergeSort = new MergeSort(sortPass.new SeqFileComparator());
       try {
         return sortPass.run(deleteInput);         // run it
@@ -2604,30 +2653,22 @@ public class SequenceFile {
       private int[] keyLengths = new int[keyOffsets.length];
       private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
       
-      private ArrayList segmentLengths = new ArrayList();
-      
       private Reader in = null;
       private FSDataOutputStream out = null;
       private FSDataOutputStream indexOut = null;
       private Path outName;
 
-      private Progressable progressable = null;
-
       public int run(boolean deleteInput) throws IOException {
         int segments = 0;
         int currentFile = 0;
         boolean atEof = (currentFile >= inFiles.length);
-        CompressionType compressionType;
-        CompressionCodec codec = null;
-        segmentLengths.clear();
         if (atEof) {
           return 0;
         }
         
         // Initialize
-        in = new Reader(fs, inFiles[currentFile], conf);
-        compressionType = in.getCompressionType();
-        codec = in.getCompressionCodec();
+        in = new Reader(conf, Reader.file(inFiles[currentFile]));
+        checkSerialization(in, inFiles[currentFile]);
         
         for (int i=0; i < rawValues.length; ++i) {
           rawValues[i] = null;
@@ -2642,21 +2683,24 @@ public class SequenceFile {
 
             // Read a record into buffer
             // Note: Attempt to re-use 'rawValue' as far as possible
-            int keyOffset = rawKeys.getLength();       
-            ValueBytes rawValue = 
-              (count == keyOffsets.length || rawValues[count] == null) ? 
-              in.createValueBytes() : 
-              rawValues[count];
+            int keyOffset = rawKeys.getLength();
+            ValueBytes rawValue;
+            if (count == keyOffsets.length || rawValues[count] == null) {
+              rawValue = in.createValueBytes();
+            } else {
+              rawValue = rawValues[count];
+            }
             int recordLength = in.nextRaw(rawKeys, rawValue);
             if (recordLength == -1) {
               in.close();
               if (deleteInput) {
-                fs.delete(inFiles[currentFile], true);
+                fc.delete(inFiles[currentFile], true);
               }
               currentFile += 1;
               atEof = currentFile >= inFiles.length;
               if (!atEof) {
-                in = new Reader(fs, inFiles[currentFile], conf);
+                in = new Reader(conf, Reader.file(inFiles[currentFile]));
+                checkSerialization(in, inFiles[currentFile]);
               } else {
                 in = null;
               }
@@ -2678,17 +2722,10 @@ public class SequenceFile {
           }
 
           // buffer is full -- sort & flush it
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("flushing segment " + segments);
-          }
+          LOG.debug("flushing segment " + segments);
           rawBuffer = rawKeys.getData();
           sort(count);
-          // indicate we're making progress
-          if (progressable != null) {
-            progressable.progress();
-          }
-          flush(count, bytesProcessed, compressionType, codec, 
-                segments==0 && atEof);
+          flush(count, bytesProcessed, segments==0 && atEof);
           segments++;
         }
         return segments;
@@ -2731,22 +2768,23 @@ public class SequenceFile {
       }
 
       private void flush(int count, int bytesProcessed, 
-                         CompressionType compressionType, 
-                         CompressionCodec codec, 
                          boolean done) throws IOException {
         if (out == null) {
           outName = done ? outFile : outFile.suffix(".0");
-          out = fs.create(outName);
+          out = fc.create(outName, EnumSet.of(CreateFlag.CREATE));
           if (!done) {
-            indexOut = fs.create(outName.suffix(".index"));
+            indexOut = fc.create(outName.suffix(".index"),
+                                 EnumSet.of(CreateFlag.CREATE));
           }
         }
 
         long segmentStart = out.getPos();
-        Writer writer = createWriter(conf, Writer.stream(out), 
-            Writer.keyClass(keyClass), Writer.valueClass(valClass),
-            Writer.compression(compressionType, codec),
-            Writer.metadata(done ? metadata : new Metadata()));
+        Writer writer = 
+          createWriter(conf, 
+                       Options.prependOptions(options, 
+                                              Writer.stream(out),
+                                              Writer.compression(compressType,
+                                                                 compressCodec)));
         
         if (!done) {
           writer.sync = null;                     // disable sync on temp files
@@ -2778,12 +2816,6 @@ public class SequenceFile {
         }
       }
       
-      /** set the progressable object in order to report progress */
-      public void setProgressable(Progressable progressable)
-      {
-        this.progressable = progressable;
-      }
-      
     } // SequenceFile.Sorter.SortPass
 
     /** The interface to iterate over raw keys/values of SequenceFiles. */
@@ -2824,7 +2856,7 @@ public class SequenceFile {
                                      Path tmpDir) 
       throws IOException {
       // pass in object to report progress, if present
-      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
+      MergeQueue mQueue = new MergeQueue(segments, tmpDir);
       return mQueue.merge();
     }
 
@@ -2863,13 +2895,13 @@ public class SequenceFile {
       ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
       for (int i = 0; i < inNames.length; i++) {
         SegmentDescriptor s = new SegmentDescriptor(0,
-            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
+            fc.getFileStatus(inNames[i]).getLen(), inNames[i]);
         s.preserveInput(!deleteInputs);
         s.doSync();
         a.add(s);
       }
       this.factor = factor;
-      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
+      MergeQueue mQueue = new MergeQueue(a, tmpDir);
       return mQueue.merge();
     }
 
@@ -2892,46 +2924,17 @@ public class SequenceFile {
       ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
       for (int i = 0; i < inNames.length; i++) {
         SegmentDescriptor s = new SegmentDescriptor(0,
-            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
+            fc.getFileStatus(inNames[i]).getLen(), inNames[i]);
         s.preserveInput(!deleteInputs);
         s.doSync();
         a.add(s);
       }
       factor = (inNames.length < factor) ? inNames.length : factor;
-      // pass in object to report progress, if present
-      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
+      MergeQueue mQueue = new MergeQueue(a, tempDir);
       return mQueue.merge();
     }
 
     /**
-     * Clones the attributes (like compression of the input file and creates a 
-     * corresponding Writer
-     * @param inputFile the path of the input file whose attributes should be 
-     * cloned
-     * @param outputFile the path of the output file 
-     * @param prog the Progressable to report status during the file write
-     * @return Writer
-     * @throws IOException
-     */
-    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
-                                      Progressable prog) throws IOException {
-      Reader reader = new Reader(conf,
-                                 Reader.file(inputFile),
-                                 new Reader.OnlyHeaderOption());
-      CompressionType compress = reader.getCompressionType();
-      CompressionCodec codec = reader.getCompressionCodec();
-      reader.close();
-
-      Writer writer = createWriter(conf, 
-                                   Writer.file(outputFile), 
-                                   Writer.keyClass(keyClass), 
-                                   Writer.valueClass(valClass), 
-                                   Writer.compression(compress, codec), 
-                                   Writer.progressable(prog));
-      return writer;
-    }
-
-    /**
      * Writes records from RawKeyValueIterator into a file represented by the 
      * passed writer
      * @param records the RawKeyValueIterator
@@ -2953,12 +2956,17 @@ public class SequenceFile {
      * @throws IOException
      */
     public void merge(Path[] inFiles, Path outFile) throws IOException {
-      if (fs.exists(outFile)) {
+      if (fc.util().exists(outFile)) {
         throw new IOException("already exists: " + outFile);
       }
+      this.inFiles = inFiles;
+      setCompressionType();
       RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
-      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
-      
+      Writer writer = 
+        createWriter(conf, Options.prependOptions
+                       (options, 
+                        Writer.file(outFile),
+                        Writer.compression(compressType, compressCodec)));
       writeFile(r, writer);
 
       writer.close();
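
A sketch of calling merge directly; the inputs must already be sorted and
must use the same key and value serializations as this Sorter (paths
illustrative):

  Path[] sortedParts = { new Path("part-0.sorted"), new Path("part-1.sorted") };
  sorter.merge(sortedParts, new Path("merged.seq"));
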
@@ -2966,11 +2974,11 @@ public class SequenceFile {
 
     /** sort calls this to generate the final merged output */
     private int mergePass(Path tmpDir) throws IOException {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("running merge pass");
-      }
-      Writer writer = cloneFileAttributes(
-                                          outFile.suffix(".0"), outFile, null);
+      LOG.debug("running merge pass");
+      Writer writer = 
+        createWriter(conf, Options.prependOptions
+                       (options, Writer.file(outFile),
+                        Writer.compression(compressType, compressCodec)));
       RawKeyValueIterator r = merge(outFile.suffix(".0"), 
                                     outFile.suffix(".0.index"), tmpDir);
       writeFile(r, writer);
@@ -2994,12 +3002,12 @@ public class SequenceFile {
       //the contained segments during the merge process & hence don't need 
       //them anymore
       SegmentContainer container = new SegmentContainer(inName, indexIn);
-      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
+      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir);
       return mQueue.merge();
     }
     
     /** This class implements the core of the merge logic */
-    private class MergeQueue extends PriorityQueue 
+    private class MergeQueue extends PriorityQueue<SegmentDescriptor>
       implements RawKeyValueIterator {
       private boolean compress;
       private boolean blockCompress;
@@ -3009,7 +3017,6 @@ public class SequenceFile {
       private float progPerByte;
       private Progress mergeProgress = new Progress();
       private Path tmpDir;
-      private Progressable progress = null; //handle to the progress reporting object
       private SegmentDescriptor minSegment;
       
       //a TreeMap used to store the segments sorted by size (segment offset and
@@ -3017,8 +3024,7 @@ public class SequenceFile {
       private Map<SegmentDescriptor, Void> sortedSegmentSizes =
         new TreeMap<SegmentDescriptor, Void>();
             
-      @SuppressWarnings("unchecked")
-      public void put(SegmentDescriptor stream) throws IOException {
+      public void addSegment(SegmentDescriptor stream) throws IOException {
         if (size() == 0) {
           compress = stream.in.isCompressed();
           blockCompress = stream.in.isBlockCompressed();
@@ -3026,29 +3032,23 @@ public class SequenceFile {
                    blockCompress != stream.in.isBlockCompressed()) {
           throw new IOException("All merged files must be compressed or not.");
         } 
-        super.put(stream);
+        put(stream);
       }
       
       /**
        * A queue of file segments to merge
        * @param segments the file segments to merge
        * @param tmpDir a relative local directory to save intermediate files in
-       * @param progress the reference to the Progressable object
        */
       public MergeQueue(List <SegmentDescriptor> segments,
-          Path tmpDir, Progressable progress) {
+                        Path tmpDir) {
         int size = segments.size();
         for (int i = 0; i < size; i++) {
           sortedSegmentSizes.put(segments.get(i), null);
         }
         this.tmpDir = tmpDir;
-        this.progress = progress;
       }
       protected boolean lessThan(Object a, Object b) {
-        // indicate we're making progress
-        if (progress != null) {
-          progress.progress();
-        }
         SegmentDescriptor msa = (SegmentDescriptor)a;
         SegmentDescriptor msb = (SegmentDescriptor)b;
         return comparator.compare(msa.getKey().getData(), 0, 
@@ -3167,7 +3167,7 @@ public class SequenceFile {
           //feed the streams to the priority queue
           initialize(segmentsToMerge.size()); clear();
           for (int i = 0; i < segmentsToMerge.size(); i++) {
-            put(segmentsToMerge.get(i));
+            addSegment(segmentsToMerge.get(i));
           }
           //if we have lesser number of segments remaining, then just return the
           //iterator, else do another single level merge
@@ -3198,12 +3198,14 @@ public class SequenceFile {
             Path outputFile =  lDirAlloc.getLocalPathForWrite(
                                                 tmpFilename.toString(),
                                                 approxOutputSize, conf);
-            if(LOG.isDebugEnabled()) { 
-              LOG.debug("writing intermediate results to " + outputFile);
-            }
-            Writer writer = cloneFileAttributes(
-                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
-                                                fs.makeQualified(outputFile), null);
+            LOG.debug("writing intermediate results to " + outputFile);
+            Writer writer = 
+              createWriter(conf, 
+                           Options.prependOptions
+                              (options, 
+                               Writer.file(outputFile),
+                               Writer.compression(compressType,
+                                                  compressCodec)));
             writer.sync = null; //disable sync for temp files
             writeFile(this, writer);
             writer.close();
@@ -3214,7 +3216,7 @@ public class SequenceFile {
             
             SegmentDescriptor tempSegment = 
               new SegmentDescriptor(0,
-                  fs.getFileStatus(outputFile).getLen(), outputFile);
+                  fc.getFileStatus(outputFile).getLen(), outputFile);
             //put the segment back in the TreeMap
             sortedSegmentSizes.put(tempSegment, null);
             numSegments = sortedSegmentSizes.size();
@@ -3244,7 +3246,8 @@ public class SequenceFile {
           numDescriptors = sortedSegmentSizes.size();
         SegmentDescriptor[] SegmentDescriptors = 
           new SegmentDescriptor[numDescriptors];
-        Iterator iter = sortedSegmentSizes.keySet().iterator();
+        Iterator<SegmentDescriptor> iter = 
+          sortedSegmentSizes.keySet().iterator();
         int i = 0;
         while (i < numDescriptors) {
           SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
@@ -3258,7 +3261,7 @@ public class SequenceFile {
      * provide a customized cleanup method implementation. In this 
      * implementation, cleanup closes the file handle and deletes the file 
      */
-    public class SegmentDescriptor implements Comparable {
+    public class SegmentDescriptor implements Comparable<SegmentDescriptor> {
       
       long segmentOffset; //the start of the segment in the file
       long segmentLength; //the length of the segment
@@ -3292,8 +3295,8 @@ public class SequenceFile {
         return preserveInput;
       }
       
-      public int compareTo(Object o) {
-        SegmentDescriptor that = (SegmentDescriptor)o;
+      @Override
+      public int compareTo(SegmentDescriptor that) {
         if (this.segmentLength != that.segmentLength) {
           return (this.segmentLength < that.segmentLength ? -1 : 1);
         }
@@ -3329,24 +3332,16 @@ public class SequenceFile {
       public boolean nextRawKey() throws IOException {
         if (in == null) {
           int bufferSize = getBufferSize(conf); 
-          if (fs.getUri().getScheme().startsWith("ramfs")) {
-            bufferSize = conf.getInt("io.bytes.per.checksum", 512);
-          }
           Reader reader = new Reader(conf,
                                      Reader.file(segmentPathName), 
                                      Reader.bufferSize(bufferSize),
                                      Reader.start(segmentOffset), 
                                      Reader.length(segmentLength));
+          checkSerialization(reader, segmentPathName);
         
           //sometimes we ignore syncs especially for temp merge files
           if (ignoreSync) reader.ignoreSync();
 
-          if (reader.getKeyClass() != keyClass)
-            throw new IOException("wrong key class: " + reader.getKeyClass() +
-                                  " is not " + keyClass);
-          if (reader.getValueClass() != valClass)
-            throw new IOException("wrong value class: "+reader.getValueClass()+
-                                  " is not " + valClass);
           this.in = reader;
           rawKey = new DataOutputBuffer();
         }
@@ -3384,7 +3379,7 @@ public class SequenceFile {
       public void cleanup() throws IOException {
         close();
         if (!preserveInput) {
-          fs.delete(segmentPathName, true);
+          fc.delete(segmentPathName, true);
         }
       }
     } // SequenceFile.Sorter.SegmentDescriptor
@@ -3439,8 +3434,8 @@ public class SequenceFile {
        * generates a single output file with an associated index file */
       public SegmentContainer(Path inName, Path indexIn) throws IOException {
         //get the segments from indexIn
-        FSDataInputStream fsIndexIn = fs.open(indexIn);
-        long end = fs.getFileStatus(indexIn).getLen();
+        FSDataInputStream fsIndexIn = fc.open(indexIn);
+        long end = fc.getFileStatus(indexIn).getLen();
         while (fsIndexIn.getPos() < end) {
           long segmentOffset = WritableUtils.readVLong(fsIndexIn);
           long segmentLength = WritableUtils.readVLong(fsIndexIn);
@@ -3449,7 +3444,7 @@ public class SequenceFile {
                                                     segmentLength, segmentName, this));
         }
         fsIndexIn.close();
-        fs.delete(indexIn, true);
+        fc.delete(indexIn, true);
         numSegmentsContained = segments.size();
         this.inName = inName;
       }
@@ -3460,7 +3455,7 @@ public class SequenceFile {
       public void cleanup() throws IOException {
         numSegmentsCleanedUp++;

[... 7 lines stripped ...]