Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:19:30 UTC

svn commit: r1181555 [2/4] - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/io/hfile/ test/java/org/apache/hadoop/hbase/io/hfile/

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1181555&r1=1181554&r2=1181555&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Tue Oct 11 02:19:30 2011
@@ -19,26 +19,14 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,26 +36,14 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
-import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
-import org.apache.hadoop.hbase.util.BloomFilter;
-import org.apache.hadoop.hbase.util.ByteBloomFilter;
+import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
 
 /**
  * File format for hbase.
@@ -135,25 +111,13 @@ import org.apache.hadoop.io.compress.Dec
 public class HFile {
   static final Log LOG = LogFactory.getLog(HFile.class);
 
-  /* These values are more or less arbitrary, and they are used as a
-   * form of check to make sure the file isn't completely corrupt.
-   */
-  final static byte [] DATABLOCKMAGIC =
-    {'D', 'A', 'T', 'A', 'B', 'L', 'K', 42 };
-  final static byte [] INDEXBLOCKMAGIC =
-    { 'I', 'D', 'X', 'B', 'L', 'K', 41, 43 };
-  final static byte [] METABLOCKMAGIC =
-    { 'M', 'E', 'T', 'A', 'B', 'L', 'K', 99 };
-  final static byte [] TRAILERBLOCKMAGIC =
-    { 'T', 'R', 'A', 'B', 'L', 'K', 34, 36 };
-
   /**
    * Maximum length of key in HFile.
    */
   public final static int MAXIMUM_KEY_LENGTH = Integer.MAX_VALUE;
 
   /**
-   * Default blocksize for hfile.
+   * Default block size for an HFile.
    */
   public final static int DEFAULT_BLOCKSIZE = 64 * 1024;
 
@@ -167,15 +131,25 @@ public class HFile {
    */
   public final static Compression.Algorithm DEFAULT_COMPRESSION_ALGORITHM =
     Compression.Algorithm.NONE;
+
+  /** Minimum supported HFile format version */
+  public static final int MIN_FORMAT_VERSION = 1;
+
+  /** Maximum supported HFile format version */
+  public static final int MAX_FORMAT_VERSION = 2;
+
   /** Default compression name: none. */
   public final static String DEFAULT_COMPRESSION =
     DEFAULT_COMPRESSION_ALGORITHM.getName();
 
+  /** Separator between HFile name and offset in block cache key */
+  static final char CACHE_KEY_SEPARATOR = '_';
+
   // For measuring latency of "typical" reads and writes
-  private static volatile int readOps;
-  private static volatile long readTime;
-  private static volatile int writeOps;
-  private static volatile long writeTime;
+  static volatile int readOps;
+  static volatile long readTime;
+  static volatile int writeOps;
+  static volatile long writeTime;
 
   public static final int getReadOps() {
     int ret = readOps;
@@ -209,1761 +183,203 @@ public class HFile {
    * @return bytes per checksum for HFile
    */
   public static int getBytesPerChecksum(Configuration hconf,
-		Configuration fsconf) {
+      Configuration fsconf) {
     int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
     if (hconf != null) {
-	bytesPerChecksum =  hconf.getInt("hfile.io.bytes.per.checksum",
-			fsconf.getInt("io.bytes.per.checksum",
-					HFile.DEFAULT_BYTES_PER_CHECKSUM));
+      bytesPerChecksum =  hconf.getInt("hfile.io.bytes.per.checksum",
+          fsconf.getInt("io.bytes.per.checksum",
+              HFile.DEFAULT_BYTES_PER_CHECKSUM));
     }
     return bytesPerChecksum;
   }
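The helper above lets HFile carry its own checksum chunk size ("hfile.io.bytes.per.checksum") while still falling back to the filesystem-level "io.bytes.per.checksum" and finally to DEFAULT_BYTES_PER_CHECKSUM. A minimal sketch of how a caller might resolve it before opening a writer; the class and method names are illustrative and not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.io.hfile.HFile;

    public class ChecksumSizeExample {
      // Resolves the checksum chunk size the same way the write path does:
      // hfile.io.bytes.per.checksum, then io.bytes.per.checksum, then the default.
      static int bytesPerChecksum(Configuration hbaseConf, FileSystem fs) {
        return HFile.getBytesPerChecksum(hbaseConf, fs.getConf());
      }
    }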
-  /**
-   * HFile Writer.
-   */
-  public static class Writer implements Closeable {
-    // FileSystem stream to write on.
-    private FSDataOutputStream outputStream;
-    // True if we opened the <code>outputStream</code> (and so will close it).
-    private boolean closeOutputStream;
-
-    // Name for this object used when logging or in toString.  Is either
-    // the result of a toString on stream or else toString of passed file Path.
-    protected String name;
-
-    // Total uncompressed bytes, maybe calculate a compression ratio later.
-    private long totalBytes = 0;
-
-    // Total # of key/value entries, ie: how many times add() was called.
-    private long entryCount = 0;
-
-    // Used calculating average key and value lengths.
-    private long keylength = 0;
-    private long valuelength = 0;
-
-    // Used to ensure we write in order.
-    private final RawComparator<byte []> comparator;
-
-    // A stream made per block written.
-    private DataOutputStream out;
-
-    // Number of uncompressed bytes per block.  Reinitialized when we start
-    // new block.
-    private int blocksize;
-
-    // Offset where the current block began.
-    private long blockBegin;
-
-    // First key in a block (Not first key in file).
-    private byte [] firstKey = null;
-
-    // Key previously appended.  Becomes the last key in the file.
-    private byte [] lastKeyBuffer = null;
-    private int lastKeyOffset = -1;
-    private int lastKeyLength = -1;
-
-    // See {@link BlockIndex}. Below four fields are used to write the block
-    // index.
-    ArrayList<byte[]> blockKeys = new ArrayList<byte[]>();
-    // Block offset in backing stream.
-    ArrayList<Long> blockOffsets = new ArrayList<Long>();
-    // Raw (decompressed) data size.
-    ArrayList<Integer> blockDataSizes = new ArrayList<Integer>();
-
-    // Meta block system.
-    private ArrayList<byte []> metaNames = new ArrayList<byte []>();
-    private ArrayList<Writable> metaData = new ArrayList<Writable>();
-
-    // Used compression.  Used even if no compression -- 'none'.
-    private final Compression.Algorithm compressAlgo;
-    private Compressor compressor;
-
-    // Special datastructure to hold fileinfo.
-    private FileInfo fileinfo = new FileInfo();
-
-    // May be null if we were passed a stream.
-    private Path path = null;
-
-    // Block cache to optionally fill on write
-    private BlockCache blockCache;
-
-    // Additional byte array output stream used to fill block cache
-    private ByteArrayOutputStream baos;
-    private DataOutputStream baosDos;
-    private int blockNumber = 0;
-
-    /**
-     * Constructor that uses all defaults for compression and block size.
-     * @param fs
-     * @param path
-     * @throws IOException
-     */
-    public Writer(FileSystem fs, Path path)
-    throws IOException {
-      this(fs, path, DEFAULT_BLOCKSIZE, DEFAULT_BYTES_PER_CHECKSUM,
-		(Compression.Algorithm) null, null, null);
-    }
-
-    /**
-     * Constructor that takes a Path.
-     * @param fs
-     * @param path
-     * @param blocksize
-     * @param bytesPerChecksum
-     * @param compress
-     * @param comparator
-     * @throws IOException
-     * @throws IOException
-     */
-    public Writer(FileSystem fs, Path path, int blocksize, int bytesPerChecksum,
-      String compress, final KeyComparator comparator)
-    throws IOException {
-      this(fs, path, blocksize, bytesPerChecksum,
-        compress == null? DEFAULT_COMPRESSION_ALGORITHM:
-          Compression.getCompressionAlgorithmByName(compress),
-        comparator, null);
-    }
-
-    /**
-     * Constructor that takes a Path.
-     * @param fs
-     * @param path
-     * @param blocksize
-     * @param bytesPerChecksum
-     * @param compress
-     * @param comparator
-     * @throws IOException
-     */
-    public Writer(FileSystem fs, Path path, int blocksize, int bytesPerChecksum,
-      Compression.Algorithm compress,
-      final KeyComparator comparator, BlockCache blockCache)
-    throws IOException {
-      this(fs.create(path,
-		FsPermission.getDefault(),
-		true,
-          fs.getConf().getInt("io.file.buffer.size", 4096),
-          fs.getDefaultReplication(),
-          fs.getDefaultBlockSize(),
-          bytesPerChecksum,
-          null),
-          blocksize, compress, comparator);
-      this.closeOutputStream = true;
-      this.name = path.getName();
-      this.path = path;
-      this.blockCache = blockCache;
-    }
-
-    /**
-     * Constructor that takes a stream.
-     * @param ostream Stream to use.
-     * @param blocksize
-     * @param compress
-     * @param c RawComparator to use.
-     * @throws IOException
-     */
-    public Writer(final FSDataOutputStream ostream, final int blocksize,
-      final String  compress, final KeyComparator c)
-    throws IOException {
-      this(ostream, blocksize,
-        Compression.getCompressionAlgorithmByName(compress), c);
-    }
 
-    /**
-     * Constructor that takes a stream.
-     * @param ostream Stream to use.
-     * @param blocksize
-     * @param compress
-     * @param c
-     * @throws IOException
-     */
-    public Writer(final FSDataOutputStream ostream, final int blocksize,
-      final Compression.Algorithm  compress, final KeyComparator c)
-    throws IOException {
-      this.outputStream = ostream;
-      this.closeOutputStream = false;
-      this.blocksize = blocksize;
-      this.comparator = c == null? Bytes.BYTES_RAWCOMPARATOR: c;
-      this.name = this.outputStream.toString();
-      this.compressAlgo = compress == null?
-        DEFAULT_COMPRESSION_ALGORITHM: compress;
-    }
+  /** API required to write an {@link HFile} */
+  public interface Writer extends Closeable {
 
-    /*
-     * If at block boundary, opens new block.
-     * @throws IOException
-     */
-    private void checkBlockBoundary() throws IOException {
-      if (this.out != null && this.out.size() < blocksize) return;
-      finishBlock();
-      newBlock();
-    }
+    /** Add an element to the file info map. */
+    void appendFileInfo(byte[] key, byte[] value) throws IOException;
 
-    /*
-     * Do the cleanup if a current block.
-     * @throws IOException
-     */
-    private void finishBlock() throws IOException {
-      if (this.out == null) return;
-      long now = System.currentTimeMillis();
-
-      int size = releaseCompressingStream(this.out);
-      this.out = null;
-      blockKeys.add(firstKey);
-      blockOffsets.add(Long.valueOf(blockBegin));
-      blockDataSizes.add(Integer.valueOf(size));
-      this.totalBytes += size;
-
-      writeTime += System.currentTimeMillis() - now;
-      writeOps++;
-
-      if (blockCache != null) {
-        baosDos.flush();
-        byte [] bytes = baos.toByteArray();
-        ByteBuffer blockToCache = ByteBuffer.wrap(bytes, DATABLOCKMAGIC.length,
-            bytes.length - DATABLOCKMAGIC.length).slice();
-        String blockName = name + blockNumber;
-        blockCache.cacheBlock(blockName, blockToCache);
-        baosDos.close();
-      }
-      blockNumber++;
-    }
+    void append(KeyValue kv) throws IOException;
 
-    /*
-     * Ready a new block for writing.
-     * @throws IOException
-     */
-    private void newBlock() throws IOException {
-      // This is where the next block begins.
-      blockBegin = outputStream.getPos();
-      this.out = getCompressingStream();
-      this.out.write(DATABLOCKMAGIC);
-      firstKey = null;
-      if (blockCache != null) {
-        this.baos = new ByteArrayOutputStream();
-        this.baosDos = new DataOutputStream(baos);
-        this.baosDos.write(DATABLOCKMAGIC);
-      }
-    }
+    void append(byte[] key, byte[] value) throws IOException;
 
-    /*
-     * Sets up a compressor and creates a compression stream on top of
-     * this.outputStream.  Get one per block written.
-     * @return A compressing stream; if 'none' compression, returned stream
-     * does not compress.
-     * @throws IOException
-     * @see {@link #releaseCompressingStream(DataOutputStream)}
-     */
-    private DataOutputStream getCompressingStream() throws IOException {
-      this.compressor = compressAlgo.getCompressor();
-      // Get new DOS compression stream.  In tfile, the DOS, is not closed,
-      // just finished, and that seems to be fine over there.  TODO: Check
-      // no memory retention of the DOS.  Should I disable the 'flush' on the
-      // DOS as the BCFile over in tfile does?  It wants to make it so flushes
-      // don't go through to the underlying compressed stream.  Flush on the
-      // compressed downstream should be only when done.  I was going to but
-      // looks like when we call flush in here, its legitimate flush that
-      // should go through to the compressor.
-      OutputStream os =
-        this.compressAlgo.createCompressionStream(this.outputStream,
-        this.compressor, 0);
-      return new DataOutputStream(os);
-    }
+    /** @return the path to this {@link HFile} */
+    Path getPath();
 
-    /*
-     * Let go of block compressor and compressing stream gotten in call
-     * {@link #getCompressingStream}.
-     * @param dos
-     * @return How much was written on this stream since it was taken out.
-     * @see #getCompressingStream()
-     * @throws IOException
-     */
-    private int releaseCompressingStream(final DataOutputStream dos)
-    throws IOException {
-      dos.flush();
-      this.compressAlgo.returnCompressor(this.compressor);
-      this.compressor = null;
-      return dos.size();
-    }
+    void appendMetaBlock(String bloomFilterMetaKey, Writable metaWriter);
 
     /**
-     * Add a meta block to the end of the file. Call before close().
-     * Metadata blocks are expensive.  Fill one with a bunch of serialized data
-     * rather than do a metadata block per metadata instance.  If metadata is
-     * small, consider adding to file info using
-     * {@link #appendFileInfo(byte[], byte[])}
-     * @param metaBlockName name of the block
-     * @param content will call readFields to get data later (DO NOT REUSE)
+     * Adds an inline block writer such as a multi-level block index writer or
+     * a compound Bloom filter writer.
      */
-    public void appendMetaBlock(String metaBlockName, Writable content) {
-      byte[] key = Bytes.toBytes(metaBlockName);
-      int i;
-      for (i = 0; i < metaNames.size(); ++i) {
-        // stop when the current key is greater than our own
-        byte[] cur = metaNames.get(i);
-        if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length,
-            key, 0, key.length) > 0) {
-          break;
-        }
-      }
-      metaNames.add(i, key);
-      metaData.add(i, content);
-    }
+    void addInlineBlockWriter(InlineBlockWriter bloomWriter);
 
     /**
-     * Add to the file info.  Added key value can be gotten out of the return
-     * from {@link Reader#loadFileInfo()}.
-     * @param k Key
-     * @param v Value
-     * @throws IOException
-     */
-    public void appendFileInfo(final byte [] k, final byte [] v)
-    throws IOException {
-      appendFileInfo(this.fileinfo, k, v, true);
-    }
-
-    static FileInfo appendFileInfo(FileInfo fi, final byte [] k, final byte [] v,
-      final boolean checkPrefix)
-    throws IOException {
-      if (k == null || v == null) {
-        throw new NullPointerException("Key nor value may be null");
-      }
-      if (checkPrefix &&
-          Bytes.startsWith(k, FileInfo.RESERVED_PREFIX_BYTES)) {
-        throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX +
-          " are reserved");
-      }
-      fi.put(k, v);
-      return fi;
-    }
-
-    /**
-     * @return Path or null if we were passed a stream rather than a Path.
-     */
-    public Path getPath() {
-      return this.path;
-    }
-
-    @Override
-    public String toString() {
-      return "writer=" + (path != null ? path.toString() : null) +
-          ", name=" + this.name +
-          ", compression=" + this.compressAlgo.getName();
-    }
-
-    /**
-     * Add key/value to file.
-     * Keys must be added in an order that agrees with the Comparator passed
-     * on construction.
-     * @param kv KeyValue to add.  Cannot be empty nor null.
-     * @throws IOException
-     */
-    public void append(final KeyValue kv)
-    throws IOException {
-      append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
-        kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
-    }
-
-    /**
-     * Add key/value to file.
-     * Keys must be added in an order that agrees with the Comparator passed
-     * on construction.
-     * @param key Key to add.  Cannot be empty nor null.
-     * @param value Value to add.  Cannot be empty nor null.
-     * @throws IOException
-     */
-    public void append(final byte [] key, final byte [] value)
-    throws IOException {
-      append(key, 0, key.length, value, 0, value.length);
-    }
-
-    /**
-     * Add key/value to file.
-     * Keys must be added in an order that agrees with the Comparator passed
-     * on construction.
-     * @param key
-     * @param koffset
-     * @param klength
-     * @param value
-     * @param voffset
-     * @param vlength
-     * @throws IOException
-     */
-    private void append(final byte [] key, final int koffset, final int klength,
-        final byte [] value, final int voffset, final int vlength)
-    throws IOException {
-      boolean dupKey = checkKey(key, koffset, klength);
-      checkValue(value, voffset, vlength);
-      if (!dupKey) {
-        checkBlockBoundary();
-      }
-      // Write length of key and value and then actual key and value bytes.
-      this.out.writeInt(klength);
-      this.keylength += klength;
-      this.out.writeInt(vlength);
-      this.valuelength += vlength;
-      this.out.write(key, koffset, klength);
-      this.out.write(value, voffset, vlength);
-      // Are we the first key in this block?
-      if (this.firstKey == null) {
-        // Copy the key.
-        this.firstKey = new byte [klength];
-        System.arraycopy(key, koffset, this.firstKey, 0, klength);
-      }
-      this.lastKeyBuffer = key;
-      this.lastKeyOffset = koffset;
-      this.lastKeyLength = klength;
-      this.entryCount ++;
-      // If we are pre-caching blocks on write, fill byte array stream
-      if (blockCache != null) {
-        this.baosDos.writeInt(klength);
-        this.baosDos.writeInt(vlength);
-        this.baosDos.write(key, koffset, klength);
-        this.baosDos.write(value, voffset, vlength);
-      }
-    }
-
-    /*
-     * @param key Key to check.
-     * @return the flag of duplicate Key or not
-     * @throws IOException
-     */
-    private boolean checkKey(final byte [] key, final int offset, final int length)
-    throws IOException {
-      boolean dupKey = false;
-
-      if (key == null || length <= 0) {
-        throw new IOException("Key cannot be null or empty");
-      }
-      if (length > MAXIMUM_KEY_LENGTH) {
-        throw new IOException("Key length " + length + " > " +
-          MAXIMUM_KEY_LENGTH);
-      }
-      if (this.lastKeyBuffer != null) {
-        int keyComp = this.comparator.compare(this.lastKeyBuffer, this.lastKeyOffset,
-            this.lastKeyLength, key, offset, length);
-        if (keyComp > 0) {
-          throw new IOException("Added a key not lexically larger than" +
-            " previous key=" + Bytes.toStringBinary(key, offset, length) +
-            ", lastkey=" + Bytes.toStringBinary(this.lastKeyBuffer, this.lastKeyOffset,
-                this.lastKeyLength));
-        } else if (keyComp == 0) {
-          dupKey = true;
-        }
-      }
-      return dupKey;
-    }
-
-    private void checkValue(final byte [] value, final int offset,
-        final int length) throws IOException {
-      if (value == null) {
-        throw new IOException("Value cannot be null");
-      }
-    }
-
-    public long getTotalBytes() {
-      return this.totalBytes;
-    }
-
-    public void close() throws IOException {
-      if (this.outputStream == null) {
-        return;
-      }
-      // Write out the end of the data blocks, then write meta data blocks.
-      // followed by fileinfo, data block index and meta block index.
-
-      finishBlock();
-
-      FixedFileTrailer trailer = new FixedFileTrailer();
-
-      // Write out the metadata blocks if any.
-      ArrayList<Long> metaOffsets = null;
-      ArrayList<Integer> metaDataSizes = null;
-      if (metaNames.size() > 0) {
-        metaOffsets = new ArrayList<Long>(metaNames.size());
-        metaDataSizes = new ArrayList<Integer>(metaNames.size());
-        for (int i = 0 ; i < metaNames.size() ; ++ i ) {
-          // store the beginning offset
-          long curPos = outputStream.getPos();
-          metaOffsets.add(curPos);
-          // write the metadata content
-          DataOutputStream dos = getCompressingStream();
-          dos.write(METABLOCKMAGIC);
-          metaData.get(i).write(dos);
-          int size = releaseCompressingStream(dos);
-          // store the metadata size
-          metaDataSizes.add(size);
-        }
-      }
-
-      // Write fileinfo.
-      trailer.fileinfoOffset = writeFileInfo(this.outputStream);
-
-      // Write the data block index.
-      trailer.dataIndexOffset = BlockIndex.writeIndex(this.outputStream,
-        this.blockKeys, this.blockOffsets, this.blockDataSizes);
-
-      // Meta block index.
-      if (metaNames.size() > 0) {
-        trailer.metaIndexOffset = BlockIndex.writeIndex(this.outputStream,
-          this.metaNames, metaOffsets, metaDataSizes);
-      }
-
-      // Now finish off the trailer.
-      trailer.dataIndexCount = blockKeys.size();
-      trailer.metaIndexCount = metaNames.size();
-
-      trailer.totalUncompressedBytes = totalBytes;
-
-      // the entryCount in the HFile is currently only used for
-      // reporting, and for bloom calculations. This fix only
-      // avoids the counter from wrapping around to -ve values.
-      // If/when we change the FixedFileTrailer format in future,
-      // we can modify the entryCount to a long.
-      if (entryCount > Integer.MAX_VALUE)
-      {
-        trailer.entryCount = Integer.MAX_VALUE;
-      } else {
-        trailer.entryCount = (int)entryCount;
-      }
-
-      trailer.compressionCodec = this.compressAlgo.ordinal();
-
-      trailer.serialize(outputStream);
-
-      if (this.closeOutputStream) {
-        this.outputStream.close();
-        this.outputStream = null;
-      }
-    }
-
-    /*
-     * Add last bits of metadata to fileinfo and then write it out.
-     * Reader will be expecting to find all below.
-     * @param o Stream to write on.
-     * @return Position at which we started writing.
-     * @throws IOException
+     * Store Bloom filter in the file. This does not deal with Bloom filter
+     * internals but is necessary, since Bloom filters are stored differently
+     * in HFile version 1 and version 2.
      */
-    private long writeFileInfo(FSDataOutputStream o) throws IOException {
-      if (this.lastKeyBuffer != null) {
-        // Make a copy.  The copy is stuffed into HMapWritable.  Needs a clean
-        // byte buffer.  Won't take a tuple.
-        byte [] b = new byte[this.lastKeyLength];
-        System.arraycopy(this.lastKeyBuffer, this.lastKeyOffset, b, 0,
-          this.lastKeyLength);
-        appendFileInfo(this.fileinfo, FileInfo.LASTKEY, b, false);
-      }
-      int avgKeyLen = this.entryCount == 0? 0:
-        (int)(this.keylength/this.entryCount);
-      appendFileInfo(this.fileinfo, FileInfo.AVG_KEY_LEN,
-        Bytes.toBytes(avgKeyLen), false);
-      int avgValueLen = this.entryCount == 0? 0:
-        (int)(this.valuelength/this.entryCount);
-      appendFileInfo(this.fileinfo, FileInfo.AVG_VALUE_LEN,
-        Bytes.toBytes(avgValueLen), false);
-      appendFileInfo(this.fileinfo, FileInfo.COMPARATOR,
-        Bytes.toBytes(this.comparator.getClass().getName()), false);
-      long pos = o.getPos();
-      this.fileinfo.write(o);
-      return pos;
-    }
+    void addBloomFilter(BloomFilterWriter bfw);
   }
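The new Writer interface keeps both annotation paths from version 1: appendFileInfo for small key/value entries in the file info map, and appendMetaBlock for larger serialized structures (the removed v1 javadoc notes that meta blocks are comparatively expensive). A hedged sketch; the keys, class name, and the Writable argument below are hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Writable;

    public class MetaVsFileInfoExample {
      // Small items go into the file info map; a larger serialized structure
      // gets its own meta block.
      static void annotate(HFile.Writer writer, Writable bigSerializedThing)
          throws IOException {
        writer.appendFileInfo(Bytes.toBytes("example.flag"), Bytes.toBytes(true));
        writer.appendMetaBlock("EXAMPLE_META", bigSerializedThing);
      }
    }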
 
   /**
-   * HFile Reader.
+   * This variety of ways to construct writers is used throughout the code, and
+   * we want to be able to swap writer implementations.
    */
-  public static class Reader implements Closeable {
-    // Stream to read from.
-    private FSDataInputStream istream;
-    // True if we should close istream when done.  We don't close it if we
-    // didn't open it.
-    private boolean closeIStream;
-
-    // These are read in when the file info is loaded.
-    HFile.BlockIndex blockIndex;
-    private BlockIndex metaIndex;
-    FixedFileTrailer trailer;
-    private volatile boolean fileInfoLoaded = false;
-
-    // Filled when we read in the trailer.
-    private Compression.Algorithm compressAlgo;
-
-    // Last key in the file.  Filled in when we read in the file info
-    private byte [] lastkey = null;
-    // Stats read in when we load file info.
-    private int avgKeyLen = -1;
-    private int avgValueLen = -1;
-
-    // Used to ensure we seek correctly.
-    RawComparator<byte []> comparator;
-
-    // Size of this file.
-    private final long fileSize;
-
-    // Block cache to use.
-    private final BlockCache cache;
-    public int cacheHits = 0;
-    public int blockLoads = 0;
-    public int metaLoads = 0;
-
-    // Whether file is from in-memory store
-    private boolean inMemory = false;
-
-    // Whether blocks of file should be evicted on close of file
-    private final boolean evictOnClose;
-
-    // Path of file and file name to be used for block names
-    private final Path path;
-    private final String name;
-
-    // table qualified cfName for this HFile.
-    // This is used to report stats on a per-table/CF basis
-    public String cfName = "";
-
-    // various metrics that we want to track on a per-cf basis
-    public String fsReadTimeMetric = "";
-    public String compactionReadTimeMetric = "";
-
-    public String fsBlockReadCntMetric = "";
-    public String compactionBlockReadCntMetric = "";
-
-    public String fsBlockReadCacheHitCntMetric = "";
-    public String compactionBlockReadCacheHitCntMetric = "";
-
-    public String fsMetaBlockReadCntMetric = "";
-    public String fsMetaBlockReadCacheHitCntMetric = "";
-
-    /*
-     * Parse the HFile path to figure out which table and column family
-     * it belongs to. This is used to maintain read statistics on a
-     * per-column-family basis.
-     *
-     * @param path HFile path name
-     */
-    public void parsePath(String path) {
-      String splits[] = path.split("/");
-      this.cfName = "cf." + splits[splits.length - 2];
-
-      this.fsReadTimeMetric =
-        this.cfName + ".fsRead";
-      this.compactionReadTimeMetric =
-        this.cfName + ".compactionRead";
-
-      this.fsBlockReadCntMetric =
-        this.cfName + ".fsBlockReadCnt";
-      this.fsBlockReadCacheHitCntMetric =
-        this.cfName + ".fsBlockReadCacheHitCnt";
-
-      this.compactionBlockReadCntMetric =
-        this.cfName + ".compactionBlockReadCnt";
-      this.compactionBlockReadCacheHitCntMetric =
-        this.cfName + ".compactionBlockReadCacheHitCnt";
-
-      this.fsMetaBlockReadCntMetric =
-        this.cfName + ".fsMetaBlockReadCnt";
-      this.fsMetaBlockReadCacheHitCntMetric =
-        this.cfName + ".fsMetaBlockReadCacheHitCnt";
-    }
-
-    /**
-     * Opens a HFile.  You must load the file info before you can
-     * use it by calling {@link #loadFileInfo()}.
-     *
-     * @param fs filesystem to load from
-     * @param path path within said filesystem
-     * @param cache block cache. Pass null if none.
-     * @throws IOException
-     */
-    public Reader(FileSystem fs, Path path, BlockCache cache, boolean inMemory,
-        boolean evictOnClose)
-    throws IOException {
-      this(path, fs.open(path), fs.getFileStatus(path).getLen(), cache,
-          inMemory, evictOnClose);
-      this.closeIStream = true;
-      this.parsePath(this.path.toString());
-    }
-
-    /**
-     * Opens a HFile.  You must load the index before you can
-     * use it by calling {@link #loadFileInfo()}.
-     *
-     * @param fsdis input stream.  Caller is responsible for closing the passed
-     * stream.
-     * @param size Length of the stream.
-     * @param cache block cache. Pass null if none.
-     * @param inMemory whether blocks should be marked as in-memory in cache
-     * @param evictOnClose whether blocks in cache should be evicted on close
-     * @throws IOException
-     */
-    public Reader(Path path, final FSDataInputStream fsdis, final long size,
-        final BlockCache cache, final boolean inMemory,
-        final boolean evictOnClose) {
-      this.cache = cache;
-      this.fileSize = size;
-      this.istream = fsdis;
-      this.closeIStream = false;
-      this.inMemory = inMemory;
-      this.evictOnClose = evictOnClose;
-      this.path = path;
-      this.name = path.getName();
-    }
-
-    @Override
-    public String toString() {
-      return "reader=" + this.path.toString() +
-          (!isFileInfoLoaded()? "":
-            ", compression=" + this.compressAlgo.getName() +
-            ", inMemory=" + this.inMemory +
-            ", firstKey=" + toStringFirstKey() +
-            ", lastKey=" + toStringLastKey()) +
-            ", avgKeyLen=" + this.avgKeyLen +
-            ", avgValueLen=" + this.avgValueLen +
-            ", entries=" + this.trailer.entryCount +
-            ", length=" + this.fileSize;
-    }
+  public static abstract class WriterFactory {
+    protected Configuration conf;
 
-    protected String toStringFirstKey() {
-      return KeyValue.keyToString(getFirstKey());
-    }
-
-    protected String toStringLastKey() {
-      return KeyValue.keyToString(getLastKey());
-    }
+    WriterFactory(Configuration conf) { this.conf = conf; }
 
-    public long length() {
-      return this.fileSize;
-    }
-
-    public boolean inMemory() {
-      return this.inMemory;
-    }
+    public abstract Writer createWriter(FileSystem fs, Path path)
+        throws IOException;
 
-	private byte[] readAllIndex(final FSDataInputStream in, final long indexOffset,
-			final int indexSize) throws IOException
-	{
-		byte[] allIndex = new byte[indexSize];
-		in.seek(indexOffset);
-		IOUtils.readFully(in, allIndex, 0, allIndex.length);
+    public abstract Writer createWriter(FileSystem fs, Path path,
+        int blockSize, int bytesPerChecksum, Compression.Algorithm compress,
+        final KeyComparator comparator) throws IOException;
 
-		return allIndex;
-	}
+    public abstract Writer createWriter(FileSystem fs, Path path,
+        int blockSize, int bytesPerChecksum, String compress,
+        final KeyComparator comparator) throws IOException;
 
-    /**
-     * Read in the index and file info.
-     * @return A map of fileinfo data.
-     * See {@link Writer#appendFileInfo(byte[], byte[])}.
-     * @throws IOException
-     */
-    public Map<byte [], byte []> loadFileInfo()
-    throws IOException {
-      this.trailer = readTrailer();
-
-      // Read in the fileinfo and get what we need from it.
-      this.istream.seek(this.trailer.fileinfoOffset);
-      FileInfo fi = new FileInfo();
-      fi.readFields(this.istream);
-      this.lastkey = fi.get(FileInfo.LASTKEY);
-      this.avgKeyLen = Bytes.toInt(fi.get(FileInfo.AVG_KEY_LEN));
-      this.avgValueLen = Bytes.toInt(fi.get(FileInfo.AVG_VALUE_LEN));
-      String clazzName = Bytes.toString(fi.get(FileInfo.COMPARATOR));
-      this.comparator = getComparator(clazzName);
-
-	  int allIndexSize = (int)(this.fileSize - this.trailer.dataIndexOffset - FixedFileTrailer.trailerSize());
-	  byte[] dataAndMetaIndex = readAllIndex(this.istream, this.trailer.dataIndexOffset, allIndexSize);
-
-	  ByteArrayInputStream bis = new ByteArrayInputStream(dataAndMetaIndex);
-	  DataInputStream dis = new DataInputStream(bis);
-
-      // Read in the data index.
-    this.blockIndex =
-      BlockIndex.readIndex(this.comparator, dis, this.trailer.dataIndexCount);
-
-      // Read in the metadata index.
-      if (trailer.metaIndexCount > 0) {
-		  this.metaIndex = BlockIndex.readIndex(Bytes.BYTES_RAWCOMPARATOR,
-				  dis, this.trailer.metaIndexCount);
-      }
-      this.fileInfoLoaded = true;
+    public abstract Writer createWriter(final FSDataOutputStream ostream,
+        final int blockSize, final String compress,
+        final KeyComparator comparator) throws IOException;
 
-	  if (null != dis)
-	  {
-		  dis.close();
-	  }
+    public abstract Writer createWriter(final FSDataOutputStream ostream,
+        final int blockSize, final Compression.Algorithm compress,
+        final KeyComparator c) throws IOException;
+  }
 
-      return fi;
-    }
+  /** The configuration key for HFile version to use for new files */
+  public static final String FORMAT_VERSION_KEY = "hfile.format.version";
 
-    boolean isFileInfoLoaded() {
-      return this.fileInfoLoaded;
-    }
+  public static int getFormatVersion(Configuration conf) {
+    int version = conf.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
+    checkFormatVersion(version);
+    return version;
+  }
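The format of newly written files is now driven purely by configuration, and getFormatVersion validates the value against MIN_FORMAT_VERSION/MAX_FORMAT_VERSION through the checkFormatVersion call above. A hypothetical snippet pinning writes to the old format, e.g. for a rolling-upgrade test:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.io.hfile.HFile;

    public class FormatVersionExample {
      // The default is MAX_FORMAT_VERSION (2); values outside 1..2 are rejected.
      static int pinToVersionOne(Configuration conf) {
        conf.setInt(HFile.FORMAT_VERSION_KEY, 1);
        return HFile.getFormatVersion(conf);  // returns 1 here
      }
    }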
 
-    @SuppressWarnings("unchecked")
-    private RawComparator<byte []> getComparator(final String clazzName)
-    throws IOException {
-      if (clazzName == null || clazzName.length() == 0) {
-        return null;
-      }
-      try {
-        return (RawComparator<byte []>)Class.forName(clazzName).newInstance();
-      } catch (InstantiationException e) {
-        throw new IOException(e);
-      } catch (IllegalAccessException e) {
-        throw new IOException(e);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(e);
-      }
+  /**
+   * Returns the factory to be used to create {@link HFile} writers. Should
+   * always be {@link HFileWriterV2#WRITER_FACTORY_V2} in production, but
+   * can also be {@link HFileWriterV1#WRITER_FACTORY_V1} in testing.
+   */
+  public static final WriterFactory getWriterFactory(Configuration conf) {
+    int version = getFormatVersion(conf);
+    LOG.debug("Using HFile format version " + version);
+    switch (version) {
+    case 1:
+      return new HFileWriterV1.WriterFactoryV1(conf);
+    case 2:
+      return new HFileWriterV2.WriterFactoryV2(conf);
+    default:
+      throw new IllegalArgumentException("Cannot create writer for HFile " +
+          "format version " + version);
     }
+  }
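With the factory in place, callers no longer instantiate a concrete writer directly; they ask the factory selected by hfile.format.version. A hedged sketch of the intended call pattern; the class name, file path, and sample KeyValue are illustrative only:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.util.Bytes;

    public class WriterFactoryExample {
      static void writeOne(Configuration conf, FileSystem fs, Path path)
          throws IOException {
        // A version-appropriate writer (v1 or v2) picked from the configuration.
        HFile.Writer writer = HFile.getWriterFactory(conf).createWriter(fs, path);
        try {
          // Keys must be appended in the comparator's sort order.
          writer.append(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
              Bytes.toBytes("q"), Bytes.toBytes("v1")));
        } finally {
          writer.close();
        }
      }
    }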
 
-    /* Read the trailer off the input stream.  As side effect, sets the
-     * compression algorithm.
-     * @return Populated FixedFileTrailer.
-     * @throws IOException
-     */
-    private FixedFileTrailer readTrailer() throws IOException {
-      FixedFileTrailer fft = new FixedFileTrailer();
-      long seekPoint = this.fileSize - FixedFileTrailer.trailerSize();
-      this.istream.seek(seekPoint);
-      fft.deserialize(this.istream);
-      // Set up the codec.
-      this.compressAlgo =
-        Compression.Algorithm.values()[fft.compressionCodec];
-      return fft;
-    }
+  /**
+   * Configuration key to evict all blocks of a given file from the block cache
+   * when the file is closed.
+   */
+  public static final String EVICT_BLOCKS_ON_CLOSE_KEY =
+      "hbase.rs.evictblocksonclose";
 
-    /**
-     * Create a Scanner on this file.  No seeks or reads are done on creation.
-     * Call {@link HFileScanner#seekTo(byte[])} to position an start the read.
-     * There is nothing to clean up in a Scanner. Letting go of your references
-     * to the scanner is sufficient.
-     * NOTE: Do not use this overload of getScanner for compactions.
-     * @param cacheBlocks True if we should cache blocks read in by this scanner.
-     * @param pread Use positional read rather than seek+read if true (pread is
-     * better for random reads, seek+read is better scanning).
-     * @return Scanner on this file.
-     */
-    public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
-      return getScanner(cacheBlocks, pread, false);
-    }
+  /**
+   * Configuration key to cache data blocks on write. There are separate
+   * switches for Bloom blocks and non-root index blocks.
+   */
+  public static final String CACHE_BLOCKS_ON_WRITE_KEY =
+      "hbase.rs.cacheblocksonwrite";
 
-    /**
-     * Create a Scanner on this file.  No seeks or reads are done on creation.
-     * Call {@link HFileScanner#seekTo(byte[])} to position an start the read.
-     * There is nothing to clean up in a Scanner. Letting go of your references
-     * to the scanner is sufficient.
-     * @param cacheBlocks True if we should cache blocks read in by this scanner.
-     * @param pread Use positional read rather than seek+read if true (pread is
-     * better for random reads, seek+read is better scanning).
-     * @param isCompaction is scanner being used for a compaction?
-     * @return Scanner on this file.
-     */
-    public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
-                                  final boolean isCompaction) {
-      return new Scanner(this, cacheBlocks, pread, isCompaction);
-    }
+  /** An interface used by clients to open and iterate an {@link HFile}. */
+  public interface Reader extends Closeable {
 
     /**
-     * @param key Key to search.
-     * @return Block number of the block containing the key or -1 if not in this
-     * file.
-     */
-    protected int blockContainingKey(final byte [] key, int offset, int length) {
-      if (blockIndex == null) {
-        throw new RuntimeException("Block index not loaded");
-      }
-      return blockIndex.blockContainingKey(key, offset, length);
-    }
-    /**
-     * @param metaBlockName
-     * @param cacheBlock Add block to cache, if found
-     * @return Block wrapped in a ByteBuffer
-     * @throws IOException
+     * Returns this reader's "name". Usually the last component of the path.
+     * Needs to be constant as the file is being moved to support caching on
+     * write.
      */
-    public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
-    throws IOException {
-      if (trailer.metaIndexCount == 0) {
-        return null; // there are no meta blocks
-      }
-      if (metaIndex == null) {
-        throw new IOException("Meta index not loaded");
-      }
+     String getName();
 
-      byte [] mbname = Bytes.toBytes(metaBlockName);
-      int block = metaIndex.blockContainingKey(mbname, 0, mbname.length);
-      if (block == -1)
-        return null;
-      long blockSize;
-      if (block == metaIndex.count - 1) {
-        blockSize = trailer.fileinfoOffset - metaIndex.blockOffsets[block];
-      } else {
-        blockSize = metaIndex.blockOffsets[block+1] - metaIndex.blockOffsets[block];
-      }
+     String getColumnFamilyName();
 
-      long now = System.currentTimeMillis();
+     RawComparator<byte []> getComparator();
 
-      // Per meta key from any given file, synchronize reads for said block
-      synchronized (metaIndex.blockKeys[block]) {
-        metaLoads++;
-        HRegion.incrNumericMetric(this.fsMetaBlockReadCntMetric, 1);
-        // Check cache for block.  If found return.
-        if (cache != null) {
-          ByteBuffer cachedBuf = cache.getBlock(name + "meta" + block);
-          if (cachedBuf != null) {
-            // Return a distinct 'shallow copy' of the block,
-            // so pos doesnt get messed by the scanner
-            cacheHits++;
-            HRegion.incrNumericMetric(this.fsMetaBlockReadCacheHitCntMetric, 1);
-            return cachedBuf.duplicate();
-          }
-          // Cache Miss, please load.
-        }
+     HFileScanner getScanner(boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction);
 
-        ByteBuffer buf = decompress(metaIndex.blockOffsets[block],
-          longToInt(blockSize), metaIndex.blockDataSizes[block], true);
-        byte [] magic = new byte[METABLOCKMAGIC.length];
-        buf.get(magic, 0, magic.length);
+     ByteBuffer getMetaBlock(String metaBlockName,
+        boolean cacheBlock) throws IOException;
 
-        if (! Arrays.equals(magic, METABLOCKMAGIC)) {
-          throw new IOException("Meta magic is bad in block " + block);
-        }
+     HFileBlock readBlock(long offset, int onDiskBlockSize,
+         boolean cacheBlock, final boolean pread, final boolean isCompaction)
+         throws IOException;
 
-        // Create a new ByteBuffer 'shallow copy' to hide the magic header
-        buf = buf.slice();
+     Map<byte[], byte[]> loadFileInfo() throws IOException;
 
-        long delta = System.currentTimeMillis() - now;
-        HRegion.incrTimeVaryingMetric(this.fsReadTimeMetric, delta);
-        readTime += delta;
-        readOps++;
-
-        // Cache the block
-        if(cacheBlock && cache != null) {
-          cache.cacheBlock(name + "meta" + block, buf.duplicate(), inMemory);
-        }
+     byte[] getLastKey();
 
-        return buf;
-      }
-    }
+     byte[] midkey() throws IOException;
 
-    /**
-     * Read in a file block.
-     * @param block Index of block to read.
-     * @param pread Use positional read instead of seek+read (positional is
-     * better doing random reads whereas seek+read is better scanning).
-     * @param isCompaction is this block being read as part of a compaction
-     * @return Block wrapped in a ByteBuffer.
-     * @throws IOException
-     */
-    ByteBuffer readBlock(int block, boolean cacheBlock, final boolean pread,
-                         final boolean isCompaction)
-    throws IOException {
-      if (blockIndex == null) {
-        throw new IOException("Block index not loaded");
-      }
-      if (block < 0 || block >= blockIndex.count) {
-        throw new IOException("Requested block is out of range: " + block +
-          ", max: " + blockIndex.count);
-      }
-      // For any given block from any given file, synchronize reads for said
-      // block.
-      // Without a cache, this synchronizing is needless overhead, but really
-      // the other choice is to duplicate work (which the cache would prevent you from doing).
-      synchronized (blockIndex.blockKeys[block]) {
-        blockLoads++;
-
-        if (isCompaction) {
-          HRegion.incrNumericMetric(this.compactionBlockReadCntMetric, 1);
-        } else {
-          HRegion.incrNumericMetric(this.fsBlockReadCntMetric, 1);
-        }
+     long length();
 
-        // Check cache for block.  If found return.
-        if (cache != null) {
-          ByteBuffer cachedBuf = cache.getBlock(name + block);
-          if (cachedBuf != null) {
-            // Return a distinct 'shallow copy' of the block,
-            // so pos doesnt get messed by the scanner
-            cacheHits++;
-
-            if (isCompaction) {
-              HRegion.incrNumericMetric(
-                  this.compactionBlockReadCacheHitCntMetric, 1);
-            } else {
-              HRegion.incrNumericMetric(
-                  this.fsBlockReadCacheHitCntMetric, 1);
-            }
-
-            return cachedBuf.duplicate();
-          }
-          // Carry on, please load.
-        }
+     long getEntries();
 
-        // Load block from filesystem.
-        long now = System.currentTimeMillis();
-        long onDiskBlockSize;
-        if (block == blockIndex.count - 1) {
-          // last block!  The end of data block is first meta block if there is
-          // one or if there isn't, the fileinfo offset.
-          long offset = this.metaIndex != null?
-            this.metaIndex.blockOffsets[0]: this.trailer.fileinfoOffset;
-          onDiskBlockSize = offset - blockIndex.blockOffsets[block];
-        } else {
-          onDiskBlockSize = blockIndex.blockOffsets[block+1] -
-          blockIndex.blockOffsets[block];
-        }
-        ByteBuffer buf = decompress(blockIndex.blockOffsets[block],
-          longToInt(onDiskBlockSize), this.blockIndex.blockDataSizes[block],
-          pread);
-
-        byte [] magic = new byte[DATABLOCKMAGIC.length];
-        buf.get(magic, 0, magic.length);
-        if (!Arrays.equals(magic, DATABLOCKMAGIC)) {
-          throw new IOException("Data magic is bad in block " + block);
-        }
+     byte[] getFirstKey();
 
-        // 'shallow copy' to hide the header
-        // NOTE: you WILL GET BIT if you call buf.array() but don't start
-        //       reading at buf.arrayOffset()
-        buf = buf.slice();
-
-        long delta = System.currentTimeMillis() - now;
-        readTime += delta;
-        readOps++;
-        if (isCompaction) {
-          HRegion.incrTimeVaryingMetric(this.compactionReadTimeMetric, delta);
-        } else {
-          HRegion.incrTimeVaryingMetric(this.fsReadTimeMetric, delta);
-        }
+     long indexSize();
 
+     byte[] getFirstRowKey();
 
-        // Cache the block
-        if(cacheBlock && cache != null) {
-          cache.cacheBlock(name + block, buf.duplicate(), inMemory);
-        }
+     byte[] getLastRowKey();
 
-        return buf;
-      }
-    }
+     FixedFileTrailer getTrailer();
 
-    /*
-     * Decompress <code>compressedSize</code> bytes off the backing
-     * FSDataInputStream.
-     * @param offset
-     * @param compressedSize
-     * @param decompressedSize
-     *
-     * @return
-     * @throws IOException
-     */
-    private ByteBuffer decompress(final long offset, final int compressedSize,
-      final int decompressedSize, final boolean pread)
-    throws IOException {
-      Decompressor decompressor = null;
-      ByteBuffer buf = null;
-      try {
-        decompressor = this.compressAlgo.getDecompressor();
-        // My guess is that the bounded range fis is needed to stop the
-        // decompressor reading into next block -- IIRC, it just grabs a
-        // bunch of data w/o regard to whether decompressor is coming to end of a
-        // decompression.
-        InputStream is = this.compressAlgo.createDecompressionStream(
-            new BufferedInputStream(
-                new BoundedRangeFileInputStream(this.istream, offset, compressedSize,
-                                                pread),
-                Math.min(1<<20, compressedSize)),
-            decompressor, 0);
-        buf = ByteBuffer.allocate(decompressedSize);
-        IOUtils.readFully(is, buf.array(), 0, buf.capacity());
-        is.close();
-      } finally {
-        if (null != decompressor) {
-          this.compressAlgo.returnDecompressor(decompressor);
-        }
-      }
-      return buf;
-    }
+     HFileBlockIndex.BlockIndexReader getDataBlockIndexReader();
 
-    /**
-     * @return First key in the file.  May be null if file has no entries.
-     * Note that this is not the first rowkey, but rather the byte form of
-     * the first KeyValue.
-     */
-    public byte [] getFirstKey() {
-      if (blockIndex == null) {
-        throw new RuntimeException("Block index not loaded");
-      }
-      return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0];
-    }
+     HFileScanner getScanner(boolean cacheBlocks, boolean pread);
 
-    /**
-     * @return the first row key, or null if the file is empty.
-     * TODO move this to StoreFile after Ryan's patch goes in
-     * to eliminate KeyValue here
-     */
-    public byte[] getFirstRowKey() {
-      byte[] firstKey = getFirstKey();
-      if (firstKey == null) return null;
-      return KeyValue.createKeyValueFromKey(firstKey).getRow();
-    }
+     Compression.Algorithm getCompressionAlgorithm();
 
     /**
-     * @return number of KV entries in this HFile
+     * Retrieves Bloom filter metadata as appropriate for each {@link HFile}
+     * version. Knows nothing about how that metadata is structured.
      */
-    public long getEntries() {
-      if (!this.isFileInfoLoaded()) {
-        throw new RuntimeException("File info not loaded");
-      }
-      return this.trailer.entryCount;
-    }
-
-    /**
-     * @return Last key in the file.  May be null if file has no entries.
-     * Note that this is not the last rowkey, but rather the byte form of
-     * the last KeyValue.
-     */
-    public byte [] getLastKey() {
-      if (!isFileInfoLoaded()) {
-        throw new RuntimeException("Load file info first");
-      }
-      return this.blockIndex.isEmpty()? null: this.lastkey;
-    }
-
-    /**
-     * @return the last row key, or null if the file is empty.
-     * TODO move this to StoreFile after Ryan's patch goes in
-     * to eliminate KeyValue here
-     */
-    public byte[] getLastRowKey() {
-      byte[] lastKey = getLastKey();
-      if (lastKey == null) return null;
-      return KeyValue.createKeyValueFromKey(lastKey).getRow();
-    }
-
-    /**
-     * @return number of K entries in this HFile's filter.  Returns KV count if no filter.
-     */
-    public long getFilterEntries() {
-      return getEntries();
-    }
-
-    /**
-     * @return Comparator.
-     */
-    public RawComparator<byte []> getComparator() {
-      return this.comparator;
-    }
-
-    public Compression.Algorithm getCompressionAlgorithm() {
-      return this.compressAlgo;
-    }
-
-    /**
-     * @return index size
-     */
-    public long indexSize() {
-      return (this.blockIndex != null? this.blockIndex.heapSize(): 0) +
-        ((this.metaIndex != null)? this.metaIndex.heapSize(): 0);
-    }
-
-    /**
-     * @return Midkey for this file.  We work with block boundaries only so
-     * returned midkey is an approximation only.
-     * @throws IOException
-     */
-    public byte [] midkey() throws IOException {
-      if (!isFileInfoLoaded() || this.blockIndex.isEmpty()) {
-        return null;
-      }
-      return this.blockIndex.midkey();
-    }
-
-    public void close() throws IOException {
-      if (evictOnClose && this.cache != null) {
-        int numEvicted = 0;
-        for (int i=0; i<blockIndex.count; i++) {
-          if (this.cache.evictBlock(name + i)) numEvicted++;
-        }
-        LOG.debug("On close of file " + name + " evicted " + numEvicted +
-            " block(s) of " + blockIndex.count + " total blocks");
-      }
-      if (this.closeIStream && this.istream != null) {
-        this.istream.close();
-        this.istream = null;
-      }
-    }
-
-    public String getName() {
-      return name;
-    }
-
-    public Path getPath() {
-      return path;
-    }
-
-    /*
-     * Implementation of {@link HFileScanner} interface.
-     */
-    protected static class Scanner implements HFileScanner {
-      private final Reader reader;
-      private ByteBuffer block;
-      private int currBlock;
-
-      private final boolean cacheBlocks;
-      private final boolean pread;
-      private final boolean isCompaction;
-
-      private int currKeyLen = 0;
-      private int currValueLen = 0;
-
-      public int blockFetches = 0;
-
-      public Scanner(Reader r, boolean cacheBlocks, final boolean pread,
-                    final boolean isCompaction) {
-        this.reader = r;
-        this.cacheBlocks = cacheBlocks;
-        this.pread = pread;
-        this.isCompaction = isCompaction;
-      }
-
-      public KeyValue getKeyValue() {
-        if(this.block == null) {
-          return null;
-        }
-        return new KeyValue(this.block.array(),
-            this.block.arrayOffset() + this.block.position() - 8);
-      }
-
-      public ByteBuffer getKey() {
-        if (this.block == null || this.currKeyLen == 0) {
-          throw new RuntimeException("you need to seekTo() before calling getKey()");
-        }
-        ByteBuffer keyBuff = this.block.slice();
-        keyBuff.limit(this.currKeyLen);
-        keyBuff.rewind();
-        // Do keyBuff.asReadOnly()?
-        return keyBuff;
-      }
-
-      public ByteBuffer getValue() {
-        if (block == null || currKeyLen == 0) {
-          throw new RuntimeException("you need to seekTo() before calling getValue()");
-        }
-        // TODO: Could this be done with one ByteBuffer rather than create two?
-        ByteBuffer valueBuff = this.block.slice();
-        valueBuff.position(this.currKeyLen);
-        valueBuff = valueBuff.slice();
-        valueBuff.limit(currValueLen);
-        valueBuff.rewind();
-        return valueBuff;
-      }
-
-      public boolean next() throws IOException {
-        // LOG.deug("rem:" + block.remaining() + " p:" + block.position() +
-        // " kl: " + currKeyLen + " kv: " + currValueLen);
-        if (block == null) {
-          throw new IOException("Next called on non-seeked scanner");
-        }
-        try {
-          block.position(block.position() + currKeyLen + currValueLen);
-        } catch (IllegalArgumentException e) {
-          LOG.error("Current pos = " + block.position() +
-                    "; currKeyLen = " + currKeyLen +
-                    "; currValLen = " + currValueLen +
-                    "; block limit = " + block.limit() +
-                    "; HFile name = " + reader.getName() +
-                    "; currBlock id = " + this.currBlock);
-          throw e;
-        }
-        if (block.remaining() <= 0) {
-          // LOG.debug("Fetch next block");
-          currBlock++;
-          if (currBlock >= reader.blockIndex.count) {
-            // damn we are at the end
-            currBlock = 0;
-            block = null;
-            return false;
-          }
-          block = reader.readBlock(this.currBlock, this.cacheBlocks,
-                                   this.pread, this.isCompaction);
-          currKeyLen = block.getInt();
-          currValueLen = block.getInt();
-          blockFetches++;
-          return true;
-        }
-        // LOG.debug("rem:" + block.remaining() + " p:" + block.position() +
-        // " kl: " + currKeyLen + " kv: " + currValueLen);
-
-        currKeyLen = block.getInt();
-        currValueLen = block.getInt();
-        return true;
-      }
-
-      public int seekTo(byte [] key) throws IOException {
-        return seekTo(key, 0, key.length);
-      }
-
-      public int seekTo(byte[] key, int offset, int length) throws IOException {
-        int b = reader.blockContainingKey(key, offset, length);
-        if (b < 0) return -1; // falls before the beginning of the file! :-(
-        // Avoid re-reading the same block (that'd be dumb).
-        loadBlock(b, true);
-        return blockSeek(key, offset, length, false);
-      }
-
-      public int reseekTo(byte [] key) throws IOException {
-        return reseekTo(key, 0, key.length);
-      }
-
-      public int reseekTo(byte[] key, int offset, int length)
-        throws IOException {
-
-        if (this.block != null && this.currKeyLen != 0) {
-          ByteBuffer bb = getKey();
-          int compared = this.reader.comparator.compare(key, offset, length,
-              bb.array(), bb.arrayOffset(), bb.limit());
-          if (compared < 1) {
-            //If the required key is less than or equal to current key, then
-            //don't do anything.
-            return compared;
-          }
-        }
-
-        int b = reader.blockContainingKey(key, offset, length);
-        if (b < 0) {
-          return -1;
-        }
-        loadBlock(b, false);
-        return blockSeek(key, offset, length, false);
-      }
-
-      /**
-       * Within a loaded block, seek to the greatest key that is less than or
-       * equal to the key we are interested in.
-       *
-       * A note on seekBefore: if seekBefore is true AND the first key of the
-       * block equals the key we are looking for, this will throw, since there
-       * is no earlier entry to back up to.
-       * @param key to find
-       * @param seekBefore find the key before the exact match.
-       * @return 0 for an exact match, 1 otherwise (positioned on the nearest
-       * preceding key, or on the block's last key if we ran off the end).
-       */
-      private int blockSeek(byte[] key, int offset, int length, boolean seekBefore) {
-        int klen, vlen;
-        int lastLen = 0;
-        do {
-          klen = block.getInt();
-          vlen = block.getInt();
-          int comp = this.reader.comparator.compare(key, offset, length,
-            block.array(), block.arrayOffset() + block.position(), klen);
-          if (comp == 0) {
-            if (seekBefore) {
-              block.position(block.position() - lastLen - 16);
-              currKeyLen = block.getInt();
-              currValueLen = block.getInt();
-              return 1; // non exact match.
-            }
-            currKeyLen = klen;
-            currValueLen = vlen;
-            return 0; // indicate exact match
-          }
-          if (comp < 0) {
-            // go back one key:
-            block.position(block.position() - lastLen - 16);
-            currKeyLen = block.getInt();
-            currValueLen = block.getInt();
-            return 1;
-          }
-          block.position(block.position() + klen + vlen);
-          lastLen = klen + vlen ;
-        } while(block.remaining() > 0);
-        // OK, we are at the end of the block, so back up to its last entry.
-        // The 8 below is intentionally different from the 16s above: here only
-        // the last entry's own 8-byte length header needs undoing, whereas in
-        // the loop the just-read header of the following entry does too.
-        block.position(block.position() - lastLen - 8);
-        currKeyLen = block.getInt();
-        currValueLen = block.getInt();
-        return 1; // didn't exactly find it.
-      }
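Illustration (not part of the patch): each entry in a data block is laid out as a 4-byte key length, a 4-byte value length, the key bytes, then the value bytes. That is why blockSeek() backs up by lastLen + 16 inside its loop (the previous entry's key/value bytes plus two 8-byte headers: the previous entry's own header and the current entry's header that was just read), but only by lastLen + 8 once the loop has stepped past the final entry. A minimal, self-contained sketch of that layout, using hypothetical keys and values and plain java.nio rather than any HBase API:

import java.nio.ByteBuffer;

/** Illustration only: the [keyLen][valLen][key][value] record layout walked by blockSeek(). */
public class BlockLayoutSketch {
  public static void main(String[] args) {
    byte[] k1 = "aaa".getBytes(), v1 = "one".getBytes();
    byte[] k2 = "bbb".getBytes(), v2 = "two".getBytes();
    ByteBuffer block = ByteBuffer.allocate(16 + k1.length + v1.length + k2.length + v2.length);
    block.putInt(k1.length).putInt(v1.length).put(k1).put(v1);
    block.putInt(k2.length).putInt(v2.length).put(k2).put(v2);
    block.rewind();

    int lastLen = 0;
    while (block.remaining() > 0) {
      int klen = block.getInt();                       // 4-byte key length
      int vlen = block.getInt();                       // 4-byte value length
      block.position(block.position() + klen + vlen);  // skip key + value bytes
      lastLen = klen + vlen;
    }
    // Ran off the end: back up over the last entry's data plus its single
    // 8-byte header, exactly as blockSeek() does after its loop.
    block.position(block.position() - lastLen - 8);
    System.out.println("last key length = " + block.getInt());   // prints 3
    System.out.println("last value length = " + block.getInt()); // prints 3
  }
}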
-
-      public boolean seekBefore(byte [] key) throws IOException {
-        return seekBefore(key, 0, key.length);
-      }
-
-      public boolean seekBefore(byte[] key, int offset, int length)
-      throws IOException {
-        int b = reader.blockContainingKey(key, offset, length);
-        if (b < 0)
-          return false; // key is before the start of the file.
-
-        // Question: does this block begin with 'key'?
-        if (this.reader.comparator.compare(reader.blockIndex.blockKeys[b],
-            0, reader.blockIndex.blockKeys[b].length,
-            key, offset, length) == 0) {
-          // Ok the key we're interested in is the first of the block, so go back one.
-          if (b == 0) {
-            // we have a 'problem', the key we want is the first of the file.
-            return false;
-          }
-          b--;
-          // TODO shortcut: seek forward in this block to the last key of the block.
-        }
-        loadBlock(b, true);
-        blockSeek(key, offset, length, true);
-        return true;
-      }
-
-      public String getKeyString() {
-        return Bytes.toStringBinary(block.array(), block.arrayOffset() +
-          block.position(), currKeyLen);
-      }
-
-      public String getValueString() {
-        return Bytes.toString(block.array(), block.arrayOffset() +
-          block.position() + currKeyLen, currValueLen);
-      }
-
-      public Reader getReader() {
-        return this.reader;
-      }
-
-      public boolean isSeeked(){
-        return this.block != null;
-      }
-
-      public boolean seekTo() throws IOException {
-        if (this.reader.blockIndex.isEmpty()) {
-          return false;
-        }
-        if (block != null && currBlock == 0) {
-          block.rewind();
-          currKeyLen = block.getInt();
-          currValueLen = block.getInt();
-          return true;
-        }
-        currBlock = 0;
-        block = reader.readBlock(this.currBlock, this.cacheBlocks,
-                                 this.pread, this.isCompaction);
-        currKeyLen = block.getInt();
-        currValueLen = block.getInt();
-        blockFetches++;
-        return true;
-      }
-
-      private void loadBlock(int bloc, boolean rewind) throws IOException {
-        if (block == null) {
-          block = reader.readBlock(bloc, this.cacheBlocks,
-                                   this.pread, this.isCompaction);
-          currBlock = bloc;
-          blockFetches++;
-        } else {
-          if (bloc != currBlock) {
-            block = reader.readBlock(bloc, this.cacheBlocks,
-                                     this.pread, this.isCompaction);
-            currBlock = bloc;
-            blockFetches++;
-          } else {
-            // we are already in the same block, just rewind to seek again.
-            if (rewind) {
-              block.rewind();
-            }
-            else {
-              //Go back by (size of keylength + size of valuelength) = 8 bytes
-              block.position(block.position()-8);
-            }
-          }
-        }
-      }
-
-      @Override
-      public String toString() {
-        return "HFileScanner for reader " + String.valueOf(reader);
-      }
-    }
-
-    public String getTrailerInfo() {
-      return trailer.toString();
-    }
+     DataInput getBloomFilterMetadata() throws IOException;
   }
 
-  /*
-   * The HFile has a fixed trailer which contains offsets to other variable
-   * parts of the file.  Also includes basic metadata on this file.
-   */
-  private static class FixedFileTrailer {
-    // Offset to the fileinfo data, a small block of vitals..
-    long fileinfoOffset;
-    // Offset to the data block index.
-    long dataIndexOffset;
-    // How many index counts are there (aka: block count)
-    int dataIndexCount;
-    // Offset to the meta block index.
-    long metaIndexOffset;
-    // How many meta block index entries (aka: meta block count)
-    int metaIndexCount;
-    long totalUncompressedBytes;
-
-    // Note: An HFile today can contain more than Integer.MAX_VALUE keys.
-    // However, the entryCount (not being used for much today) will
-    // cap out at Integer.MAX_VALUE.
-    // If/when we change the trailer format, we should change the
-    // entryCount datatype to a long.
-    int entryCount;
-    int compressionCodec;
-    int version = 1;
-
-    FixedFileTrailer() {
-      super();
-    }
-
-    static int trailerSize() {
-      // Keep this up to date...
-      return
-      ( Bytes.SIZEOF_INT * 5 ) +
-      ( Bytes.SIZEOF_LONG * 4 ) +
-      TRAILERBLOCKMAGIC.length;
-    }
-
-    void serialize(DataOutputStream outputStream) throws IOException {
-      outputStream.write(TRAILERBLOCKMAGIC);
-      outputStream.writeLong(fileinfoOffset);
-      outputStream.writeLong(dataIndexOffset);
-      outputStream.writeInt(dataIndexCount);
-      outputStream.writeLong(metaIndexOffset);
-      outputStream.writeInt(metaIndexCount);
-      outputStream.writeLong(totalUncompressedBytes);
-      outputStream.writeInt(entryCount);
-      outputStream.writeInt(compressionCodec);
-      outputStream.writeInt(version);
-    }
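Illustration (not part of the patch): trailerSize() has to stay in lockstep with serialize(). The v1 trailer is the magic bytes followed by 4 longs (fileinfoOffset, dataIndexOffset, metaIndexOffset, totalUncompressedBytes) and 5 ints (dataIndexCount, metaIndexCount, entryCount, compressionCodec, version). A tiny sketch of the arithmetic; the 8-byte magic length is only an assumption here:

public class TrailerSizeSketch {
  public static void main(String[] args) {
    int magicLen = 8;           // assumed length of TRAILERBLOCKMAGIC
    int longs = 4 * 8;          // fileinfoOffset, dataIndexOffset,
                                // metaIndexOffset, totalUncompressedBytes
    int ints = 5 * 4;           // dataIndexCount, metaIndexCount,
                                // entryCount, compressionCodec, version
    System.out.println(magicLen + longs + ints);  // 60 bytes under that assumption
  }
}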
-
-    void deserialize(DataInputStream inputStream) throws IOException {
-      byte [] header = new byte[TRAILERBLOCKMAGIC.length];
-      inputStream.readFully(header);
-      if ( !Arrays.equals(header, TRAILERBLOCKMAGIC)) {
-        throw new IOException("Trailer 'header' is wrong; does the trailer " +
-          "size match content?");
-      }
-      fileinfoOffset         = inputStream.readLong();
-      dataIndexOffset        = inputStream.readLong();
-      dataIndexCount         = inputStream.readInt();
-
-      metaIndexOffset        = inputStream.readLong();
-      metaIndexCount         = inputStream.readInt();
-
-      totalUncompressedBytes = inputStream.readLong();
-      entryCount             = inputStream.readInt();
-      compressionCodec       = inputStream.readInt();
-      version                = inputStream.readInt();
-
-      if (version != 1) {
-        throw new IOException("Wrong version: " + version);
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "fileinfoOffset=" + fileinfoOffset +
-      ", dataIndexOffset=" + dataIndexOffset +
-      ", dataIndexCount=" + dataIndexCount +
-      ", metaIndexOffset=" + metaIndexOffset +
-      ", metaIndexCount=" + metaIndexCount +
-      ", totalBytes=" + totalUncompressedBytes +
-      ", entryCount=" + entryCount +
-      ", version=" + version;
+  private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,
+      long size, boolean closeIStream, BlockCache blockCache,
+      boolean inMemory, boolean evictOnClose) throws IOException {
+    FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size);
+    switch (trailer.getVersion()) {
+    case 1:
+      return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
+          blockCache, inMemory, evictOnClose);
+    case 2:
+      return new HFileReaderV2(path, trailer, fsdis, size, closeIStream,
+          blockCache, inMemory, evictOnClose);
+    default:
+      throw new IOException("Cannot instantiate reader for HFile version " +
+          trailer.getVersion());
     }
   }
 
-  /*
-   * The block index for an HFile.
-   * Used when reading.
-   */
-  static class BlockIndex implements HeapSize {
-    // How many actual items are there? The next insert location too.
-    int count = 0;
-    byte [][] blockKeys;
-    long [] blockOffsets;
-    int [] blockDataSizes;
-    int size = 0;
-
-    /* Needed for doing lookups on blocks.
-     */
-    final RawComparator<byte []> comparator;
-
-    /*
-     * Shutdown default constructor
-     */
-    @SuppressWarnings("unused")
-    private BlockIndex() {
-      this(null);
-    }
-
-
-    /**
-     * @param c comparator used to compare keys.
-     */
-    BlockIndex(final RawComparator<byte []>c) {
-      this.comparator = c;
-      // Guess that cost of three arrays + this object is 4 * 8 bytes.
-      this.size += (4 * 8);
-    }
-
-    /**
-     * @return True if block index is empty.
-     */
-    boolean isEmpty() {
-      return this.blockKeys.length <= 0;
-    }
-
-    /**
-     * Adds a new entry in the block index.
-     *
-     * @param key Last key in the block
-     * @param offset file offset where the block is stored
-     * @param dataSize the uncompressed data size
-     */
-    void add(final byte[] key, final long offset, final int dataSize) {
-      blockOffsets[count] = offset;
-      blockKeys[count] = key;
-      blockDataSizes[count] = dataSize;
-      count++;
-      this.size += (Bytes.SIZEOF_INT * 2 + key.length);
-    }
-
-    /**
-     * @param key Key to find
-     * @return Index of the block that may contain <code>key</code>, or -1 if
-     * the key falls before the first block in the file.
-     */
-    int blockContainingKey(final byte[] key, int offset, int length) {
-      int pos = Bytes.binarySearch(blockKeys, key, offset, length, this.comparator);
-      if (pos < 0) {
-        pos ++;
-        pos *= -1;
-        if (pos == 0) {
-          // falls before the beginning of the file.
-          return -1;
-        }
-        // When switched to "first key in block" index, binarySearch now returns
-        // the block with a firstKey < key.  This means the value we want is potentially
-        // in the next block.
-        pos --; // in previous block.
-
-        return pos;
-      }
-      // wow, a perfect hit, how unlikely?
-      return pos;
-    }
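Illustration (not part of the patch): the negative-result handling above follows the usual binary-search contract, where a miss returns -(insertionPoint) - 1; since the index stores each block's first key, the block that can contain the search key is the one just before the insertion point. A self-contained sketch of the same arithmetic with hypothetical keys, using plain java.util rather than the HBase comparator:

import java.util.Arrays;

/** Illustration only: how a negative binarySearch result maps to a block index. */
public class BlockIndexLookupSketch {
  public static void main(String[] args) {
    String[] firstKeys = { "d", "m", "t" };   // first key of each data block
    int pos = Arrays.binarySearch(firstKeys, "f");
    if (pos < 0) {
      pos = -(pos + 1);            // insertion point
      if (pos == 0) {
        System.out.println(-1);    // "f" would fall before the first block
        return;
      }
      pos--;                       // previous block holds keys <= "f"
    }
    System.out.println(pos);       // prints 0: the block starting at "d"
  }
}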
-
-    /*
-     * @return File midkey.  Inexact.  Operates on block boundaries.  Does
-     * not go into blocks.
-     */
-    byte [] midkey() throws IOException {
-      int pos = ((this.count - 1)/2);              // middle of the index
-      if (pos < 0) {
-        throw new IOException("HFile empty");
-      }
-      return this.blockKeys[pos];
-    }
-
-    /*
-     * Write out index. Whatever we write here must jibe with what
-     * BlockIndex#readIndex is expecting.  Make sure the two ends of the
-     * index serialization match.
-     * @param o
-     * @param keys
-     * @param offsets
-     * @param sizes
-     * @return Position at which we entered the index.
-     * @throws IOException
-     */
-    static long writeIndex(final FSDataOutputStream o,
-      final List<byte []> keys, final List<Long> offsets,
-      final List<Integer> sizes)
-    throws IOException {
-      long pos = o.getPos();
-      // Don't write an index if nothing in the index.
-      if (keys.size() > 0) {
-        o.write(INDEXBLOCKMAGIC);
-        // Write the index.
-        for (int i = 0; i < keys.size(); ++i) {
-          o.writeLong(offsets.get(i).longValue());
-          o.writeInt(sizes.get(i).intValue());
-          byte [] key = keys.get(i);
-          Bytes.writeByteArray(o, key);
-        }
-      }
-      return pos;
-    }
-
-    /*
-     * Read in the index that is at <code>indexOffset</code>
-     * Must match what was written by writeIndex in the Writer.close.
-     * @param c Comparator to use.
-     * @param in
-     * @param indexSize
-     * @throws IOException
-     */
-    static BlockIndex readIndex(final RawComparator<byte []> c,
-        DataInputStream in, final int indexSize)
-    throws IOException {
-      BlockIndex bi = new BlockIndex(c);
-      bi.blockOffsets = new long[indexSize];
-      bi.blockKeys = new byte[indexSize][];
-      bi.blockDataSizes = new int[indexSize];
-      // If index size is zero, no index was written.
-      if (indexSize > 0) {
-        byte [] magic = new byte[INDEXBLOCKMAGIC.length];
-        in.readFully(magic);
-        if (!Arrays.equals(magic, INDEXBLOCKMAGIC)) {
-          throw new IOException("Index block magic is wrong: " +
-            Arrays.toString(magic));
-        }
-        for (int i = 0; i < indexSize; ++i ) {
-          long offset   = in.readLong();
-          int dataSize  = in.readInt();
-          byte [] key = Bytes.readByteArray(in);
-          bi.add(key, offset, dataSize);
-        }
-      }
-      return bi;
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("size=" + count).append("\n");
-      for (int i = 0; i < count ; i++) {
-        sb.append("key=").append(KeyValue.keyToString(blockKeys[i])).
-          append("\n  offset=").append(blockOffsets[i]).
-          append(", dataSize=" + blockDataSizes[i]).
-          append("\n");
-      }
-      return sb.toString();
-    }
-
-    public long heapSize() {
-      long heapsize = ClassSize.align(ClassSize.OBJECT +
-          2 * Bytes.SIZEOF_INT + (3 + 1) * ClassSize.REFERENCE);
-      //Calculating the size of blockKeys
-      if(blockKeys != null) {
-        //Adding array + references overhead
-        heapsize += ClassSize.align(ClassSize.ARRAY +
-            blockKeys.length * ClassSize.REFERENCE);
-        //Adding bytes
-        for(byte [] bs : blockKeys) {
-          heapsize += ClassSize.align(ClassSize.ARRAY + bs.length);
-        }
-      }
-      if(blockOffsets != null) {
-        heapsize += ClassSize.align(ClassSize.ARRAY +
-            blockOffsets.length * Bytes.SIZEOF_LONG);
-      }
-      if(blockDataSizes != null) {
-        heapsize += ClassSize.align(ClassSize.ARRAY +
-            blockDataSizes.length * Bytes.SIZEOF_INT);
-      }
-
-      return ClassSize.align(heapsize);
-    }
+  public static Reader createReader(
+      FileSystem fs, Path path, BlockCache blockCache, boolean inMemory,
+      boolean evictOnClose) throws IOException {
+    return pickReaderVersion(path, fs.open(path),
+        fs.getFileStatus(path).getLen(), true, blockCache, inMemory,
+        evictOnClose);
+  }
 
+  public static Reader createReader(Path path, FSDataInputStream fsdis,
+      long size, BlockCache blockCache, boolean inMemory, boolean evictOnClose)
+      throws IOException {
+    return pickReaderVersion(path, fsdis, size, false, blockCache, inMemory,
+        evictOnClose);
   }
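Illustration (not part of the patch): a hedged sketch of opening a file through the new factory method. It assumes the version-agnostic Reader keeps the loadFileInfo()/getScanner()/seekTo()/getKeyValue()/next() calls that the old main() below relied on, and the file path is simply whatever HFile you want to inspect.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

public class HFileReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path(args[0]);    // path to an existing HFile
    // No block cache, not pinned in memory, no eviction on close.
    HFile.Reader reader = HFile.createReader(fs, file, null, false, false);
    try {
      reader.loadFileInfo();
      HFileScanner scanner = reader.getScanner(false, false, false);
      if (scanner.seekTo()) {
        do {
          KeyValue kv = scanner.getKeyValue();
          System.out.println(kv);
        } while (scanner.next());
      }
    } finally {
      reader.close();
    }
  }
}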
 
   /*
@@ -1980,23 +396,37 @@ public class HFile {
     static final byte [] COMPARATOR =
       Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
 
-    /*
-     * Constructor.
+    /**
+     * Append the given key/value pair to the file info, optionally checking the
+     * key prefix.
+     *
+     * @param k key to add
+     * @param v value to add
+     * @param checkPrefix whether to check that the provided key does not start
+     *          with the reserved prefix
+     * @return this file info object
+     * @throws IOException if the key or value is invalid
      */
-    FileInfo() {
-      super();
+    public FileInfo append(final byte[] k, final byte[] v,
+        final boolean checkPrefix) throws IOException {
+      if (k == null || v == null) {
+        throw new NullPointerException("Key nor value may be null");
+      }
+      if (checkPrefix && Bytes.startsWith(k, FileInfo.RESERVED_PREFIX_BYTES)) {
+        throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX
+            + " are reserved");
+      }
+      put(k, v);
+      return this;
     }
+
   }
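Illustration (not part of the patch): a hedged sketch of the new append() contract — application keys go in untouched, while keys carrying the reserved prefix are rejected when checkPrefix is true. It sits in the same package because FileInfo and RESERVED_PREFIX may not be visible outside org.apache.hadoop.hbase.io.hfile, and it assumes FileInfo still has a usable no-arg constructor.

package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;
import org.apache.hadoop.hbase.util.Bytes;

public class FileInfoAppendSketch {
  public static void main(String[] args) throws IOException {
    HFile.FileInfo info = new HFile.FileInfo();  // assumes a no-arg constructor is still available
    // An application-level key is accepted and the FileInfo is returned for chaining.
    info.append(Bytes.toBytes("MY_APP_KEY"), Bytes.toBytes("some value"), true);
    // A key under the reserved prefix is rejected when checkPrefix is true.
    try {
      info.append(Bytes.toBytes(HFile.FileInfo.RESERVED_PREFIX + "BAD"),
          Bytes.toBytes("x"), true);
    } catch (IOException expected) {
      System.out.println("rejected as expected: " + expected.getMessage());
    }
  }
}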
 
-  /**
-   * Return true if the given file info key is reserved for internal
-   * use by HFile.
-   */
+  /** Return true if the given file info key is reserved for internal use. */
   public static boolean isReservedFileInfoKey(byte[] key) {
     return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES);
   }
 
-
   /**
    * Get names of supported compression algorithms. The names are acceptable by
    * HFile.Writer.
@@ -2034,7 +464,7 @@ public class HFile {
    * @throws IOException When scanning the files fails.
    */
   static List<Path> getStoreFiles(FileSystem fs, Path regionDir)
-  throws IOException {
+      throws IOException {
     List<Path> res = new ArrayList<Path>();
     PathFilter dirFilter = new FSUtils.DirFilter(fs);
     FileStatus[] familyDirs = fs.listStatus(regionDir, dirFilter);
@@ -2049,173 +479,31 @@ public class HFile {
     return res;
   }
 
-  public static void main(String []args) throws IOException {
-    try {
-      // create options
-      Options options = new Options();
-      options.addOption("v", "verbose", false, "Verbose output; emits file and meta data delimiters");
-      options.addOption("p", "printkv", false, "Print key/value pairs");
-      options.addOption("e", "printkey", false, "Print keys");
-      options.addOption("m", "printmeta", false, "Print meta data of file");
-      options.addOption("b", "printblocks", false, "Print block index meta data");
-      options.addOption("k", "checkrow", false,
-        "Enable row order check; looks for out-of-order keys");
-      options.addOption("a", "checkfamily", false, "Enable family check");
-      options.addOption("f", "file", true,
-        "File to scan. Pass full-path; e.g. hdfs://a:9000/hbase/.META./12/34");
-      options.addOption("r", "region", true,
-        "Region to scan. Pass region name; e.g. '.META.,,1'");
-      if (args.length == 0) {
-        HelpFormatter formatter = new HelpFormatter();
-        formatter.printHelp("HFile ", options, true);
-        System.exit(-1);
-      }
-      CommandLineParser parser = new PosixParser();
-      CommandLine cmd = parser.parse(options, args);
-      boolean verbose = cmd.hasOption("v");
-      boolean printValue = cmd.hasOption("p");
-      boolean printKey = cmd.hasOption("e") || printValue;
-      boolean printMeta = cmd.hasOption("m");
-      boolean printBlocks = cmd.hasOption("b");
-      boolean checkRow = cmd.hasOption("k");
-      boolean checkFamily = cmd.hasOption("a");
-      // get configuration, file system and get list of files
-      Configuration conf = HBaseConfiguration.create();
-      conf.set("fs.defaultFS",
-        conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
-      ArrayList<Path> files = new ArrayList<Path>();
-      if (cmd.hasOption("f")) {
-        files.add(new Path(cmd.getOptionValue("f")));
-      }
-      if (cmd.hasOption("r")) {
-        String regionName = cmd.getOptionValue("r");
-        byte[] rn = Bytes.toBytes(regionName);
-        byte[][] hri = HRegionInfo.parseRegionName(rn);
-        Path rootDir = FSUtils.getRootDir(conf);
-        Path tableDir = new Path(rootDir, Bytes.toString(hri[0]));
-        String enc = HRegionInfo.encodeRegionName(rn);
-        Path regionDir = new Path(tableDir, enc);
-        if (verbose) System.out.println("region dir -> " + regionDir);
-        List<Path> regionFiles =
-          getStoreFiles(FileSystem.get(conf), regionDir);
-        if (verbose) System.out.println("Number of region files found -> " +
-          regionFiles.size());
-        if (verbose) {
-          int i = 1;
-          for (Path p : regionFiles) {
-            if (verbose) System.out.println("Found file[" + i++ + "] -> " + p);
-          }
-        }
-        files.addAll(regionFiles);
-      }
-
-      // iterate over all files found
-      for (Path file : files) {
-        if (verbose) System.out.println("Scanning -> " + file);
-        FileSystem fs = file.getFileSystem(conf);
-        if (!fs.exists(file)) {
-          System.err.println("ERROR, file doesnt exist: " + file);
-          continue;
-        }
-        // create reader and load file info
-        HFile.Reader reader = new HFile.Reader(fs, file, null, false, false);
-
-        Map<byte[],byte[]> fileInfo = reader.loadFileInfo();
-        int count = 0;
-        if (printKey || checkRow || checkFamily) {
-          // scan over file and read key/value's and check if requested
-          HFileScanner scanner = reader.getScanner(false, false, false);
-          scanner.seekTo();
-          KeyValue pkv = null;
-          do {
-            KeyValue kv = scanner.getKeyValue();
-            // dump key value
-            if (printKey) {
-              System.out.print("K: " + kv);
-              if (printValue) {
-                System.out.print(" V: " + Bytes.toStringBinary(kv.getValue()));
-              }
-              System.out.println();
-            }
-            // check if rows are in order
-            if (checkRow && pkv != null) {
-              if (Bytes.compareTo(pkv.getRow(), kv.getRow()) > 0) {
-                System.err.println("WARNING, previous row is greater then" +
-                    " current row\n\tfilename -> " + file +
-                    "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey()) +
-                    "\n\tcurrent  -> " + Bytes.toStringBinary(kv.getKey()));
-              }
-            }
-            // check if families are consistent
-            if (checkFamily) {
-              String fam = Bytes.toString(kv.getFamily());
-              if (!file.toString().contains(fam)) {
-                System.err.println("WARNING, filename does not match kv family," +
-                    "\n\tfilename -> " + file +
-                    "\n\tkeyvalue -> " + Bytes.toStringBinary(kv.getKey()));
-              }
-              if (pkv != null && Bytes.compareTo(pkv.getFamily(), kv.getFamily()) != 0) {
-                System.err.println("WARNING, previous kv has different family" +
-                    " compared to current key\n\tfilename -> " + file +
-                    "\n\tprevious -> " +  Bytes.toStringBinary(pkv.getKey()) +
-                    "\n\tcurrent  -> " + Bytes.toStringBinary(kv.getKey()));
-              }
-            }
-            pkv = kv;
-            count++;
-          } while (scanner.next());
-        }
+  public static void main(String[] args) throws IOException {
+    HFilePrettyPrinter prettyPrinter = new HFilePrettyPrinter();
+    System.exit(prettyPrinter.run(args));
+  }
 
-        if (verbose || printKey) {
-          System.out.println("Scanned kv count -> " + count);
-        }
+  public static String getBlockCacheKey(String hfileName, long offset) {
+    return hfileName + CACHE_KEY_SEPARATOR + offset;
+  }
 
-        // print meta data
-        if (printMeta) {
-          System.out.println("Block index size as per heapsize: " + reader.indexSize());
-          System.out.println(reader.toString());
-          System.out.println(reader.getTrailerInfo());
-          System.out.println("Fileinfo:");
-          for (Map.Entry<byte[], byte[]> e : fileInfo.entrySet()) {
-            System.out.print(Bytes.toString(e.getKey()) + " = " );
-            if (Bytes.compareTo(e.getKey(), Bytes.toBytes("MAX_SEQ_ID_KEY"))==0) {
-              long seqid = Bytes.toLong(e.getValue());
-              System.out.println(seqid);
-            } else if (Bytes.compareTo(e.getKey(),
-                Bytes.toBytes("TIMERANGE")) == 0) {
-              TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
-              Writables.copyWritable(e.getValue(), timeRangeTracker);
-              System.out.println(timeRangeTracker.getMinimumTimestamp() +
-                  "...." + timeRangeTracker.getMaximumTimestamp());
-            } else if (Bytes.compareTo(e.getKey(), FileInfo.AVG_KEY_LEN) == 0 ||
-                Bytes.compareTo(e.getKey(), FileInfo.AVG_VALUE_LEN) == 0) {
-              System.out.println(Bytes.toInt(e.getValue()));
-            } else {
-              System.out.println(Bytes.toStringBinary(e.getValue()));
-            }
-          }
-
-          //Printing bloom information
-          ByteBuffer b = reader.getMetaBlock("BLOOM_FILTER_META", false);
-          if (b!= null) {
-            BloomFilter bloomFilter = new ByteBloomFilter(b);
-            System.out.println("BloomSize: " + bloomFilter.getByteSize());
-            System.out.println("No of Keys in bloom: " +
-                bloomFilter.getKeyCount());
-            System.out.println("Max Keys for bloom: " +
-                bloomFilter.getMaxKeys());
-          } else {
-            System.out.println("Could not get bloom data from meta block");
-          }
-        }
-        if (printBlocks) {
-          System.out.println("Block Index:");
-          System.out.println(reader.blockIndex);
-        }
-        reader.close();
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
+  /**
+   * Checks the given {@link HFile} format version, and throws an exception if
+   * invalid. Note that if the version number comes from an input file and has
+   * not been verified, the caller should wrap the resulting exception in an
+   * {@link IOException} to indicate corrupted input rather than a software error.
+   *
+   * @param version an HFile version
+   * @throws IllegalArgumentException if the version is invalid
+   */
+  public static void checkFormatVersion(int version)
+      throws IllegalArgumentException {
+    if (version < MIN_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
+      throw new IllegalArgumentException("Invalid HFile version: " + version
+          + " (expected to be between " + MIN_FORMAT_VERSION + " and "
+          + MAX_FORMAT_VERSION + ")");
     }
   }
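Illustration (not part of the patch): the javadoc above asks callers to convert the IllegalArgumentException into an IOException when the version number was read from disk. A hedged sketch of that pattern; the helper name is made up:

import java.io.IOException;
import org.apache.hadoop.hbase.io.hfile.HFile;

public class VersionCheckSketch {
  // Reports an invalid on-disk version as corrupt input rather than a software bug.
  static void validateVersionFromTrailer(int versionFromTrailer, String fileName)
      throws IOException {
    try {
      HFile.checkFormatVersion(versionFromTrailer);
    } catch (IllegalArgumentException iae) {
      throw new IOException("Corrupt trailer in " + fileName, iae);
    }
  }
}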
+
 }