You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/27 23:16:43 UTC

[11/22] incubator-geode git commit: GEODE-1072: Removing HDFS related code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java
deleted file mode 100644
index b13f499..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/org/apache/hadoop/io/SequenceFile.java
+++ /dev/null
@@ -1,3726 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io;
-
-import java.io.*;
-import java.util.*;
-import java.rmi.server.UID;
-import java.security.MessageDigest;
-import org.apache.commons.logging.*;
-import org.apache.hadoop.util.Options;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.Options.CreateOpts;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.hadoop.util.MergeSort;
-import org.apache.hadoop.util.PriorityQueue;
-import org.apache.hadoop.util.Time;
-// ** Pivotal Changes Begin
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.VersionMismatchException;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableName;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
-//** Pivotal Changes End
-
-/** 
- * <code>SequenceFile</code>s are flat files consisting of binary key/value 
- * pairs.
- * 
- * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
- * {@link Sorter} classes for writing, reading and sorting respectively.</p>
- * 
- * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
- * {@link CompressionType} used to compress key/value pairs:
- * <ol>
- *   <li>
- *   <code>Writer</code> : Uncompressed records.
- *   </li>
- *   <li>
- *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
- *                                       values.
- *   </li>
- *   <li>
- *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp; 
- *                                      values are collected in 'blocks' 
- *                                      separately and compressed. The size of 
- *                                      the 'block' is configurable.
- * </ol>
- * 
- * <p>The actual compression algorithm used to compress key and/or values can be
- * specified by using the appropriate {@link CompressionCodec}.</p>
- * 
- * <p>The recommended way is to use the static <tt>createWriter</tt> methods
- * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
- *
- * <p>The {@link Reader} acts as the bridge and can read any of the above 
- * <code>SequenceFile</code> formats.</p>
- *
- * <h4 id="Formats">SequenceFile Formats</h4>
- * 
- * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
- * depending on the <code>CompressionType</code> specified. All of them share a
- * <a href="#Header">common header</a> described below.
- * 
- * <h5 id="Header">SequenceFile Header</h5>
- * <ul>
- *   <li>
- *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
- *             version number (e.g. SEQ4 or SEQ6)
- *   </li>
- *   <li>
- *   keyClassName - key class
- *   </li>
- *   <li>
- *   valueClassName - value class
- *   </li>
- *   <li>
- *   compression - A boolean which specifies if compression is turned on for 
- *                 keys/values in this file.
- *   </li>
- *   <li>
- *   blockCompression - A boolean which specifies if block-compression is 
- *                      turned on for keys/values in this file.
- *   </li>
- *   <li>
- *   compression codec - <code>CompressionCodec</code> class which is used for  
- *                       compression of keys and/or values (if compression is 
- *                       enabled).
- *   </li>
- *   <li>
- *   metadata - {@link Metadata} for this file.
- *   </li>
- *   <li>
- *   sync - A sync marker to denote end of the header.
- *   </li>
- * </ul>
- * 
- * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
- * <ul>
- * <li>
- * <a href="#Header">Header</a>
- * </li>
- * <li>
- * Record
- *   <ul>
- *     <li>Record length</li>
- *     <li>Key length</li>
- *     <li>Key</li>
- *     <li>Value</li>
- *   </ul>
- * </li>
- * <li>
- * A sync-marker every few <code>100</code> bytes or so.
- * </li>
- * </ul>
- *
- * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
- * <ul>
- * <li>
- * <a href="#Header">Header</a>
- * </li>
- * <li>
- * Record
- *   <ul>
- *     <li>Record length</li>
- *     <li>Key length</li>
- *     <li>Key</li>
- *     <li><i>Compressed</i> Value</li>
- *   </ul>
- * </li>
- * <li>
- * A sync-marker every few <code>100</code> bytes or so.
- * </li>
- * </ul>
- * 
- * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
- * <ul>
- * <li>
- * <a href="#Header">Header</a>
- * </li>
- * <li>
- * Record <i>Block</i>
- *   <ul>
- *     <li>Uncompressed number of records in the block</li>
- *     <li>Compressed key-lengths block-size</li>
- *     <li>Compressed key-lengths block</li>
- *     <li>Compressed keys block-size</li>
- *     <li>Compressed keys block</li>
- *     <li>Compressed value-lengths block-size</li>
- *     <li>Compressed value-lengths block</li>
- *     <li>Compressed values block-size</li>
- *     <li>Compressed values block</li>
- *   </ul>
- * </li>
- * <li>
- * A sync-marker every block.
- * </li>
- * </ul>
- * 
- * <p>The compressed blocks of key lengths and value lengths consist of the 
- * actual lengths of individual keys/values encoded in ZeroCompressedInteger 
- * format.</p>
- * 
- * @see CompressionCodec
- */
-@InterfaceAudience.Public
-@InterfaceStability.Stable
-public class SequenceFile {
-  // Logger for this class.
-  private static final Log LOG = LogFactory.getLog(SequenceFile.class);
-
-  // Private constructor: the class cannot be instantiated from outside.
-  private SequenceFile() {}                         // no public ctor
-
-  // Version-byte constants for the file header.
-  // NOTE(review): judging by the names, 4 introduced block compression and 5
-  // custom codecs -- confirm against the reading code, which is not shown here.
-  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
-  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
-  private static final byte VERSION_WITH_METADATA = (byte)6;
-  // Header magic: the three bytes 'S', 'E', 'Q' followed by the version byte
-  // (VERSION_WITH_METADATA, i.e. 6).
-  private static byte[] VERSION = new byte[] {
-    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
-  };
-
-  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
-  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
-  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash (4 + 16 = 20)
-
-  /** The number of bytes between sync points (100 * SYNC_SIZE = 2000). */
-  public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
-
-  /**
-   * How the key/value pairs of a {@link SequenceFile} are compressed.
-   *
-   * @see SequenceFile.Writer
-   */
-  public enum CompressionType {
-    /** Records are stored uncompressed. */
-    NONE,
-    /** Only values are compressed, one record at a time. */
-    RECORD,
-    /** Runs of records are compressed together in blocks. */
-    BLOCK
-  }
-
-  /**
-   * Look up the default compression type for sequence files in a
-   * configuration.
-   *
-   * <p>The value of <code>io.seqfile.compression.type</code> is matched
-   * against the {@link CompressionType} constant names after trimming
-   * surrounding whitespace and ignoring case, so <code>block</code> and
-   * <code>BLOCK</code> are equivalent.</p>
-   *
-   * @param job the job config to look in
-   * @return the configured compression type, or
-   *         {@link CompressionType#RECORD} when the property is not set
-   * @throws IllegalArgumentException if the property is set to a value that
-   *         names no {@link CompressionType} constant
-   */
-  static public CompressionType getDefaultCompressionType(Configuration job) {
-    String name = job.get("io.seqfile.compression.type");
-    if (name == null) {
-      return CompressionType.RECORD;
-    }
-    // Enum.valueOf is case-sensitive; normalise the text so that lower-case
-    // settings such as "block" resolve instead of failing.
-    return CompressionType.valueOf(name.trim().toUpperCase(Locale.ROOT));
-  }
-  
-  /**
-   * Record <code>val</code> as the default sequence-file compression type by
-   * storing its string form under <code>io.seqfile.compression.type</code>.
-   * @param job the configuration to modify
-   * @param val the new compression type
-   */
-  static public void setDefaultCompressionType(Configuration job, 
-                                               CompressionType val) {
-    final String typeName = val.toString();
-    job.set("io.seqfile.compression.type", typeName);
-  }
-
-  /**
-   * Build a Writer from a set of options.
-   *
-   * <p>When no compression option is supplied, the default compression type
-   * is read from <code>conf</code> and prepended to the options. The concrete
-   * writer class is then picked from the compression type.</p>
-   *
-   * @param conf the configuration to use
-   * @param opts the options to create the file with
-   * @return a new Writer
-   * @throws IOException
-   */
-  public static Writer createWriter(Configuration conf, Writer.Option... opts
-                                    ) throws IOException {
-    final Writer.CompressionOption requested =
-      Options.getOption(Writer.CompressionOption.class, opts);
-    final CompressionType kind;
-    if (requested == null) {
-      // Nothing requested explicitly: fall back to the configured default
-      // and make it visible to the writer through the option list.
-      kind = getDefaultCompressionType(conf);
-      opts = Options.prependOptions(opts, Writer.compression(kind));
-    } else {
-      kind = requested.getValue();
-    }
-    switch (kind) {
-      case BLOCK:
-        return new BlockCompressWriter(conf, opts);
-      case RECORD:
-        return new RecordCompressWriter(conf, opts);
-      case NONE:
-      default:
-        return new Writer(conf, opts);
-    }
-  }
-
-  /**
-   * Create a Writer for a named file, given only the key and value classes.
-   * @param fs The configured filesystem.
-   * @param conf The configuration.
-   * @param name The name of the file.
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
-   *     instead.
-   */
-  @Deprecated
-  public static Writer createWriter(FileSystem fs, Configuration conf,
-                                    Path name, Class keyClass,
-                                    Class valClass) throws IOException {
-    final Writer.Option[] options = {
-      Writer.filesystem(fs),
-      Writer.file(name),
-      Writer.keyClass(keyClass),
-      Writer.valueClass(valClass)
-    };
-    return createWriter(conf, options);
-  }
-  
-  /**
-   * Create a Writer for a named file with an explicit compression type.
-   * @param fs The configured filesystem.
-   * @param conf The configuration.
-   * @param name The name of the file.
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @param compressionType The compression type.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
-   *     instead.
-   */
-  @Deprecated
-  public static Writer createWriter(FileSystem fs, Configuration conf,
-                                    Path name, Class keyClass, Class valClass,
-                                    CompressionType compressionType)
-      throws IOException {
-    final Writer.Option[] options = {
-      Writer.filesystem(fs),
-      Writer.file(name),
-      Writer.keyClass(keyClass),
-      Writer.valueClass(valClass),
-      Writer.compression(compressionType)
-    };
-    return createWriter(conf, options);
-  }
-  
-  /**
-   * Create a Writer for a named file with an explicit compression type and
-   * a progress callback.
-   * @param fs The configured filesystem.
-   * @param conf The configuration.
-   * @param name The name of the file.
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @param compressionType The compression type.
-   * @param progress The Progressable object to track progress.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
-   *     instead.
-   */
-  @Deprecated
-  public static Writer createWriter(FileSystem fs, Configuration conf,
-                                    Path name, Class keyClass, Class valClass,
-                                    CompressionType compressionType,
-                                    Progressable progress) throws IOException {
-    final Writer.Option[] options = {
-      Writer.file(name),
-      Writer.filesystem(fs),
-      Writer.keyClass(keyClass),
-      Writer.valueClass(valClass),
-      Writer.compression(compressionType),
-      Writer.progressable(progress)
-    };
-    return createWriter(conf, options);
-  }
-
-  /**
-   * Create a Writer for a named file with an explicit compression type and
-   * codec.
-   * @param fs The configured filesystem.
-   * @param conf The configuration.
-   * @param name The name of the file.
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @param compressionType The compression type.
-   * @param codec The compression codec.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
-   *     instead.
-   */
-  @Deprecated
-  public static Writer createWriter(FileSystem fs, Configuration conf,
-                                    Path name, Class keyClass, Class valClass,
-                                    CompressionType compressionType,
-                                    CompressionCodec codec) throws IOException {
-    final Writer.Option[] options = {
-      Writer.file(name),
-      Writer.filesystem(fs),
-      Writer.keyClass(keyClass),
-      Writer.valueClass(valClass),
-      Writer.compression(compressionType, codec)
-    };
-    return createWriter(conf, options);
-  }
-  
-  /**
-   * Create a Writer for a named file with explicit compression settings, a
-   * progress callback and file metadata.
-   * @param fs The configured filesystem.
-   * @param conf The configuration.
-   * @param name The name of the file.
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @param compressionType The compression type.
-   * @param codec The compression codec.
-   * @param progress The Progressable object to track progress.
-   * @param metadata The metadata of the file.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
-   *     instead.
-   */
-  @Deprecated
-  public static Writer createWriter(FileSystem fs, Configuration conf,
-                                    Path name, Class keyClass, Class valClass,
-                                    CompressionType compressionType,
-                                    CompressionCodec codec,
-                                    Progressable progress, Metadata metadata)
-      throws IOException {
-    final Writer.Option[] options = {
-      Writer.file(name),
-      Writer.filesystem(fs),
-      Writer.keyClass(keyClass),
-      Writer.valueClass(valClass),
-      Writer.compression(compressionType, codec),
-      Writer.progressable(progress),
-      Writer.metadata(metadata)
-    };
-    return createWriter(conf, options);
-  }
-
-  /**
-   * Create a Writer for a named file with explicit stream parameters
-   * (buffer size, replication, block size), compression settings, a progress
-   * callback and file metadata.
-   * @param fs The configured filesystem.
-   * @param conf The configuration.
-   * @param name The name of the file.
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @param bufferSize buffer size for the underlying output stream.
-   * @param replication replication factor for the file.
-   * @param blockSize block size for the file.
-   * @param compressionType The compression type.
-   * @param codec The compression codec.
-   * @param progress The Progressable object to track progress.
-   * @param metadata The metadata of the file.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
-   *     instead.
-   */
-  @Deprecated
-  public static Writer createWriter(FileSystem fs, Configuration conf,
-                                    Path name, Class keyClass, Class valClass,
-                                    int bufferSize, short replication,
-                                    long blockSize,
-                                    CompressionType compressionType,
-                                    CompressionCodec codec,
-                                    Progressable progress, Metadata metadata)
-      throws IOException {
-    final Writer.Option[] options = {
-      Writer.file(name),
-      Writer.filesystem(fs),
-      Writer.keyClass(keyClass),
-      Writer.valueClass(valClass),
-      Writer.bufferSize(bufferSize),
-      Writer.replication(replication),
-      Writer.blockSize(blockSize),
-      Writer.compression(compressionType, codec),
-      Writer.progressable(progress),
-      Writer.metadata(metadata)
-    };
-    return createWriter(conf, options);
-  }
-
-  /**
-   * Create a Writer for a named file, optionally creating missing parent
-   * directories.
-   *
-   * <p>The file is always opened with the <code>CREATE</code> and
-   * <code>OVERWRITE</code> flags through a {@link FileContext} obtained from
-   * the URI of <code>fs</code>.</p>
-   *
-   * @param fs The configured filesystem.
-   * @param conf The configuration.
-   * @param name The name of the file.
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @param bufferSize buffer size for the underlying output stream.
-   * @param replication replication factor for the file.
-   * @param blockSize block size for the file.
-   * @param createParent create parent directory if non-existent
-   * @param compressionType The compression type.
-   * @param codec The compression codec.
-   * @param metadata The metadata of the file.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   */
-  @Deprecated
-  public static Writer
-  createWriter(FileSystem fs, Configuration conf, Path name,
-               Class keyClass, Class valClass, int bufferSize,
-               short replication, long blockSize, boolean createParent,
-               CompressionType compressionType, CompressionCodec codec,
-               Metadata metadata) throws IOException {
-    final FileContext context = FileContext.getFileContext(fs.getUri(), conf);
-    final EnumSet<CreateFlag> flags =
-      EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-    return createWriter(
-        context, conf, name, keyClass, valClass, compressionType, codec,
-        metadata, flags,
-        CreateOpts.bufferSize(bufferSize),
-        createParent ? CreateOpts.createParent()
-                     : CreateOpts.donotCreateParent(),
-        CreateOpts.repFac(replication),
-        CreateOpts.blockSize(blockSize));
-  }
-
-  /**
-   * Create a Writer on a file opened through a {@link FileContext}.
-   *
-   * <p>The output stream is created here and handed to the writer, which is
-   * marked as owning it via <code>ownStream()</code>. If constructing the
-   * writer fails, the freshly created stream is closed before the failure
-   * propagates, so the open file is not leaked.</p>
-   *
-   * @param fc The context for the specified file.
-   * @param conf The configuration.
-   * @param name The name of the file.
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @param compressionType The compression type.
-   * @param codec The compression codec.
-   * @param metadata The metadata of the file.
-   * @param createFlag gives the semantics of create: overwrite, append etc.
-   * @param opts file creation options; see {@link CreateOpts}.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   */
-  public static Writer
-  createWriter(FileContext fc, Configuration conf, Path name,
-               Class keyClass, Class valClass,
-               CompressionType compressionType, CompressionCodec codec,
-               Metadata metadata,
-               final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
-               throws IOException {
-    final FSDataOutputStream out = fc.create(name, createFlag, opts);
-    boolean created = false;
-    try {
-      Writer writer = createWriter(conf, out, keyClass, valClass,
-                                   compressionType, codec, metadata).ownStream();
-      created = true;
-      return writer;
-    } finally {
-      if (!created) {
-        // No caller can reach the stream when construction fails, so release
-        // it here instead of leaking the open file.
-        IOUtils.closeStream(out);
-      }
-    }
-  }
-
-  /**
-   * Create a Writer for a named file with explicit compression settings and
-   * a progress callback.
-   * @param fs The configured filesystem.
-   * @param conf The configuration.
-   * @param name The name of the file.
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @param compressionType The compression type.
-   * @param codec The compression codec.
-   * @param progress The Progressable object to track progress.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
-   *     instead.
-   */
-  @Deprecated
-  public static Writer createWriter(FileSystem fs, Configuration conf,
-                                    Path name, Class keyClass, Class valClass,
-                                    CompressionType compressionType,
-                                    CompressionCodec codec,
-                                    Progressable progress) throws IOException {
-    final Writer.Option[] options = {
-      Writer.file(name),
-      Writer.filesystem(fs),
-      Writer.keyClass(keyClass),
-      Writer.valueClass(valClass),
-      Writer.compression(compressionType, codec),
-      Writer.progressable(progress)
-    };
-    return createWriter(conf, options);
-  }
-
-  /**
-   * Create a 'raw' Writer on top of an already opened stream, with file
-   * metadata.
-   * @param conf The configuration.
-   * @param out The stream on top which the writer is to be constructed.
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @param compressionType The compression type.
-   * @param codec The compression codec.
-   * @param metadata The metadata of the file.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
-   *     instead.
-   */
-  @Deprecated
-  public static Writer createWriter(Configuration conf, FSDataOutputStream out,
-                                    Class keyClass, Class valClass,
-                                    CompressionType compressionType,
-                                    CompressionCodec codec, Metadata metadata)
-      throws IOException {
-    final Writer.Option[] options = {
-      Writer.stream(out),
-      Writer.keyClass(keyClass),
-      Writer.valueClass(valClass),
-      Writer.compression(compressionType, codec),
-      Writer.metadata(metadata)
-    };
-    return createWriter(conf, options);
-  }
-  
-  /**
-   * Create a 'raw' Writer on top of an already opened stream.
-   * @param conf The configuration.
-   * @param out The stream on top which the writer is to be constructed.
-   * @param keyClass The 'key' type.
-   * @param valClass The 'value' type.
-   * @param compressionType The compression type.
-   * @param codec The compression codec.
-   * @return Returns the handle to the constructed SequenceFile Writer.
-   * @throws IOException
-   * @deprecated Use {@link #createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)}
-   *     instead.
-   */
-  @Deprecated
-  public static Writer createWriter(Configuration conf, FSDataOutputStream out,
-                                    Class keyClass, Class valClass,
-                                    CompressionType compressionType,
-                                    CompressionCodec codec) throws IOException {
-    final Writer.Option[] options = {
-      Writer.stream(out),
-      Writer.keyClass(keyClass),
-      Writer.valueClass(valClass),
-      Writer.compression(compressionType, codec)
-    };
-    return createWriter(conf, options);
-  }
-  
-
-  /** Access to the 'raw' value bytes of a SequenceFile record. */
-  public static interface ValueBytes {
-
-    /**
-     * Emit the value in uncompressed form.
-     * @param outStream stream that receives the uncompressed bytes
-     * @throws IOException
-     */
-    void writeUncompressedBytes(DataOutputStream outStream)
-      throws IOException;
-
-    /**
-     * Emit the value in compressed form. Bytes that are not already
-     * compressed are NOT compressed by this call.
-     * @param outStream stream that receives the compressed bytes
-     */
-    void writeCompressedBytes(DataOutputStream outStream)
-      throws IllegalArgumentException, IOException;
-
-    /**
-     * @return the size of the stored data
-     */
-    int getSize();
-  }
-  
-  /**
-   * {@link ValueBytes} holding an uncompressed value in a reusable byte
-   * array.
-   */
-  private static class UncompressedBytes implements ValueBytes {
-    private byte[] data = null;
-    private int dataSize = 0;
-
-    private UncompressedBytes() {
-    }
-
-    /**
-     * Load exactly <code>length</code> bytes from <code>in</code>, growing
-     * the backing array when it is too small.
-     */
-    private void reset(DataInputStream in, int length) throws IOException {
-      if (data == null) {
-        data = new byte[length];
-      } else if (data.length < length) {
-        // grow to at least double the old capacity
-        final int doubled = data.length * 2;
-        data = new byte[Math.max(length, doubled)];
-      }
-      // the size stays at -1 if the read below fails part-way
-      dataSize = -1;
-      in.readFully(data, 0, length);
-      dataSize = length;
-    }
-
-    /** @return number of valid bytes currently held */
-    @Override
-    public int getSize() {
-      return dataSize;
-    }
-
-    /** Copies the held bytes to <code>outStream</code> unchanged. */
-    @Override
-    public void writeUncompressedBytes(DataOutputStream outStream)
-      throws IOException {
-      outStream.write(data, 0, dataSize);
-    }
-
-    /** Always fails: this holder has no compressed representation. */
-    @Override
-    public void writeCompressedBytes(DataOutputStream outStream)
-      throws IllegalArgumentException, IOException {
-      final String message = "UncompressedBytes cannot be compressed!";
-      throw new IllegalArgumentException(message);
-    }
-
-  } // UncompressedBytes
-  
-  /**
-   * {@link ValueBytes} holding a compressed value in a reusable byte array,
-   * together with the codec used to decompress it on demand.
-   */
-  private static class CompressedBytes implements ValueBytes {
-    private int dataSize = 0;
-    private byte[] data = null;
-    DataInputBuffer rawData = null;
-    CompressionCodec codec = null;
-    CompressionInputStream decompressedStream = null;
-
-    private CompressedBytes(CompressionCodec codec) {
-      this.codec = codec;
-    }
-
-    /**
-     * Load exactly <code>length</code> bytes from <code>in</code>, growing
-     * the backing array when it is too small.
-     */
-    private void reset(DataInputStream in, int length) throws IOException {
-      if (data == null) {
-        data = new byte[length];
-      } else if (data.length < length) {
-        // grow to at least double the old capacity
-        final int doubled = data.length * 2;
-        data = new byte[Math.max(length, doubled)];
-      }
-      // the size stays at -1 if the read below fails part-way
-      dataSize = -1;
-      in.readFully(data, 0, length);
-      dataSize = length;
-    }
-
-    /** @return number of valid (compressed) bytes currently held */
-    @Override
-    public int getSize() {
-      return dataSize;
-    }
-
-    /**
-     * Decompresses the held bytes through the codec and copies the result to
-     * <code>outStream</code>. The decompression stream is created on first
-     * use and reset on later calls.
-     */
-    @Override
-    public void writeUncompressedBytes(DataOutputStream outStream)
-      throws IOException {
-      if (decompressedStream != null) {
-        decompressedStream.resetState();
-      } else {
-        rawData = new DataInputBuffer();
-        decompressedStream = codec.createInputStream(rawData);
-      }
-      rawData.reset(data, 0, dataSize);
-
-      final byte[] chunk = new byte[8192];
-      for (int n = decompressedStream.read(chunk, 0, 8192);
-           n != -1;
-           n = decompressedStream.read(chunk, 0, 8192)) {
-        outStream.write(chunk, 0, n);
-      }
-    }
-
-    /** Copies the held bytes to <code>outStream</code> as they are. */
-    @Override
-    public void writeCompressedBytes(DataOutputStream outStream)
-      throws IllegalArgumentException, IOException {
-      outStream.write(data, 0, dataSize);
-    }
-
-  } // CompressedBytes
-  
-  /**
-   * The class encapsulating with the metadata of a file.
-   * The metadata of a file is a list of attribute name/value
-   * pairs of Text type.
-   *
-   */
-  public static class Metadata implements Writable {
-
-    private TreeMap<Text, Text> theMetadata;
-    
-    public Metadata() {
-      this(new TreeMap<Text, Text>());
-    }
-    
-    public Metadata(TreeMap<Text, Text> arg) {
-      if (arg == null) {
-        this.theMetadata = new TreeMap<Text, Text>();
-      } else {
-        this.theMetadata = arg;
-      }
-    }
-    
-    public Text get(Text name) {
-      return this.theMetadata.get(name);
-    }
-    
-    public void set(Text name, Text value) {
-      this.theMetadata.put(name, value);
-    }
-    
-    public TreeMap<Text, Text> getMetadata() {
-      return new TreeMap<Text, Text>(this.theMetadata);
-    }
-    
-    @Override
-    public void write(DataOutput out) throws IOException {
-      out.writeInt(this.theMetadata.size());
-      Iterator<Map.Entry<Text, Text>> iter =
-        this.theMetadata.entrySet().iterator();
-      while (iter.hasNext()) {
-        Map.Entry<Text, Text> en = iter.next();
-        en.getKey().write(out);
-        en.getValue().write(out);
-      }
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      int sz = in.readInt();
-      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
-      this.theMetadata = new TreeMap<Text, Text>();
-      for (int i = 0; i < sz; i++) {
-        Text key = new Text();
-        Text val = new Text();
-        key.readFields(in);
-        val.readFields(in);
-        this.theMetadata.put(key, val);
-      }    
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == null) {
-        return false;
-      }
-      if (other.getClass() != this.getClass()) {
-        return false;
-      } else {
-        return equals((Metadata)other);
-      }
-    }
-    
-    public boolean equals(Metadata other) {
-      if (other == null) return false;
-      if (this.theMetadata.size() != other.theMetadata.size()) {
-        return false;
-      }
-      Iterator<Map.Entry<Text, Text>> iter1 =
-        this.theMetadata.entrySet().iterator();
-      Iterator<Map.Entry<Text, Text>> iter2 =
-        other.theMetadata.entrySet().iterator();
-      while (iter1.hasNext() && iter2.hasNext()) {
-        Map.Entry<Text, Text> en1 = iter1.next();
-        Map.Entry<Text, Text> en2 = iter2.next();
-        if (!en1.getKey().equals(en2.getKey())) {
-          return false;
-        }
-        if (!en1.getValue().equals(en2.getValue())) {
-          return false;
-        }
-      }
-      if (iter1.hasNext() || iter2.hasNext()) {
-        return false;
-      }
-      return true;
-    }
-
-    /**
-     * Returns a hash code derived from the stored entries, so that two
-     * instances considered equal by {@link #equals(Metadata)} hash alike.
-     *
-     * <p>The previous implementation tripped an assertion whenever it was
-     * called with assertions enabled and otherwise returned a constant,
-     * which made instances unusable in hash-based collections.
-     * NOTE(review): consistency with equals relies on the key/value type
-     * honoring the equals/hashCode contract - confirm for Text.
-     *
-     * @return the hash code of the backing map
-     */
-    @Override
-    public int hashCode() {
-      return this.theMetadata.hashCode();
-    }
-    
-    /**
-     * Renders a "size: N" line followed by one tab-indented
-     * key/value line per entry.
-     */
-    @Override
-    public String toString() {
-      StringBuilder text = new StringBuilder();
-      text.append("size: ").append(this.theMetadata.size()).append("\n");
-      for (Map.Entry<Text, Text> entry : this.theMetadata.entrySet()) {
-        text.append("\t").append(entry.getKey().toString());
-        text.append("\t").append(entry.getValue().toString());
-        text.append("\n");
-      }
-      return text.toString();
-    }
-  }
-  
-  /** Write key/value pairs to a sequence-format file. */
-  public static class Writer implements java.io.Closeable, Syncable {
-    private Configuration conf;
-    FSDataOutputStream out;
-    boolean ownOutputStream = true;
-    DataOutputBuffer buffer = new DataOutputBuffer();
-
-    Class keyClass;
-    Class valClass;
-
-    private final CompressionType compress;
-    CompressionCodec codec = null;
-    CompressionOutputStream deflateFilter = null;
-    DataOutputStream deflateOut = null;
-    Metadata metadata = null;
-    Compressor compressor = null;
-    
-    protected Serializer keySerializer;
-    protected Serializer uncompressedValSerializer;
-    protected Serializer compressedValSerializer;
-    
-    // Insert a globally unique 16-byte value every few entries, so that one
-    // can seek into the middle of a file and then synchronize with record
-    // starts and ends by scanning for this value.
-    long lastSyncPos;                     // position of last sync
-    byte[] sync;                          // 16 random bytes
-    {
-      try {                                       
-        MessageDigest digester = MessageDigest.getInstance("MD5");
-        long time = Time.now();
-        digester.update((new UID()+"@"+time).getBytes());
-        sync = digester.digest();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    /** Marker interface implemented by every option accepted when creating a Writer. */
-    public static interface Option {}
-    
-    /** Option carrying the path of the file to write. */
-    static class FileOption extends Options.PathOption 
-                                    implements Option {
-      FileOption(Path path) {
-        super(path);
-      }
-    }
-
-    /**
-     * Option carrying an explicit {@link FileSystem} on which to create the file.
-     *
-     * @deprecated only used for backwards-compatibility in the createWriter methods
-     * that take FileSystem.
-     */
-    @Deprecated
-    private static class FileSystemOption implements Option {
-      private final FileSystem value;
-      protected FileSystemOption(FileSystem value) {
-        this.value = value;
-      }
-      /** @return the file system supplied at construction */
-      public FileSystem getValue() {
-        return value;
-      }
-    }
-
-    /** Option carrying an already-open output stream to write to. */
-    static class StreamOption extends Options.FSDataOutputStreamOption 
-                              implements Option {
-      StreamOption(FSDataOutputStream stream) {
-        super(stream);
-      }
-    }
-
-    /** Option carrying the buffer size to use when the file is created. */
-    static class BufferSizeOption extends Options.IntegerOption
-                                  implements Option {
-      BufferSizeOption(int value) {
-        super(value);
-      }
-    }
-    
-    /** Option carrying the block size to use when the file is created. */
-    static class BlockSizeOption extends Options.LongOption implements Option {
-      BlockSizeOption(long value) {
-        super(value);
-      }
-    }
-
-    /**
-     * Option carrying the replication factor to use when the file is created.
-     * The value is held as an int and narrowed to short by the Writer constructor.
-     */
-    static class ReplicationOption extends Options.IntegerOption
-                                   implements Option {
-      ReplicationOption(int value) {
-        super(value);
-      }
-    }
-
-    /** Option carrying the class of the keys to be written. */
-    static class KeyClassOption extends Options.ClassOption implements Option {
-      KeyClassOption(Class<?> value) {
-        super(value);
-      }
-    }
-
-    /** Option carrying the class of the values to be written. */
-    static class ValueClassOption extends Options.ClassOption
-                                          implements Option {
-      ValueClassOption(Class<?> value) {
-        super(value);
-      }
-    }
-
-    /** Option carrying the {@link Metadata} to store in the file header. */
-    static class MetadataOption implements Option {
-      private final Metadata value;
-      MetadataOption(Metadata value) {
-        this.value = value;
-      }
-      /** @return the metadata supplied at construction */
-      Metadata getValue() {
-        return value;
-      }
-    }
-
-    /** Option carrying the progress reporter to use when the file is created. */
-    static class ProgressableOption extends Options.ProgressableOption
-                                    implements Option {
-      ProgressableOption(Progressable value) {
-        super(value);
-      }
-    }
-
-    /**
-     * Option carrying the compression type and the codec to use with it.
-     * When a type other than NONE is requested without a codec, a
-     * {@code DefaultCodec} is supplied.
-     */
-    private static class CompressionOption implements Option {
-      private final CompressionType value;
-      private final CompressionCodec codec;
-
-      CompressionOption(CompressionType value) {
-        this(value, null);
-      }
-
-      CompressionOption(CompressionType value, CompressionCodec codec) {
-        this.value = value;
-        if (codec == null && value != CompressionType.NONE) {
-          this.codec = new DefaultCodec();
-        } else {
-          this.codec = codec;
-        }
-      }
-
-      /** @return the requested compression type */
-      CompressionType getValue() {
-        return value;
-      }
-
-      /** @return the codec to use; may be {@code null} when the type is NONE */
-      CompressionCodec getCodec() {
-        return codec;
-      }
-    }
-    
-    /**
-     * Create an option naming the file to write.
-     * @param value the path of the file
-     * @return a new option
-     */
-    public static Option file(Path value) {
-      return new FileOption(value);
-    }
-
-    /**
-     * Create an option carrying an explicit file system.
-     * @param fs the file system to create the file on
-     * @return a new option
-     * @deprecated only used for backwards-compatibility in the createWriter methods
-     * that take FileSystem.
-     */
-    @Deprecated
-    private static Option filesystem(FileSystem fs) {
-      return new SequenceFile.Writer.FileSystemOption(fs);
-    }
-    
-    /**
-     * Create an option with the buffer size to use when creating the file.
-     * @param value the buffer size
-     * @return a new option
-     */
-    public static Option bufferSize(int value) {
-      return new BufferSizeOption(value);
-    }
-    
-    /**
-     * Create an option supplying an existing output stream to write to.
-     * @param value the stream to write to
-     * @return a new option
-     */
-    public static Option stream(FSDataOutputStream value) {
-      return new StreamOption(value);
-    }
-    
-    /**
-     * Create an option with the replication factor to use when creating the file.
-     * @param value the replication factor
-     * @return a new option
-     */
-    public static Option replication(short value) {
-      return new ReplicationOption(value);
-    }
-    
-    /**
-     * Create an option with the block size to use when creating the file.
-     * @param value the block size
-     * @return a new option
-     */
-    public static Option blockSize(long value) {
-      return new BlockSizeOption(value);
-    }
-    
-    /**
-     * Create an option with the progress reporter to use when creating the file.
-     * @param value the progress reporter
-     * @return a new option
-     */
-    public static Option progressable(Progressable value) {
-      return new ProgressableOption(value);
-    }
-
-    /**
-     * Create an option specifying the class of the keys.
-     * @param value the key class
-     * @return a new option
-     */
-    public static Option keyClass(Class<?> value) {
-      return new KeyClassOption(value);
-    }
-    
-    /**
-     * Create an option specifying the class of the values.
-     * @param value the value class
-     * @return a new option
-     */
-    public static Option valueClass(Class<?> value) {
-      return new ValueClassOption(value);
-    }
-    
-    /**
-     * Create an option supplying the metadata to store in the file header.
-     * @param value the metadata
-     * @return a new option
-     */
-    public static Option metadata(Metadata value) {
-      return new MetadataOption(value);
-    }
-
-    /**
-     * Create an option specifying the compression type; the option picks
-     * a default codec when the type is not NONE.
-     * @param value the compression type
-     * @return a new option
-     */
-    public static Option compression(CompressionType value) {
-      return new CompressionOption(value);
-    }
-
-    /**
-     * Create an option specifying the compression type and codec.
-     * @param value the compression type
-     * @param codec the codec to use; if {@code null} and the type is not
-     *        NONE, the option falls back to a default codec
-     * @return a new option
-     */
-    public static Option compression(CompressionType value,
-        CompressionCodec codec) {
-      return new CompressionOption(value, codec);
-    }
-    
-    /**
-     * Construct an uncompressed writer from a set of options.
-     *
-     * <p>Exactly one of the file and stream options must be supplied; the
-     * block-size, buffer-size, replication and progress options are only
-     * accepted together with a file. When no compression option is present
-     * the writer is set up with {@code CompressionType.NONE} and no codec
-     * instead of failing with a NullPointerException.
-     *
-     * @param conf the configuration to use
-     * @param opts the options used when creating the writer
-     * @throws IOException if it fails
-     * @throws IllegalArgumentException if the options are inconsistent, or
-     *         GzipCodec is requested while neither native code nor native
-     *         zlib is loaded
-     */
-    Writer(Configuration conf, 
-           Option... opts) throws IOException {
-      BlockSizeOption blockSizeOption = 
-        Options.getOption(BlockSizeOption.class, opts);
-      BufferSizeOption bufferSizeOption = 
-        Options.getOption(BufferSizeOption.class, opts);
-      ReplicationOption replicationOption = 
-        Options.getOption(ReplicationOption.class, opts);
-      ProgressableOption progressOption = 
-        Options.getOption(ProgressableOption.class, opts);
-      FileOption fileOption = Options.getOption(FileOption.class, opts);
-      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
-      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
-      KeyClassOption keyClassOption = 
-        Options.getOption(KeyClassOption.class, opts);
-      ValueClassOption valueClassOption = 
-        Options.getOption(ValueClassOption.class, opts);
-      MetadataOption metadataOption = 
-        Options.getOption(MetadataOption.class, opts);
-      CompressionOption compressionTypeOption =
-        Options.getOption(CompressionOption.class, opts);
-      // check consistency of options
-      if ((fileOption == null) == (streamOption == null)) {
-        throw new IllegalArgumentException("file or stream must be specified");
-      }
-      if (fileOption == null && (blockSizeOption != null ||
-                                 bufferSizeOption != null ||
-                                 replicationOption != null ||
-                                 progressOption != null)) {
-        throw new IllegalArgumentException("file modifier options not " +
-                                           "compatible with stream");
-      }
-
-      FSDataOutputStream out;
-      // The writer owns (and later closes) the stream only when it creates it.
-      boolean ownStream = fileOption != null;
-      if (ownStream) {
-        Path p = fileOption.getValue();
-        FileSystem fs;
-        if (fsOption != null) {
-          fs = fsOption.getValue();
-        } else {
-          fs = p.getFileSystem(conf);
-        }
-        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
-          bufferSizeOption.getValue();
-        short replication = replicationOption == null ? 
-          fs.getDefaultReplication(p) :
-          (short) replicationOption.getValue();
-        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
-          blockSizeOption.getValue();
-        Progressable progress = progressOption == null ? null :
-          progressOption.getValue();
-        out = fs.create(p, true, bufferSize, replication, blockSize, progress);
-      } else {
-        out = streamOption.getValue();
-      }
-      Class<?> keyClass = keyClassOption == null ?
-          Object.class : keyClassOption.getValue();
-      Class<?> valueClass = valueClassOption == null ?
-          Object.class : valueClassOption.getValue();
-      Metadata metadata = metadataOption == null ?
-          new Metadata() : metadataOption.getValue();
-      final CompressionCodec codec;
-      if (compressionTypeOption == null) {
-        // No compression option supplied: write uncompressed.
-        this.compress = CompressionType.NONE;
-        codec = null;
-      } else {
-        this.compress = compressionTypeOption.getValue();
-        codec = compressionTypeOption.getCodec();
-      }
-      if (codec != null &&
-          (codec instanceof GzipCodec) &&
-          !NativeCodeLoader.isNativeCodeLoaded() &&
-          !ZlibFactory.isNativeZlibLoaded(conf)) {
-        throw new IllegalArgumentException("SequenceFile doesn't work with " +
-                                           "GzipCodec without native-hadoop " +
-                                           "code!");
-      }
-      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
-    }
-
-    /** Create the named file, uncompressed and with empty metadata.
-     * @param fs the file system on which the file is created
-     * @param conf the configuration to use
-     * @param name the path of the file to create
-     * @param keyClass the class of the keys
-     * @param valClass the class of the values
-     * @throws IOException if the file cannot be created or initialized
-     * @deprecated Use 
-     *   {@link SequenceFile#createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} 
-     *   instead.
-     */
-    @Deprecated
-    public Writer(FileSystem fs, Configuration conf, Path name, 
-                  Class keyClass, Class valClass) throws IOException {
-      this.compress = CompressionType.NONE;
-      init(conf, fs.create(name), true, keyClass, valClass, null, 
-           new Metadata());
-    }
-    
-    /** Create the named file, uncompressed, with write-progress reporter.
-     * @param fs the file system on which the file is created
-     * @param conf the configuration to use
-     * @param name the path of the file to create
-     * @param keyClass the class of the keys
-     * @param valClass the class of the values
-     * @param progress the progress reporter passed to the file creation call
-     * @param metadata the metadata to write into the file header
-     * @throws IOException if the file cannot be created or initialized
-     * @deprecated Use 
-     *   {@link SequenceFile#createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} 
-     *   instead.
-     */
-    @Deprecated
-    public Writer(FileSystem fs, Configuration conf, Path name, 
-                  Class keyClass, Class valClass,
-                  Progressable progress, Metadata metadata) throws IOException {
-      this.compress = CompressionType.NONE;
-      init(conf, fs.create(name, progress), true, keyClass, valClass,
-           null, metadata);
-    }
-    
-    /** Create the named file, uncompressed, with write-progress reporter
-     * and explicit buffer size, replication and block size. An existing
-     * file is overwritten (the overwrite flag passed to create is true).
-     * @param fs the file system on which the file is created
-     * @param conf the configuration to use
-     * @param name the path of the file to create
-     * @param keyClass the class of the keys
-     * @param valClass the class of the values
-     * @param bufferSize the buffer size passed to the file creation call
-     * @param replication the replication factor passed to the file creation call
-     * @param blockSize the block size passed to the file creation call
-     * @param progress the progress reporter passed to the file creation call
-     * @param metadata the metadata to write into the file header
-     * @throws IOException if the file cannot be created or initialized
-     * @deprecated Use 
-     *   {@link SequenceFile#createWriter(Configuration, com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option...)} 
-     *   instead.
-     */
-    @Deprecated
-    public Writer(FileSystem fs, Configuration conf, Path name,
-                  Class keyClass, Class valClass,
-                  int bufferSize, short replication, long blockSize,
-                  Progressable progress, Metadata metadata) throws IOException {
-      this.compress = CompressionType.NONE;
-      init(conf,
-           fs.create(name, true, bufferSize, replication, blockSize, progress),
-           true, keyClass, valClass, null, metadata);
-    }
-
-    /** Returns true when the compression type is anything other than NONE. */
-    boolean isCompressed() { return compress != CompressionType.NONE; }
-    /** Returns true when the compression type is BLOCK. */
-    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
-    
-    /** Marks the output stream as owned by this writer (so close() closes it) and returns this writer. */
-    Writer ownStream() { this.ownOutputStream = true; return this;  }
-
-    /** Write and flush the file header. */
-    private void writeFileHeader() 
-      throws IOException {
-      out.write(VERSION);
-      Text.writeString(out, keyClass.getName());
-      Text.writeString(out, valClass.getName());
-      
-      out.writeBoolean(this.isCompressed());
-      out.writeBoolean(this.isBlockCompressed());
-      
-      if (this.isCompressed()) {
-        Text.writeString(out, (codec.getClass()).getName());
-      }
-      this.metadata.write(out);
-      out.write(sync);                       // write the sync bytes
-      out.flush();                           // flush header
-    }
-    
-    /**
-     * Initialize the writer: record the settings, open the key and value
-     * serializers on the record buffer, set up the compression pipeline
-     * when a codec is given, and write the file header.
-     *
-     * @param conf the configuration to use
-     * @param out the stream to write to
-     * @param ownStream whether close() should close {@code out}
-     * @param keyClass the class of the keys
-     * @param valClass the class of the values
-     * @param codec the compression codec, or {@code null} for none
-     * @param metadata the metadata to write into the header
-     * @throws IOException if no serializer is found for the key or value
-     *         class, or if writing the header fails
-     */
-    @SuppressWarnings("unchecked")
-    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
-              Class keyClass, Class valClass,
-              CompressionCodec codec, Metadata metadata) 
-      throws IOException {
-      this.conf = conf;
-      this.out = out;
-      this.ownOutputStream = ownStream;
-      this.keyClass = keyClass;
-      this.valClass = valClass;
-      this.codec = codec;
-      this.metadata = metadata;
-      SerializationFactory serializationFactory = new SerializationFactory(conf);
-      this.keySerializer = serializationFactory.getSerializer(keyClass);
-      if (this.keySerializer == null) {
-        throw new IOException(missingSerializerMessage("Key", keyClass));
-      }
-      this.keySerializer.open(buffer);
-      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
-      if (this.uncompressedValSerializer == null) {
-        throw new IOException(missingSerializerMessage("Value", valClass));
-      }
-      this.uncompressedValSerializer.open(buffer);
-      if (this.codec != null) {
-        ReflectionUtils.setConf(this.codec, this.conf);
-        this.compressor = CodecPool.getCompressor(this.codec);
-        // Compressed values go through deflateOut -> deflateFilter -> buffer.
-        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
-        this.deflateOut = 
-          new DataOutputStream(new BufferedOutputStream(deflateFilter));
-        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
-        if (this.compressedValSerializer == null) {
-          throw new IOException(missingSerializerMessage("Value", valClass));
-        }
-        this.compressedValSerializer.open(deflateOut);
-      }
-      writeFileHeader();
-    }
-
-    /**
-     * Builds the error message used when no serializer is found for a class.
-     * Unlike the earlier inline messages, a space separates "using" from
-     * "custom serialization".
-     */
-    private static String missingSerializerMessage(String role, Class<?> cls) {
-      return "Could not find a serializer for the " + role + " class: '"
-          + cls.getCanonicalName() + "'. "
-          + "Please ensure that the configuration '"
-          + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
-          + "properly configured, if you're using "
-          + "custom serialization.";
-    }
-    
-    /**
-     * Returns the class of keys in this file.
-     * @return the key class this writer was initialized with
-     */
-    public Class getKeyClass() { return keyClass; }
-
-    /**
-     * Returns the class of values in this file.
-     * @return the value class this writer was initialized with
-     */
-    public Class getValueClass() { return valClass; }
-
-    /**
-     * Returns the compression codec of data in this file.
-     * @return the codec this writer was initialized with; {@code null}
-     *         when none was given
-     */
-    public CompressionCodec getCompressionCodec() { return codec; }
-    
-    /**
-     * Create a sync point: write the sync escape followed by the sync
-     * marker, unless one was already written at the current position.
-     */
-    public void sync() throws IOException {
-      if (sync == null || lastSyncPos == out.getPos()) {
-        return;
-      }
-      out.writeInt(SYNC_ESCAPE);    // announces that a sync marker follows
-      out.write(sync);              // the marker itself
-      lastSyncPos = out.getPos();   // remember where the marker ended
-    }
-
-    /**
-     * Flush all currently written data to the file system. Does nothing
-     * once the writer has been closed.
-     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
-     */
-    @Deprecated
-    public void syncFs() throws IOException {
-      if (out == null) {
-        return;
-      }
-      out.sync();   // push contents to the file system
-    }
-
-    /** Delegates to the output stream's hsync(); a no-op once the stream is gone. */
-    @Override
-    public void hsync() throws IOException {
-      if (out == null) {
-        return;
-      }
-      out.hsync();
-    }
-    // Pivotal changes begin
-    public void hsyncWithSizeUpdate() throws IOException {
-      if (out != null) {
-        if (out instanceof HdfsDataOutputStream) {
-          try {
-            ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-          } catch (NoSuchMethodError e){
-            // We are probably working with an older version of hadoop jars which does not have the 
-            // hsync function with SyncFlag. Use the hsync version that does not update the size. 
-            out.hsync();
-          }
-        }
-        else {
-            out.hsync();
-        }
-      }
-    }
-    // Pivotal changes end
-    /** Delegates to the output stream's hflush(); a no-op once the stream is gone. */
-    @Override
-    public void hflush() throws IOException {
-      if (out == null) {
-        return;
-      }
-      out.hflush();
-    }
-    
-    /**
-     * Returns the configuration of this file.
-     * @return the configuration this writer was initialized with
-     */
-    Configuration getConf() { return conf; }
-    
-    /**
-     * Close the file: close the serializers, return the compressor to the
-     * pool, then close the output stream if this writer owns it (otherwise
-     * just flush it).
-     *
-     * <p>The compressor is returned and the stream is released even when
-     * closing a serializer throws, so a failing serializer no longer leaks
-     * either resource.
-     *
-     * @throws IOException if closing a serializer or the stream fails
-     */
-    @Override
-    public synchronized void close() throws IOException {
-      try {
-        keySerializer.close();
-        uncompressedValSerializer.close();
-        if (compressedValSerializer != null) {
-          compressedValSerializer.close();
-        }
-      } finally {
-        CodecPool.returnCompressor(compressor);
-        compressor = null;
-
-        if (out != null) {
-          // Close the underlying stream iff we own it...
-          if (ownOutputStream) {
-            out.close();
-          } else {
-            out.flush();
-          }
-          out = null;
-        }
-      }
-    }
-
-    synchronized void checkAndWriteSync() throws IOException {
-      if (sync != null &&
-          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
-        sync();
-      }
-    }
-
-    /**
-     * Append a key/value pair. Delegates to {@link #append(Object, Object)}.
-     * @param key the key to append
-     * @param val the value to append
-     * @throws IOException if the pair cannot be written
-     */
-    public void append(Writable key, Writable val)
-      throws IOException {
-      append((Object) key, (Object) val);
-    }
-
-    /**
-     * Append a key/value pair. The key and value are serialized into the
-     * record buffer (the value through the compressor when record
-     * compression is on) and written as: total length, key length, data.
-     *
-     * @param key the key; its class must be exactly the writer's key class
-     * @param val the value; its class must be exactly the writer's value class
-     * @throws IOException on a class mismatch or write failure
-     */
-    @SuppressWarnings("unchecked")
-    public synchronized void append(Object key, Object val)
-      throws IOException {
-      final Class<?> actualKeyClass = key.getClass();
-      if (actualKeyClass != keyClass) {
-        throw new IOException("wrong key class: " + actualKeyClass.getName()
-                              + " is not " + keyClass);
-      }
-      final Class<?> actualValClass = val.getClass();
-      if (actualValClass != valClass) {
-        throw new IOException("wrong value class: " + actualValClass.getName()
-                              + " is not " + valClass);
-      }
-
-      buffer.reset();
-
-      // Key goes first; its length is the buffer length at this point.
-      keySerializer.serialize(key);
-      final int keyLength = buffer.getLength();
-      if (keyLength < 0) {
-        throw new IOException("negative length keys not allowed: " + key);
-      }
-
-      // Value follows, compressed only for record compression.
-      if (compress != CompressionType.RECORD) {
-        uncompressedValSerializer.serialize(val);
-      } else {
-        deflateFilter.resetState();
-        compressedValSerializer.serialize(val);
-        deflateOut.flush();
-        deflateFilter.finish();
-      }
-
-      // Emit the record, preceded by a sync marker when one is due.
-      checkAndWriteSync();
-      final int recordLength = buffer.getLength();
-      out.writeInt(recordLength);                     // total record length
-      out.writeInt(keyLength);                        // key portion length
-      out.write(buffer.getData(), 0, recordLength);   // data
-    }
-
-    /**
-     * Append an already-serialized key and a raw value, writing the value's
-     * uncompressed bytes.
-     *
-     * @param keyData buffer holding the serialized key
-     * @param keyOffset offset of the key within {@code keyData}
-     * @param keyLength length of the key; must not be negative
-     * @param val the raw value
-     * @throws IOException if the key length is negative or writing fails
-     */
-    public synchronized void appendRaw(byte[] keyData, int keyOffset,
-        int keyLength, ValueBytes val) throws IOException {
-      if (keyLength < 0) {
-        throw new IOException("negative length keys not allowed: " + keyLength);
-      }
-
-      final int valLength = val.getSize();
-
-      checkAndWriteSync();
-
-      final int recordLength = keyLength + valLength;
-      out.writeInt(recordLength);                 // total record length
-      out.writeInt(keyLength);                    // key portion length
-      out.write(keyData, keyOffset, keyLength);   // key
-      val.writeUncompressedBytes(out);            // value
-    }
-
-    /** Returns the current length of the output file.
-     *
-     * <p>This always returns a synchronized position.  In other words,
-     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
-     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
-     * the key may be earlier in the file than key last written when this
-     * method was called (e.g., with block-compression, it may be the first key
-     * in the block that was being written when this method was called).
-     *
-     * @return the current position of the output stream
-     * @throws IOException if the position cannot be obtained
-     */
-    public synchronized long getLength() throws IOException {
-      return out.getPos();
-    }
-
-  } // class Writer
-
-  /**
-   * Write key/compressed-value pairs to a sequence-format file. Keys are
-   * stored as serialized; each value is compressed individually.
-   */
-  static class RecordCompressWriter extends Writer {
-
-    RecordCompressWriter(Configuration conf,
-                         Option... options) throws IOException {
-      super(conf, options);
-    }
-
-    /**
-     * Append a key/value pair, compressing the value.
-     *
-     * @throws IOException on a key/value class mismatch or write failure
-     */
-    @Override
-    @SuppressWarnings("unchecked")
-    public synchronized void append(Object key, Object val)
-      throws IOException {
-      final Class<?> actualKeyClass = key.getClass();
-      if (actualKeyClass != keyClass) {
-        throw new IOException("wrong key class: " + actualKeyClass.getName()
-                              + " is not " + keyClass);
-      }
-      final Class<?> actualValClass = val.getClass();
-      if (actualValClass != valClass) {
-        throw new IOException("wrong value class: " + actualValClass.getName()
-                              + " is not " + valClass);
-      }
-
-      buffer.reset();
-
-      // Key goes in uncompressed; remember how much of the buffer it takes.
-      keySerializer.serialize(key);
-      final int keyLength = buffer.getLength();
-      if (keyLength < 0) {
-        throw new IOException("negative length keys not allowed: " + key);
-      }
-
-      // Value is run through the compressor into the same buffer.
-      deflateFilter.resetState();
-      compressedValSerializer.serialize(val);
-      deflateOut.flush();
-      deflateFilter.finish();
-
-      // Emit the record, preceded by a sync marker when one is due.
-      checkAndWriteSync();
-      final int recordLength = buffer.getLength();
-      out.writeInt(recordLength);                     // total record length
-      out.writeInt(keyLength);                        // key portion length
-      out.write(buffer.getData(), 0, recordLength);   // data
-    }
-
-    /**
-     * Append an already-serialized key and a raw value, writing the value's
-     * compressed bytes.
-     *
-     * @throws IOException if the key length is negative or writing fails
-     */
-    @Override
-    public synchronized void appendRaw(byte[] keyData, int keyOffset,
-        int keyLength, ValueBytes val) throws IOException {
-      if (keyLength < 0) {
-        throw new IOException("negative length keys not allowed: " + keyLength);
-      }
-
-      final int valLength = val.getSize();
-
-      checkAndWriteSync();                        // sync
-      final int recordLength = keyLength + valLength;
-      out.writeInt(recordLength);                 // total record length
-      out.writeInt(keyLength);                    // key portion length
-      out.write(keyData, keyOffset, keyLength);   // 'key' data
-      val.writeCompressedBytes(out);              // 'value' data
-    }
-
-  } // RecordCompressionWriter
-
-  /** Write compressed key/value blocks to a sequence-format file. */
-  static class BlockCompressWriter extends Writer {
-    
-    private int noBufferedRecords = 0;
-    
-    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
-    private DataOutputBuffer keyBuffer = new DataOutputBuffer();
-
-    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
-    private DataOutputBuffer valBuffer = new DataOutputBuffer();
-
-    private final int compressionBlockSize;
-    
-    BlockCompressWriter(Configuration conf,
-                        Option... options) throws IOException {
-      super(conf, options);
-      compressionBlockSize = 
-        conf.getInt("io.seqfile.compress.blocksize", 1000000);
-      keySerializer.close();
-      keySerializer.open(keyBuffer);
-      uncompressedValSerializer.close();
-      uncompressedValSerializer.open(valBuffer);
-    }
-
-    /** Workhorse to check and write out compressed data/lengths */
-    private synchronized 
-      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
-      throws IOException {
-      deflateFilter.resetState();
-      buffer.reset();
-      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
-                       uncompressedDataBuffer.getLength());
-      deflateOut.flush();
-      deflateFilter.finish();
-      
-      WritableUtils.writeVInt(out, buffer.getLength());
-      out.write(buffer.getData(), 0, buffer.getLength());
-    }
-    
-    /** Compress and flush contents to dfs */
-    @Override
-    public synchronized void sync() throws IOException {
-      if (noBufferedRecords > 0) {
-        super.sync();
-        
-        // No. of records
-        WritableUtils.writeVInt(out, noBufferedRecords);
-        
-        // Write 'keys' and lengths
-        writeBuffer(keyLenBuffer);
-        writeBuffer(keyBuffer);
-        
-        // Write 'values' and lengths
-        writeBuffer(valLenBuffer);
-        writeBuffer(valBuffer);
-        
-        // Flush the file-stream
-        out.flush();
-        
-        // Reset internal states
-        keyLenBuffer.reset();
-        keyBuffer.reset();
-        valLenBuffer.reset();
-        valBuffer.reset();
-        noBufferedRecords = 0;
-      }
-      
-    }
-    
-    /** Close the file. */
-    @Override
-    public synchronized void close() throws IOException {
-      if (out != null) {
-        sync();
-      }
-      super.close();
-    }
-
-    /** Append a key/value pair. */
-    @Override
-    @SuppressWarnings("unchecked")
-    public synchronized void append(Object key, Object val)
-      throws IOException {
-      if (key.getClass() != keyClass)
-        throw new IOException("wrong key class: "+key+" is not "+keyClass);
-      if (val.getClass() != valClass)
-        throw new IOException("wrong value class: "+val+" is not "+valClass);
-
-      // Save key/value into respective buffers 
-      int oldKeyLength = keyBuffer.getLength();
-      keySerializer.serialize(key);
-      int keyLength = keyBuffer.getLength() - oldKeyLength;
-      if (keyLength < 0)
-        throw new IOException("negative length keys not allowed: " + key);
-      WritableUtils.writeVInt(keyLenBuffer, keyLength);
-
-      int oldValLength = valBuffer.getLength();
-      uncompressedValSerializer.serialize(val);
-      int valLength = valBuffer.getLength() - oldValLength;
-      WritableUtils.writeVInt(valLenBuffer, valLength);
-      
-      // Added another key/value pair
-      ++noBufferedRecords;
-      
-      // Compress and flush?
-      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
-      if (currentBlockSize >= compressionBlockSize) {
-        sync();
-      }
-    }
-    
-    /** Append a key/value pair. */
-    @Override
-    public synchronized void appendRaw(byte[] keyData, int keyOffset,
-        int keyLength, ValueBytes val) throws IOException {
-      
-      if (keyLength < 0)
-        throw new IOException("negative length keys not allowed");
-
-      int valLength = val.getSize();
-      
-      // Save key/value data in relevant buffers
-      WritableUtils.writeVInt(keyLenBuffer, keyLength);
-      keyBuffer.write(keyData, keyOffset, keyLength);
-      WritableUtils.writeVInt(valLenBuffer, valLength);
-      val.writeUncompressedBytes(valBuffer);
-
-      // Added another key/value pair
-      ++noBufferedRecords;
-
-      // Compress and flush?
-      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
-      if (currentBlockSize >= compressionBlockSize) {
-        sync();
-      }
-    }
-  
-  } // BlockCompressionWriter
-
-  /**
-   * Get the configured buffer size.
-   * @param conf the configuration to read
-   * @return the value of "io.file.buffer.size", or 4096 when it is not set
-   */
-  private static int getBufferSize(Configuration conf) {
-    return conf.getInt("io.file.buffer.size", 4096);
-  }
-
-  /** Reads key/value pairs from a sequence-format file. */
-  public static class Reader implements java.io.Closeable {
-    private String filename;
-    private FSDataInputStream in;
-    private DataOutputBuffer outBuf = new DataOutputBuffer();
-
-    private byte version;
-
-    private String keyClassName;
-    private String valClassName;
-    private Class keyClass;
-    private Class valClass;
-
-    private CompressionCodec codec = null;
-    private Metadata metadata = null;
-    
-    private byte[] sync = new byte[SYNC_HASH_SIZE];
-    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
-    private boolean syncSeen;
-
-    private long headerEnd;
-    private long end;
-    private int keyLength;
-    private int recordLength;
-
-    private boolean decompress;
-    private boolean blockCompressed;
-    
-    private Configuration conf;
-
-    private int noBufferedRecords = 0;
-    private boolean lazyDecompress = true;
-    private boolean valuesDecompressed = true;
-    
-    private int noBufferedKeys = 0;
-    private int noBufferedValues = 0;
-    
-    private DataInputBuffer keyLenBuffer = null;
-    private CompressionInputStream keyLenInFilter = null;
-    private DataInputStream keyLenIn = null;
-    private Decompressor keyLenDecompressor = null;
-    private DataInputBuffer keyBuffer = null;
-    private CompressionInputStream keyInFilter = null;
-    private DataInputStream keyIn = null;
-    private Decompressor keyDecompressor = null;
-
-    private DataInputBuffer valLenBuffer = null;
-    private CompressionInputStream valLenInFilter = null;
-    private DataInputStream valLenIn = null;
-    private Decompressor valLenDecompressor = null;
-    private DataInputBuffer valBuffer = null;
-    private CompressionInputStream valInFilter = null;
-    private DataInputStream valIn = null;
-    private Decompressor valDecompressor = null;
-    
-    private Deserializer keyDeserializer;
-    private Deserializer valDeserializer;
-
-    /**
-     * A tag interface for all of the Reader options
-     */
-    public static interface Option {}
-    
-    /**
-     * Create an option to specify the path name of the sequence file.
-     * @param value the path to read
-     * @return a new option
-     */
-    public static Option file(Path value) {
-      return new FileOption(value);
-    }
-    
-    /**
-     * Create an option to specify the stream with the sequence file.
-     * @param value the stream to read.
-     * @return a new option
-     */
-    public static Option stream(FSDataInputStream value) {
-      return new InputStreamOption(value);
-    }
-    
-    /**
-     * Build the option that sets the byte offset at which reading begins.
-     * @param value the number of bytes to skip over
-     * @return a new option wrapping <code>value</code>
-     */
-    public static Option start(long value) {
-      final Option startOption = new StartOption(value);
-      return startOption;
-    }
-    
-    /**
-     * Build the option that limits how many bytes are read.
-     * @param value the number of bytes to read
-     * @return a new option wrapping <code>value</code>
-     */
-    public static Option length(long value) {
-      final Option lengthOption = new LengthOption(value);
-      return lengthOption;
-    }
-    
-    /**
-     * Build the option that sets the buffer size used when the reader opens
-     * the file itself (it is rejected when a stream is supplied instead).
-     * @param value the number of bytes to buffer
-     * @return a new option wrapping <code>value</code>
-     */
-    public static Option bufferSize(int value) {
-      final Option bufferOption = new BufferSizeOption(value);
-      return bufferOption;
-    }
-
-    /** Option carrying the path of the file to read. */
-    private static class FileOption
-        extends Options.PathOption
-        implements Option {
-      private FileOption(Path path) {
-        super(path);
-      }
-    }
-    
-    /** Option carrying an already opened input stream to read from. */
-    private static class InputStreamOption
-        extends Options.FSDataInputStreamOption
-        implements Option {
-      private InputStreamOption(FSDataInputStream input) {
-        super(input);
-      }
-    }
-
-    /** Option carrying the byte offset at which reading starts. */
-    private static class StartOption
-        extends Options.LongOption
-        implements Option {
-      private StartOption(long offset) {
-        super(offset);
-      }
-    }
-
-    /** Option carrying the number of bytes to read. */
-    private static class LengthOption
-        extends Options.LongOption
-        implements Option {
-      private LengthOption(long byteCount) {
-        super(byteCount);
-      }
-    }
-
-    /** Option carrying the buffer size used when opening the file. */
-    private static class BufferSizeOption
-        extends Options.IntegerOption
-        implements Option {
-      private BufferSizeOption(int size) {
-        super(size);
-      }
-    }
-
-    /**
-     * Flag option, always <code>true</code>; its presence makes the
-     * constructor pass <code>tempReader = true</code> to initialize().
-     * Only used directly (there is no public factory method for it).
-     */
-    private static class OnlyHeaderOption
-        extends Options.BooleanOption
-        implements Option {
-      private OnlyHeaderOption() {
-        super(true);
-      }
-    }
-
-    /**
-     * Construct a reader from a set of options. Exactly one of the file and
-     * stream options must be present; a buffer size may only accompany the
-     * file option.
-     * @param conf Configuration
-     * @param opts the options describing what to read and how
-     * @throws IOException if the file cannot be opened or its header read
-     * @throws IllegalArgumentException if the options are inconsistent
-     */
-    public Reader(Configuration conf, Option... opts) throws IOException {
-      // Each lookup yields null when the option was not supplied.
-      final FileOption pathOption = Options.getOption(FileOption.class, opts);
-      final InputStreamOption inputOption =
-        Options.getOption(InputStreamOption.class, opts);
-      final StartOption offsetOption =
-        Options.getOption(StartOption.class, opts);
-      final LengthOption sizeOption =
-        Options.getOption(LengthOption.class, opts);
-      final BufferSizeOption bufferOption =
-        Options.getOption(BufferSizeOption.class, opts);
-      final OnlyHeaderOption headerOnlyOption =
-        Options.getOption(OnlyHeaderOption.class, opts);
-
-      // Validate the combination before touching the file system.
-      final boolean hasPath = pathOption != null;
-      final boolean hasStream = inputOption != null;
-      if (hasPath == hasStream) {
-        throw new
-          IllegalArgumentException("File or stream option must be specified");
-      }
-      if (!hasPath && bufferOption != null) {
-        throw new IllegalArgumentException("buffer size can only be set when" +
-                                           " a file is specified.");
-      }
-
-      // Resolve the effective stream and length.
-      Path filename = null;
-      final FSDataInputStream file;
-      final long len;
-      if (hasPath) {
-        filename = pathOption.getValue();
-        final FileSystem fs = filename.getFileSystem(conf);
-        final int bufSize;
-        if (bufferOption == null) {
-          bufSize = getBufferSize(conf);
-        } else {
-          bufSize = bufferOption.getValue();
-        }
-        if (sizeOption == null) {
-          len = fs.getFileStatus(filename).getLen();
-        } else {
-          len = sizeOption.getValue();
-        }
-        file = openFile(fs, filename, bufSize, len);
-      } else {
-        if (sizeOption == null) {
-          len = Long.MAX_VALUE;
-        } else {
-          len = sizeOption.getValue();
-        }
-        file = inputOption.getValue();
-      }
-
-      long start = 0;
-      if (offsetOption != null) {
-        start = offsetOption.getValue();
-      }
-      // really set up
-      initialize(filename, file, start, len, conf, headerOnlyOption != null);
-    }
-
-    /**
-     * Open the given file for reading; the path is qualified against
-     * <code>fs</code> before being wrapped in a file option.
-     * @param fs The file system used to qualify the path.
-     * @param file The file being read.
-     * @param conf Configuration
-     * @throws IOException
-     * @deprecated Use Reader(Configuration, Option...) instead.
-     */
-    @Deprecated
-    public Reader(FileSystem fs, Path file, Configuration conf)
-        throws IOException {
-      this(conf, Reader.file(file.makeQualified(fs)));
-    }
-
-    /**
-     * Read a sequence file from an already opened stream.
-     * @param in An input stream.
-     * @param buffersize unused
-     * @param start The starting position.
-     * @param length The length being read.
-     * @param conf Configuration
-     * @throws IOException
-     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
-     */
-    @Deprecated
-    public Reader(FSDataInputStream in,
-                  int buffersize,
-                  long start,
-                  long length,
-                  Configuration conf) throws IOException {
-      this(conf, Reader.stream(in), Reader.start(start), Reader.length(length));
-    }
-
-    /** Common work of the constructors. */
-    private void initialize(Path filename, FSDataInputStream in,
-                            long start, long length, Configuration conf,
-                            boolean tempReader) throws IOException {
-      if (in == null) {
-        throw new IllegalArgumentException("in == null");
-      }
-      this.filename = filename == null ? "<unknown>" : filename.toString();
-      this.in = in;
-      this.conf = conf;
-      boolean succeeded = false;
-      try {
-        seek(start);
-        this.end = this.in.getPos() + length;
-        // if it wrapped around, use the max
-        if (end < length) {
-          end = Long.MAX_VALUE;
-        }
-        init(tempReader);
-        succeeded = true;
-      } finally {
-        if (!succeeded) {
-          IOUtils.cleanup(LOG, this.in);
-        }
-      }
-    }
-
-    /**
-     * Override this method to specialize the type of
-     * {@link FSDataInputStream} returned.
-     * @param fs The file system used to open the file.
-     * @param file The file being read.
-     * @param bufferSize The buffer size used to read the file.
-     * @param length The length being read if it is >= 0.  Otherwise,
-     *               the length is not available.
-     * @return The opened stream.
-     * @throws IOException
-     */
-    protected FSDataInputStream openFile(FileSystem fs, Path file,
-        int bufferSize, long length) throws IOException {
-      return fs.open(file, bufferSize);
-    }
-    
-    /**
-     * Initialize the {@link Reader}: parse the file header from
-     * <code>in</code> and, unless this is a temporary reader, set up the
-     * buffers, decompressors and deserializers used to read records.
-     * The header is read in this order: version block, key and value class
-     * names, the compression flags, the codec class name, the metadata and
-     * the sync marker; which parts are present depends on the version byte.
-     * @param tempReader <code>true</code> if we are constructing a temporary
-     *                  reader {@link SequenceFile.Sorter#cloneFileAttributes}, 
-     *                  and hence do not initialize every component; 
-     *                  <code>false</code> otherwise.
-     * @throws IOException if the header is not that of a SequenceFile, the
-     *                     version is newer than supported, or no deserializer
-     *                     can be found for the key or value class
-     */
-    private void init(boolean tempReader) throws IOException {
-      byte[] versionBlock = new byte[VERSION.length];
-      in.readFully(versionBlock);
-
-      // The first three bytes must match VERSION exactly; the fourth byte is
-      // the format version and is checked separately below.
-      if ((versionBlock[0] != VERSION[0]) ||
-          (versionBlock[1] != VERSION[1]) ||
-          (versionBlock[2] != VERSION[2]))
-        throw new IOException(this + " not a SequenceFile");
-
-      // Set 'version'
-      version = versionBlock[3];
-      if (version > VERSION[3])
-        throw new VersionMismatchException(VERSION[3], version);
-
-      // Files older than BLOCK_COMPRESS_VERSION store the class names as UTF8
-      // records; newer ones use Text.readString.
-      if (version < BLOCK_COMPRESS_VERSION) {
-        UTF8 className = new UTF8();
-
-        className.readFields(in);
-        keyClassName = className.toString(); // key class name
-
-        className.readFields(in);
-        valClassName = className.toString(); // val class name
-      } else {
-        keyClassName = Text.readString(in);
-        valClassName = Text.readString(in);
-      }
-
-      if (version > 2) {                          // if version > 2
-        this.decompress = in.readBoolean();       // is compressed?
-      } else {
-        decompress = false;
-      }
-
-      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
-        this.blockCompressed = in.readBoolean();  // is block-compressed?
-      } else {
-        blockCompressed = false;
-      }
-      
-      // if version >= 5
-      // setup the compression codec
-      if (decompress) {
-        if (version >= CUSTOM_COMPRESS_VERSION) {
-          // The codec class name is stored in the header and instantiated
-          // reflectively through the configuration.
-          String codecClassname = Text.readString(in);
-          try {
-            Class<? extends CompressionCodec> codecClass
-              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
-            this.codec = ReflectionUtils.newInstance(codecClass, conf);
-          } catch (ClassNotFoundException cnfe) {
-            throw new IllegalArgumentException("Unknown codec: " + 
-                                               codecClassname, cnfe);
-          }
-        } else {
-          // No codec name in the header: fall back to DefaultCodec.
-          codec = new DefaultCodec();
-          ((Configurable)codec).setConf(conf);
-        }
-      }
-      
-      this.metadata = new Metadata();
-      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
-        this.metadata.readFields(in);
-      }
-      
-      if (version > 1) {                          // if version > 1
-        in.readFully(sync);                       // read sync bytes
-        headerEnd = in.getPos();                  // record end of header
-      }
-      
-      // Initialize... *not* if this we are constructing a temporary Reader
-      if (!tempReader) {
-        // valBuffer is always created; when the file is compressed, valIn
-        // reads from it through the codec, otherwise valIn is valBuffer itself.
-        valBuffer = new DataInputBuffer();
-        if (decompress) {
-          valDecompressor = CodecPool.getDecompressor(codec);
-          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
-          valIn = new DataInputStream(valInFilter);
-        } else {
-          valIn = valBuffer;
-        }
-
-        // Block compression needs three more buffer/filter/stream triples
-        // (key lengths, keys, value lengths), each with its own decompressor.
-        if (blockCompressed) {
-          keyLenBuffer = new DataInputBuffer();
-          keyBuffer = new DataInputBuffer();
-          valLenBuffer = new DataInputBuffer();
-
-          keyLenDecompressor = CodecPool.getDecompressor(codec);
-          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
-                                                   keyLenDecompressor);
-          keyLenIn = new DataInputStream(keyLenInFilter);
-
-          keyDecompressor = CodecPool.getDecompressor(codec);
-          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
-          keyIn = new DataInputStream(keyInFilter);
-
-          valLenDecompressor = CodecPool.getDecompressor(codec);
-          valLenInFilter = codec.createInputStream(valLenBuffer, 
-                                                   valLenDecompressor);
-          valLenIn = new DataInputStream(valLenInFilter);
-        }
-        
-        SerializationFactory serializationFactory =
-          new SerializationFactory(conf);
-        this.keyDeserializer =
-          getDeserializer(serializationFactory, getKeyClass());
-        if (this.keyDeserializer == null) {
-          throw new IOException(
-              "Could not find a deserializer for the Key class: '"
-                  + getKeyClass().getCanonicalName() + "'. "
-                  + "Please ensure that the configuration '" +
-                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
-                  + "properly configured, if you're using "
-                  + "custom serialization.");
-        }
-        // Without block compression the key is read from valBuffer (the
-        // record buffer); with it, keys come from their own stream.
-        if (!blockCompressed) {
-          this.keyDeserializer.open(valBuffer);
-        } else {
-          this.keyDeserializer.open(keyIn);
-        }
-        this.valDeserializer =
-          getDeserializer(serializationFactory, getValueClass());
-        if (this.valDeserializer == null) {
-          throw new IOException(
-              "Could not find a deserializer for the Value class: '"
-                  + getValueClass().getCanonicalName() + "'. "
-                  + "Please ensure that the configuration '" +
-                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
-                  + "properly configured, if you're using "
-                  + "custom serialization.");
-        }
-        this.valDeserializer.open(valIn);
-      }
-    }
-    
-    @SuppressWarnings("unchecked")
-    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
-      return sf.getDeserializer(c);
-    }
-    
-    /**
-     * Close the file: hand the decompressors back to the {@link CodecPool},
-     * close both deserializers and finally close the underlying input stream.
-     * The stream is closed even when closing a deserializer throws, so a
-     * failing deserializer can no longer leak the open file.
-     * @throws IOException if closing a deserializer or the stream fails
-     */
-    @Override
-    public synchronized void close() throws IOException {
-      // Return the decompressors to the pool
-      CodecPool.returnDecompressor(keyLenDecompressor);
-      CodecPool.returnDecompressor(keyDecompressor);
-      CodecPool.returnDecompressor(valLenDecompressor);
-      CodecPool.returnDecompressor(valDecompressor);
-      // Drop the references so a second close() does not return them twice
-      keyLenDecompressor = keyDecompressor = null;
-      valLenDecompressor = valDecompressor = null;
-
-      // Nested try/finally: each later resource is released even if an
-      // earlier close() throws.
-      try {
-        if (keyDeserializer != null) {
-          keyDeserializer.close();
-        }
-      } finally {
-        try {
-          if (valDeserializer != null) {
-            valDeserializer.close();
-          }
-        } finally {
-          // Close the input-stream
-          in.close();
-        }
-      }
-    }
-
-    /**
-     * @return the key class name as recorded in the file header
-     */
-    public String getKeyClassName() {
-      return this.keyClassName;
-    }
-
-    /**
-     * Returns the class of keys in this file, resolving it from the key
-     * class name on first use and caching the result.
-     * @throws RuntimeException wrapping the IOException raised when the
-     *                          class cannot be resolved
-     */
-    public synchronized Class<?> getKeyClass() {
-      if (keyClass != null) {
-        return keyClass;
-      }
-      try {
-        keyClass = WritableName.getClass(getKeyClassName(), conf);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      return keyClass;
-    }
-
-    /**
-     * @return the value class name as recorded in the file header
-     */
-    public String getValueClassName() {
-      return this.valClassName;
-    }
-
-    /**
-     * Returns the class of values in this file, resolving it from the value
-     * class name on first use and caching the result.
-     * @throws RuntimeException wrapping the IOException raised when the
-     *                          class cannot be resolved
-     */
-    public synchronized Class<?> getValueClass() {
-      if (valClass != null) {
-        return valClass;
-      }
-      try {
-        valClass = WritableName.getClass(getValueClassName(), conf);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      return valClass;
-    }
-
-    /** Returns true if values are compressed. */
-    public boolean isCompressed() { return decompress; }
-    
-    /** Returns true if records are block-compressed. */
-    public boolean isBlockCompressed() { return blockCompressed; }
-    
-    /** Returns the compression codec of data in this file. */
-    public CompressionCodec getCompressionCodec() { return codec; }
-    
-    /**
-     * Derive the compression type from the two header flags.
-     * @return NONE when the file is not compressed, otherwise BLOCK or
-     *         RECORD depending on the block-compression flag
-     */
-    public CompressionType getCompressionType() {
-      if (!decompress) {
-        return CompressionType.NONE;
-      }
-      if (blockCompressed) {
-        return CompressionType.BLOCK;
-      }
-      return CompressionType.RECORD;
-    }
-
-    /**
-     * @return the metadata object of the file
-     */
-    public Metadata getMetadata() {
-      return metadata;
-    }
-    
-    /**
-     * @return the configuration used for this file
-     */
-    Configuration getConf() {
-      return this.conf;
-    }
-    
-    /** Read a compressed buffer */
-    private synchronized void readBuffer(DataInputBuffer buffer, 
-                                         CompressionInputStream filter) throws IOException {
-      // Read data into a temporary buffer
-      DataOutputBuffer dataBuffer = new DataOutputBuffer();
-
-      try {
-        int dataBufferLength = WritableUtils.readVInt(in);
-        dataBuffer.write(in, dataBufferLength);
-      
-        // Set up 'buffer' connected to the input-stream
-        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
-      } finally {
-        dataBuffer.close();
-      }
-
-      // Reset the codec
-      filter.resetState();
-    }
-    
-    /**
-     * Read the next 'compressed' block: verify the sync marker, read the
-     * record count and load the key-length and key buffers. The value
-     * buffers are loaded here only when lazy decompression is off;
-     * otherwise seekToCurrentValue() loads them on demand.
-     * @throws IOException if the sync marker does not match or reading fails
-     */
-    private synchronized void readBlock() throws IOException {
-      // Check if we need to throw away a whole block of 
-      // 'values' due to 'lazy decompression' 
-      if (lazyDecompress && !valuesDecompressed) {
-        // Each seek skips one length-prefixed buffer that was never read:
-        // first the value lengths, then the values.
-        in.seek(WritableUtils.readVInt(in)+in.getPos());
-        in.seek(WritableUtils.readVInt(in)+in.getPos());
-      }
-      
-      // Reset internal states
-      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
-      valuesDecompressed = false;
-
-      //Process sync
-      if (sync != null) {
-        // The int read here is discarded; the sync bytes follow it.
-        in.readInt();
-        in.readFully(syncCheck);                // read syncCheck
-        if (!Arrays.equals(sync, syncCheck))    // check it
-          throw new IOException("File is corrupt!");
-      }
-      syncSeen = true;
-
-      // Read number of records in this block
-      noBufferedRecords = WritableUtils.readVInt(in);
-      
-      // Read key lengths and keys
-      readBuffer(keyLenBuffer, keyLenInFilter);
-      readBuffer(keyBuffer, keyInFilter);
-      noBufferedKeys = noBufferedRecords;
-      
-      // Read value lengths and values
-      if (!lazyDecompress) {
-        readBuffer(valLenBuffer, valLenInFilter);
-        readBuffer(valBuffer, valInFilter);
-        noBufferedValues = noBufferedRecords;
-        valuesDecompressed = true;
-      }
-    }
-
-    /** 
-     * Position valLenIn/valIn to the 'value' 
-     * corresponding to the 'current' key.
-     * For files that are not block-compressed this only resets the value
-     * buffer (and the codec state when compressed). For block-compressed
-     * files it loads the value buffers if that was deferred and skips the
-     * values belonging to keys that were read without their value.
-     * @throws IOException if the expected number of bytes cannot be skipped
-     */
-    private synchronized void seekToCurrentValue() throws IOException {
-      if (!blockCompressed) {
-        if (decompress) {
-          valInFilter.resetState();
-        }
-        // Go back to the mark that next(Writable) set after reading the key
-        valBuffer.reset();
-      } else {
-        // Check if this is the first value in the 'block' to be read
-        if (lazyDecompress && !valuesDecompressed) {
-          // Read the value lengths and values
-          readBuffer(valLenBuffer, valLenInFilter);
-          readBuffer(valBuffer, valInFilter);
-          noBufferedValues = noBufferedRecords;
-          valuesDecompressed = true;
-        }
-        
-        // Calculate the no. of bytes to skip
-        // Note: 'current' key has already been read!
-        int skipValBytes = 0;
-        // noBufferedKeys was already decremented for the current key, so
-        // +1 gives the count that includes it
-        int currentKey = noBufferedKeys + 1;          
-        // Consume one value length per skipped value and add up the bytes
-        for (int i=noBufferedValues; i > currentKey; --i) {
-          skipValBytes += WritableUtils.readVInt(valLenIn);
-          --noBufferedValues;
-        }
-        
-        // Skip to the 'val' corresponding to 'current' key
-        if (skipValBytes > 0) {
-          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
-            throw new IOException("Failed to seek to " + currentKey + 
-                                  "(th) value!");
-          }
-        }
-      }
-    }
-
-    /**
-     * Get the 'value' corresponding to the last read 'key'.
-     * If <code>val</code> is {@link Configurable} it is given this reader's
-     * configuration before its fields are read.
-     * @param val : The 'value' to be read.
-     * @throws IOException if reading fails or, for files that are not
-     *                     block-compressed, data is left over after the value
-     */
-    public synchronized void getCurrentValue(Writable val) 
-      throws IOException {
-      if (val instanceof Configurable) {
-        ((Configurable) val).setConf(this.conf);
-      }
-
-      // Position stream to 'current' value
-      seekToCurrentValue();
-
-      if (!blockCompressed) {
-        val.readFields(valIn);
-        
-        // Probe for one more byte: a positive result means the value did not
-        // consume the whole record.
-        // NOTE(review): '> 0' lets a trailing 0x00 byte through as well as
-        // end-of-stream (-1) - confirm that is intended.
-        if (valIn.read() > 0) {
-          LOG.info("available bytes: " + valIn.available());
-          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
-                                + " bytes, should read " +
-                                (valBuffer.getLength()-keyLength));
-        }
-      } else {
-        // Get the value
-        int valLength = WritableUtils.readVInt(valLenIn);
-        val.readFields(valIn);
-        
-        // Read another compressed 'value'
-        --noBufferedValues;
-        
-        // Sanity check
-        // NOTE(review): the condition tests for a negative length while the
-        // message says zero-length - confirm which one is meant.
-        if ((valLength < 0) && LOG.isDebugEnabled()) {
-          LOG.debug(val + " is a zero-length value");
-        }
-      }
-
-    }
-    
-    /**
-     * Get the 'value' corresponding to the last read 'key', using the value
-     * deserializer rather than {@link Writable#readFields}.
-     * If <code>val</code> is {@link Configurable} it is given this reader's
-     * configuration first.
-     * @param val : The 'value' to be read.
-     * @return the object returned by the value deserializer
-     * @throws IOException if reading fails or, for files that are not
-     *                     block-compressed, data is left over after the value
-     */
-    public synchronized Object getCurrentValue(Object val) 
-      throws IOException {
-      if (val instanceof Configurable) {
-        ((Configurable) val).setConf(this.conf);
-      }
-
-      // Position stream to 'current' value
-      seekToCurrentValue();
-
-      if (!blockCompressed) {
-        val = deserializeValue(val);
-        
-        // Probe for one more byte: a positive result means the value did not
-        // consume the whole record.
-        // NOTE(review): '> 0' lets a trailing 0x00 byte through as well as
-        // end-of-stream (-1) - confirm that is intended.
-        if (valIn.read() > 0) {
-          LOG.info("available bytes: " + valIn.available());
-          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
-                                + " bytes, should read " +
-                                (valBuffer.getLength()-keyLength));
-        }
-      } else {
-        // Get the value
-        int valLength = WritableUtils.readVInt(valLenIn);
-        val = deserializeValue(val);
-        
-        // Read another compressed 'value'
-        --noBufferedValues;
-        
-        // Sanity check
-        // NOTE(review): the condition tests for a negative length while the
-        // message says zero-length - confirm which one is meant.
-        if ((valLength < 0) && LOG.isDebugEnabled()) {
-          LOG.debug(val + " is a zero-length value");
-        }
-      }
-      return val;
-
-    }
-
-    @SuppressWarnings("unchecked")
-    private Object deserializeValue(Object val) throws IOException {
-      return valDeserializer.deserialize(val);
-    }
-    
-    /** Read the next key in the file into <code>key</code>, skipping its
-     * value.  True if another entry exists, and false at end of file.
-     * @param key the object to fill; its class must be exactly the file's
-     *            key class
-     * @throws IOException if the key class does not match or, for files that
-     *                     are not block-compressed, the key did not consume
-     *                     exactly the recorded key length
-     */
-    public synchronized boolean next(Writable key) throws IOException {
-      if (key.getClass() != getKeyClass())
-        throw new IOException("wrong key class: "+key.getClass().getName()
-                              +" is not "+keyClass);
-
-      if (!blockCompressed) {
-        outBuf.reset();
-        
-        // Loads the whole record into outBuf and stores the key length in
-        // the keyLength field
-        keyLength = next(outBuf);
-        if (keyLength < 0)
-          return false;
-        
-        valBuffer.reset(outBuf.getData(), outBuf.getLength());
-        
-        key.readFields(valBuffer);
-        // Mark the end of the key so seekToCurrentValue() can reset to it
-        valBuffer.mark(0);
-        if (valBuffer.getPosition() != keyLength)
-          throw new IOException(key + " read " + valBuffer.getPosition()
-                                + " bytes, should read " + keyLength);
-      } else {
-        //Reset syncSeen
-        syncSeen = false;
-        
-        // No keys left in the current block: load the next one; running
-        // off the end of the file means there are no more entries
-        if (noBufferedKeys == 0) {
-          try {
-            readBlock();
-          } catch (EOFException eof) {
-            return false;
-          }
-        }
-        
-        // This local shadows the keyLength field, which is left untouched
-        // in the block-compressed path
-        int keyLength = WritableUtils.readVInt(keyLenIn);
-        
-        // Sanity check
-        if (keyLength < 0) {
-          return false;
-        }
-        
-        //Read another compressed 'key'
-        key.readFields(keyIn);
-        --noBufferedKeys;
-      }
-
-      return true;
-    }
-
-    /**
-     * Read the next key/value pair in the file into <code>key</code> and
-     * <code>val</code>.
-     * @return true if such a pair exists and false when at end of file
-     * @throws IOException if the class of <code>val</code> is not the file's
-     *                     value class, or reading fails
-     */
-    public synchronized boolean next(Writable key, Writable val)
-      throws IOException {
-      if (val.getClass() != getValueClass()) {
-        throw new IOException("wrong value class: "+val+" is not "+valClass);
-      }
-
-      // No further key means there is no value to fetch either
-      if (!next(key)) {
-        return false;
-      }
-      getCurrentValue(val);
-      return true;
-    }
-    
-    /**
-     * Read and return the next record length, potentially skipping over 
-     * a sync block. Also updates <code>syncSeen</code>: true when a sync
-     * entry was consumed on the way, false otherwise.
-     * @return the length of the next record or -1 if there is no next record
-     * @throws IOException if a sync entry does not match the file's sync
-     *                     marker, or reading fails
-     */
-    private synchronized int readRecordLength() throws IOException {
-      // Nothing left inside the range this reader is allowed to read
-      if (in.getPos() >= end) {
-        return -1;
-      }      
-      int length = in.readInt();
-      // A length equal to SYNC_ESCAPE announces a sync entry, not a record
-      if (version > 1 && sync != null &&
-          length == SYNC_ESCAPE) {              // process a sync entry
-        in.readFully(syncCheck);                // read syncCheck
-        if (!Arrays.equals(sync, syncCheck))    // check it
-          throw new IOException("File is corrupt!");
-        syncSeen = true;
-        // The sync entry may have been the last thing in the range
-        if (in.getPos() >= end) {
-          return -1;
-        }
-        length = in.readInt();                  // re-read length
-      } else {
-        syncSeen = false;
-      }
-      
-      return length;
-    }
-    
-    /** Read the next key/value pair in the file into <code>buffer</code>.
-     * Returns the length of the key read, or -1 if at end of file.  The length
-     * of the value may be computed by calling buffer.getLength() before and
-     * after calls to this method. */
-    /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
-    @Deprecated
-    synchronized int next(DataOutputBuffer buffer) throws IOException {
-      // Unsupported for block-compressed sequence files
-      if (blockCompressed) {
-        throw new IOException("Unsupported call for block-compressed" +
-                              " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
-      }
-      try {
-        int length = readRecordLength();
-        if (length == -1) {
-          return -1;
-        }
-        int keyLength = in.readInt();
-        buffer.write(in, length);
-        return keyLength;
-      } catch (ChecksumException e) {             // checksum failure
-        handleChecksumException(e);
-        return next(buffer);
-      }
-    }
-
-    /**
-     * Create the holder for raw values read from this file.
-     * @return a CompressedBytes bound to the file's codec when the file is
-     *         compressed but not block-compressed, an UncompressedBytes
-     *         otherwise
-     */
-    public ValueBytes createValueBytes() {
-      if (decompress && !blockCompressed) {
-        return new CompressedBytes(codec);
-      }
-      return new UncompressedBytes();
-    }
-
-    /**
-     * Read 'raw' records.
-     * @param key - The buffer into which the key is read
-     * @param val - The 'raw' value
-     * @return Returns the total record length or -1 for end of file
-     * @throws IOException
-     */
-    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
-      throws IOException {
-      if (!blockCompressed) {
-        int length = readRecordLength();
-        if (length == -1) {
-          return -1;
-        }
-        int keyLength = in.readInt();
-        int valLength = le

<TRUNCATED>