Posted to commits@hbase.apache.org by te...@apache.org on 2011/08/03 22:25:30 UTC

svn commit: r1153645 [1/3] - in /hbase/trunk/src/main/java/org/apache/hadoop/hbase: io/ io/hfile/ util/

Author: tedyu
Date: Wed Aug  3 20:25:28 2011
New Revision: 1153645

URL: http://svn.apache.org/viewvc?rev=1153645&view=rev
Log:
HBASE-3857 more new files

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DoubleOutputStream.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterBase.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterBase.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterWriter.java

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DoubleOutputStream.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DoubleOutputStream.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DoubleOutputStream.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DoubleOutputStream.java Wed Aug  3 20:25:28 2011
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An output stream that writes to two streams on each operation. Does not
+ * attempt to handle exceptions gracefully. If any operation other than
+ * {@link #close()} fails on the first stream, it is not called on the second
+ * stream.
+ */
+public class DoubleOutputStream extends OutputStream {
+  private OutputStream out1;
+  private OutputStream out2;
+
+  public DoubleOutputStream(OutputStream out1, OutputStream out2) {
+    this.out1 = out1;
+    this.out2 = out2;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    out1.write(b);
+    out2.write(b);
+  }
+
+  @Override
+  public void write(byte b[]) throws IOException {
+    out1.write(b, 0, b.length);
+    out2.write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    out1.write(b, off, len);
+    out2.write(b, off, len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    out1.flush();
+    out2.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      out1.close();
+    } finally {
+      // Make sure we at least attempt to close both streams.
+      out2.close();
+    }
+  }
+
+}
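
A minimal usage sketch (not part of the commit): DoubleOutputStream simply forwards every write to both delegates, which is how HFileBlock.Writer below tees user data into a compressing stream for disk while keeping an uncompressed copy for cache-on-write. The two ByteArrayOutputStream targets here are arbitrary stand-ins.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.io.DoubleOutputStream;

public class DoubleOutputStreamSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream first = new ByteArrayOutputStream();
    ByteArrayOutputStream second = new ByteArrayOutputStream();
    DoubleOutputStream tee = new DoubleOutputStream(first, second);
    tee.write(new byte[] { 1, 2, 3 });
    tee.flush();
    // close() closes both streams; the second is closed even if the first
    // close() throws.
    tee.close();
    System.out.println(first.size() + " == " + second.size());  // 3 == 3
  }
}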

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java Wed Aug  3 20:25:28 2011
@@ -0,0 +1,354 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Common functionality needed by all versions of {@link HFile} readers.
+ */
+public abstract class AbstractHFileReader implements HFile.Reader {
+
+  private static final Log LOG = LogFactory.getLog(AbstractHFileReader.class);
+
+  /** Filesystem-level block reader for this HFile format version. */
+  protected HFileBlock.FSReader fsBlockReader;
+
+  /** Stream to read from. */
+  protected FSDataInputStream istream;
+
+  /**
+   * True if we should close the input stream when done. We don't close it if we
+   * didn't open it.
+   */
+  protected final boolean closeIStream;
+
+  /** Data block index reader keeping the root data index in memory */
+  protected HFileBlockIndex.BlockIndexReader dataBlockIndexReader;
+
+  /** Meta block index reader -- always single level */
+  protected HFileBlockIndex.BlockIndexReader metaBlockIndexReader;
+
+  protected final FixedFileTrailer trailer;
+
+  /** Filled when we read in the trailer. */
+  protected final Compression.Algorithm compressAlgo;
+
+  /** Last key in the file. Filled in when we read in the file info */
+  protected byte [] lastKey = null;
+
+  /** Average key length read from file info */
+  protected int avgKeyLen = -1;
+
+  /** Average value length read from file info */
+  protected int avgValueLen = -1;
+
+  /** Key comparator */
+  protected RawComparator<byte []> comparator;
+
+  /** Size of this file. */
+  protected final long fileSize;
+
+  /** Block cache to use. */
+  protected final BlockCache blockCache;
+
+  protected AtomicLong cacheHits = new AtomicLong();
+  protected AtomicLong blockLoads = new AtomicLong();
+  protected AtomicLong metaLoads = new AtomicLong();
+
+  /**
+   * Whether file is from in-memory store (comes from column family
+   * configuration).
+   */
+  protected boolean inMemory = false;
+
+  /**
+   * Whether blocks of file should be evicted from the block cache when the
+   * file is being closed
+   */
+  protected final boolean evictOnClose;
+
+  /** Path of file */
+  protected final Path path;
+
+  /** File name to be used for block names */
+  protected final String name;
+
+  protected FileInfo fileInfo;
+
+  /** Prefix of the form cf.&lt;column_family_name&gt; for statistics counters. */
+  private final String cfStatsPrefix;
+
+  protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
+      final FSDataInputStream fsdis, final long fileSize,
+      final boolean closeIStream,
+      final BlockCache blockCache, final boolean inMemory,
+      final boolean evictOnClose) {
+    this.trailer = trailer;
+    this.compressAlgo = trailer.getCompressionCodec();
+    this.blockCache = blockCache;
+    this.fileSize = fileSize;
+    this.istream = fsdis;
+    this.closeIStream = closeIStream;
+    this.inMemory = inMemory;
+    this.evictOnClose = evictOnClose;
+    this.path = path;
+    this.name = path.getName();
+    cfStatsPrefix = "cf." + parseCfNameFromPath(path.toString());
+  }
+
+  @SuppressWarnings("serial")
+  public static class BlockIndexNotLoadedException
+      extends IllegalStateException {
+    public BlockIndexNotLoadedException() {
+      // Add a message in case anyone relies on it as opposed to class name.
+      super("Block index not loaded");
+    }
+  }
+
+  protected String toStringFirstKey() {
+    return KeyValue.keyToString(getFirstKey());
+  }
+
+  protected String toStringLastKey() {
+    return KeyValue.keyToString(getLastKey());
+  }
+
+  /**
+   * Parse the HFile path to figure out which table and column family
+   * it belongs to. This is used to maintain read statistics on a
+   * per-column-family basis.
+   *
+   * @param path HFile path name
+   */
+  public static String parseCfNameFromPath(String path) {
+    String splits[] = path.split("/");
+    if (splits.length < 2) {
+      LOG.warn("Could not determine the table and column family of the " +
+          "HFile path " + path);
+      return "unknown";
+    }
+
+    return splits[splits.length - 2];
+  }
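
For illustration, parseCfNameFromPath only looks at the second-to-last path component, so for a made-up store file path it behaves as sketched below (the table, region and file names are invented):

public class CfNameSketch {
  public static void main(String[] args) {
    String path = "/hbase/usertable/1a2b3c4d/info/8c6f3d2e9a714f0f";
    // Prints "info"; a path with fewer than two components yields "unknown".
    System.out.println(
        org.apache.hadoop.hbase.io.hfile.AbstractHFileReader
            .parseCfNameFromPath(path));
  }
}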
+
+  public abstract boolean isFileInfoLoaded();
+
+  @Override
+  public String toString() {
+    return "reader=" + path.toString() +
+        (!isFileInfoLoaded()? "":
+          ", compression=" + compressAlgo.getName() +
+          ", inMemory=" + inMemory +
+          ", firstKey=" + toStringFirstKey() +
+          ", lastKey=" + toStringLastKey()) +
+          ", avgKeyLen=" + avgKeyLen +
+          ", avgValueLen=" + avgValueLen +
+          ", entries=" + trailer.getEntryCount() +
+          ", length=" + fileSize;
+  }
+
+  @Override
+  public long length() {
+    return fileSize;
+  }
+
+  /**
+   * Create a Scanner on this file. No seeks or reads are done on creation. Call
+   * {@link HFileScanner#seekTo(byte[])} to position and start the read. There is
+   * nothing to clean up in a Scanner. Letting go of your references to the
+   * scanner is sufficient. NOTE: Do not use this overload of getScanner for
+   * compactions.
+   *
+   * @param cacheBlocks True if we should cache blocks read in by this scanner.
+   * @param pread Use positional read rather than seek+read if true (pread is
+   *          better for random reads, seek+read is better for scanning).
+   * @return Scanner on this file.
+   */
+  @Override
+  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
+    return getScanner(cacheBlocks, pread, false);
+  }
+
+  /**
+   * @return the first key in the file. May be null if file has no entries. Note
+   *         that this is not the first row key, but rather the byte form of the
+   *         first KeyValue.
+   */
+  @Override
+  public byte [] getFirstKey() {
+    if (dataBlockIndexReader == null) {
+      throw new BlockIndexNotLoadedException();
+    }
+    return dataBlockIndexReader.isEmpty() ? null
+        : dataBlockIndexReader.getRootBlockKey(0);
+  }
+
+  /**
+   * TODO left from {@link HFile} version 1: move this to StoreFile after Ryan's
+   * patch goes in to eliminate {@link KeyValue} here.
+   *
+   * @return the first row key, or null if the file is empty.
+   */
+  @Override
+  public byte[] getFirstRowKey() {
+    byte[] firstKey = getFirstKey();
+    if (firstKey == null)
+      return null;
+    return KeyValue.createKeyValueFromKey(firstKey).getRow();
+  }
+
+  /**
+   * TODO left from {@link HFile} version 1: move this to StoreFile after
+   * Ryan's patch goes in to eliminate {@link KeyValue} here.
+   *
+   * @return the last row key, or null if the file is empty.
+   */
+  @Override
+  public byte[] getLastRowKey() {
+    byte[] lastKey = getLastKey();
+    if (lastKey == null)
+      return null;
+    return KeyValue.createKeyValueFromKey(lastKey).getRow();
+  }
+
+  /** @return number of KV entries in this HFile */
+  @Override
+  public long getEntries() {
+    return trailer.getEntryCount();
+  }
+
+  /** @return comparator */
+  @Override
+  public RawComparator<byte []> getComparator() {
+    return comparator;
+  }
+
+  /** @return compression algorithm */
+  @Override
+  public Compression.Algorithm getCompressionAlgorithm() {
+    return compressAlgo;
+  }
+
+  /**
+   * @return the total heap size of data and meta block indexes in bytes. Does
+   *         not take into account non-root blocks of a multilevel data index.
+   */
+  public long indexSize() {
+    return (dataBlockIndexReader != null ? dataBlockIndexReader.heapSize() : 0)
+        + ((metaBlockIndexReader != null) ? metaBlockIndexReader.heapSize()
+            : 0);
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public HFileBlockIndex.BlockIndexReader getDataBlockIndexReader() {
+    return dataBlockIndexReader;
+  }
+
+  @Override
+  public String getColumnFamilyName() {
+    return cfStatsPrefix;
+  }
+
+  @Override
+  public FixedFileTrailer getTrailer() {
+    return trailer;
+  }
+
+  @Override
+  public FileInfo loadFileInfo() throws IOException {
+    return fileInfo;
+  }
+
+  /**
+   * An exception thrown when an operation requiring a scanner to be seeked
+   * is invoked on a scanner that is not seeked.
+   */
+  @SuppressWarnings("serial")
+  public static class NotSeekedException extends IllegalStateException {
+    public NotSeekedException() {
+      super("Not seeked to a key/value");
+    }
+  }
+
+  protected static abstract class Scanner implements HFileScanner {
+    protected HFile.Reader reader;
+    protected ByteBuffer blockBuffer;
+
+    protected boolean cacheBlocks;
+    protected final boolean pread;
+    protected final boolean isCompaction;
+
+    protected int currKeyLen;
+    protected int currValueLen;
+
+    protected int blockFetches;
+
+    public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction) {
+      this.reader = reader;
+      this.cacheBlocks = cacheBlocks;
+      this.pread = pread;
+      this.isCompaction = isCompaction;
+    }
+
+    @Override
+    public Reader getReader() {
+      return reader;
+    }
+
+    @Override
+    public boolean isSeeked(){
+      return blockBuffer != null;
+    }
+
+    @Override
+    public String toString() {
+      return "HFileScanner for reader " + String.valueOf(reader);
+    }
+
+    protected void assertSeeked() {
+      if (!isSeeked())
+        throw new NotSeekedException();
+    }
+  }
+
+  /** For testing */
+  HFileBlock.FSReader getUncachedBlockReader() {
+    return fsBlockReader;
+  }
+
+}
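
Taken together, the accessors above give any concrete reader the following shape from a caller's point of view; the reader instance here is assumed to be an already-opened implementation of HFile.Reader (opening one is outside this class):

import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

public class ReaderUsageSketch {
  static void describe(HFile.Reader reader) {
    long entries = reader.getEntries();        // entry count from the trailer
    byte[] firstRow = reader.getFirstRowKey(); // null if the file is empty
    byte[] lastRow = reader.getLastRowKey();   // null if the file is empty
    // Creating a scanner does no I/O; callers must seek before reading,
    // and this overload must not be used for compactions.
    HFileScanner scanner = reader.getScanner(true /* cacheBlocks */,
        false /* pread */);
    System.out.println(reader.getColumnFamilyName() + ": " + entries
        + " entries, seeked=" + scanner.isSeeked());
  }
}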

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java Wed Aug  3 20:25:28 2011
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Common functionality needed by all versions of {@link HFile} writers.
+ */
+public abstract class AbstractHFileWriter implements HFile.Writer {
+
+  /** Key previously appended. Becomes the last key in the file. */
+  protected byte[] lastKeyBuffer = null;
+
+  protected int lastKeyOffset = -1;
+  protected int lastKeyLength = -1;
+
+  /** FileSystem stream to write into. */
+  protected FSDataOutputStream outputStream;
+
+  /** True if we opened the <code>outputStream</code> (and so will close it). */
+  protected final boolean closeOutputStream;
+
+  /** A "file info" block: a key-value map of file-wide metadata. */
+  protected FileInfo fileInfo = new HFile.FileInfo();
+
+  /** Number of uncompressed bytes we allow per block. */
+  protected final int blockSize;
+
+  /** Total # of key/value entries, i.e. how many times add() was called. */
+  protected long entryCount = 0;
+
+  /** Used for calculating the average key length. */
+  protected long totalKeyLength = 0;
+
+  /** Used for calculating the average value length. */
+  protected long totalValueLength = 0;
+
+  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
+  protected long totalUncompressedBytes = 0;
+
+  /** Key comparator. Used to ensure we write in order. */
+  protected final RawComparator<byte[]> comparator;
+
+  /** Meta block names. */
+  protected List<byte[]> metaNames = new ArrayList<byte[]>();
+
+  /** {@link Writable}s representing meta block data. */
+  protected List<Writable> metaData = new ArrayList<Writable>();
+
+  /** The compression algorithm used. NONE if no compression. */
+  protected final Compression.Algorithm compressAlgo;
+
+  /** First key in a block. */
+  protected byte[] firstKeyInBlock = null;
+
+  /** May be null if we were passed a stream. */
+  protected final Path path;
+
+  /** Whether to cache key/value data blocks on write */
+  protected final boolean cacheDataBlocksOnWrite;
+
+  /** Whether to cache non-root index blocks on write */
+  protected final boolean cacheIndexBlocksOnWrite;
+
+  /** Block cache to optionally fill on write. */
+  protected BlockCache blockCache;
+
+  /** Configuration used for block cache initialization */
+  private Configuration conf;
+
+  /**
+   * Name for this object used when logging or in toString. Is either
+   * the result of a toString on stream or else toString of passed file Path.
+   */
+  protected final String name;
+
+  public AbstractHFileWriter(Configuration conf,
+      FSDataOutputStream outputStream, Path path, int blockSize,
+      Compression.Algorithm compressAlgo, KeyComparator comparator) {
+    this.outputStream = outputStream;
+    this.path = path;
+    this.name = path != null ? path.getName() : outputStream.toString();
+    this.blockSize = blockSize;
+    this.compressAlgo = compressAlgo == null
+        ? HFile.DEFAULT_COMPRESSION_ALGORITHM : compressAlgo;
+    this.comparator = comparator != null ? comparator
+        : Bytes.BYTES_RAWCOMPARATOR;
+
+    closeOutputStream = path != null;
+
+    cacheDataBlocksOnWrite = conf.getBoolean(HFile.CACHE_BLOCKS_ON_WRITE_KEY,
+        false);
+    cacheIndexBlocksOnWrite = HFileBlockIndex.shouldCacheOnWrite(conf);
+
+    this.conf = conf;
+
+    if (cacheDataBlocksOnWrite || cacheIndexBlocksOnWrite)
+      initBlockCache();
+  }
+
+  /**
+   * Add last bits of metadata to file info before it is written out.
+   */
+  protected void finishFileInfo() throws IOException {
+    if (lastKeyBuffer != null) {
+      // Make a copy. The copy is stuffed into HMapWritable. Needs a clean
+      // byte buffer. Won't take a tuple.
+      fileInfo.append(FileInfo.LASTKEY, Arrays.copyOfRange(lastKeyBuffer,
+          lastKeyOffset, lastKeyOffset + lastKeyLength), false);
+    }
+
+    // Average key length.
+    int avgKeyLen =
+        entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
+    fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
+
+    // Average value length.
+    int avgValueLen =
+        entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
+    fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
+  }
+
+  /**
+   * Add to the file info. All added key/value pairs can be obtained using
+   * {@link HFile.Reader#loadFileInfo()}.
+   *
+   * @param k Key
+   * @param v Value
+   * @throws IOException in case the key or the value are invalid
+   */
+  @Override
+  public void appendFileInfo(final byte[] k, final byte[] v)
+      throws IOException {
+    fileInfo.append(k, v, true);
+  }
+
+  /**
+   * Sets the file info offset in the trailer, finishes up populating fields in
+   * the file info, and writes the file info into the given data output. The
+   * reason the data output is not always {@link #outputStream} is that we store
+   * file info as a block in version 2.
+   *
+   * @param trailer fixed file trailer
+   * @param out the data output to write the file info to
+   * @throws IOException
+   */
+  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutput out)
+      throws IOException {
+    trailer.setFileInfoOffset(outputStream.getPos());
+    finishFileInfo();
+    fileInfo.write(out);
+  }
+
+  /**
+   * Checks that the given key does not violate the key order.
+   *
+   * @param key Key to check.
+   * @return true if the key is duplicate
+   * @throws IOException if the key or the key order is wrong
+   */
+  protected boolean checkKey(final byte[] key, final int offset,
+      final int length) throws IOException {
+    boolean isDuplicateKey = false;
+
+    if (key == null || length <= 0) {
+      throw new IOException("Key cannot be null or empty");
+    }
+    if (length > HFile.MAXIMUM_KEY_LENGTH) {
+      throw new IOException("Key length " + length + " > "
+          + HFile.MAXIMUM_KEY_LENGTH);
+    }
+    if (lastKeyBuffer != null) {
+      int keyComp = comparator.compare(lastKeyBuffer, lastKeyOffset,
+          lastKeyLength, key, offset, length);
+      if (keyComp > 0) {
+        throw new IOException("Added a key not lexically larger than"
+            + " previous key="
+            + Bytes.toStringBinary(key, offset, length)
+            + ", lastkey="
+            + Bytes.toStringBinary(lastKeyBuffer, lastKeyOffset,
+                lastKeyLength));
+      } else if (keyComp == 0) {
+        isDuplicateKey = true;
+      }
+    }
+    return isDuplicateKey;
+  }
+
+  /** Checks the given value for validity. */
+  protected void checkValue(final byte[] value, final int offset,
+      final int length) throws IOException {
+    if (value == null) {
+      throw new IOException("Value cannot be null");
+    }
+  }
+
+  /**
+   * @return Path or null if we were passed a stream rather than a Path.
+   */
+  @Override
+  public Path getPath() {
+    return path;
+  }
+
+  @Override
+  public String toString() {
+    return "writer=" + (path != null ? path.toString() : null) + ", name="
+        + name + ", compression=" + compressAlgo.getName();
+  }
+
+  /**
+   * Sets remaining trailer fields, writes the trailer to disk, and optionally
+   * closes the output stream.
+   */
+  protected void finishClose(FixedFileTrailer trailer) throws IOException {
+    trailer.setMetaIndexCount(metaNames.size());
+    trailer.setTotalUncompressedBytes(totalUncompressedBytes);
+    trailer.setEntryCount(entryCount);
+    trailer.setCompressionCodec(compressAlgo);
+
+    trailer.serialize(outputStream);
+
+    if (closeOutputStream) {
+      outputStream.close();
+      outputStream = null;
+    }
+  }
+
+  public static Compression.Algorithm compressionByName(String algoName) {
+    if (algoName == null)
+      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
+    return Compression.getCompressionAlgorithmByName(algoName);
+  }
+
+  /** A helper method to create HFile output streams in constructors */
+  protected static FSDataOutputStream createOutputStream(Configuration conf,
+      FileSystem fs, Path path) throws IOException {
+    return fs.create(path, FsPermission.getDefault(), true,
+        fs.getConf().getInt("io.file.buffer.size", 4096),
+        fs.getDefaultReplication(), fs.getDefaultBlockSize(),
+        null);
+  }
+
+  /** Initializes the block cache to use for cache-on-write */
+  protected void initBlockCache() {
+    if (blockCache == null) {
+      blockCache = StoreFile.getBlockCache(conf);
+      conf = null;  // This is all we need configuration for.
+    }
+  }
+
+}
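
Two small usage notes on the public pieces above, as a sketch: appendFileInfo adds file-wide metadata that readers later get back from loadFileInfo(), and compressionByName falls back to the default algorithm when given null. The file-info key below is invented.

import java.io.IOException;

import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.Bytes;

public class WriterUsageSketch {
  // 'writer' is assumed to be an already-constructed concrete HFile.Writer.
  static void addMetadata(HFile.Writer writer) throws IOException {
    // "example.meta" is a made-up key; it becomes part of the file info block.
    writer.appendFileInfo(Bytes.toBytes("example.meta"), Bytes.toBytes("value"));
  }

  public static void main(String[] args) {
    // null -> HFile.DEFAULT_COMPRESSION_ALGORITHM; otherwise resolved by name.
    Compression.Algorithm algo = AbstractHFileWriter.compressionByName(null);
    System.out.println(algo);
  }
}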

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java Wed Aug  3 20:25:28 2011
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Various types of {@link HFile} blocks. Ordinal values of these enum constants
+ * must not be relied upon. The values in the enum appear in the order they
+ * appear in a version 2 {@link HFile}.
+ */
+public enum BlockType {
+
+  // Scanned block section
+
+  /** Data block, both versions */
+  DATA("DATABLK*"),
+
+  /** Version 2 leaf index block. Appears in the data block section */
+  LEAF_INDEX("IDXLEAF2"),
+
+  /** Bloom filter block, version 2 */
+  BLOOM_CHUNK("BLMFBLK2"),
+
+  // Non-scanned block section
+
+  /** Meta blocks */
+  META("METABLKc"),
+
+  /** Intermediate-level version 2 index in the non-data block section */
+  INTERMEDIATE_INDEX("IDXINTE2"),
+
+  // Load-on-open section.
+
+  /** Root index block, also used for the single-level meta index, version 2 */
+  ROOT_INDEX("IDXROOT2"),
+
+  /** File info, version 2 */
+  FILE_INFO("FILEINF2"),
+
+  /** Bloom filter metadata, version 2 */
+  BLOOM_META("BLMFMET2"),
+
+  // Trailer
+
+  /** Fixed file trailer, both versions (always just a magic string) */
+  TRAILER("TRABLK\"$"),
+
+  // Legacy blocks
+
+  /** Block index magic string in version 1 */
+  INDEX_V1("IDXBLK)+");
+
+  public static final int MAGIC_LENGTH = 8;
+
+  private final byte[] magic;
+
+  private BlockType(String magicStr) {
+    magic = Bytes.toBytes(magicStr);
+    assert magic.length == MAGIC_LENGTH;
+  }
+
+  public void writeToStream(OutputStream out) throws IOException {
+    out.write(magic);
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.write(magic);
+  }
+
+  public void write(ByteBuffer buf) {
+    buf.put(magic);
+  }
+
+  public static BlockType parse(byte[] buf, int offset, int length)
+      throws IOException {
+    if (length != MAGIC_LENGTH) {
+      throw new IOException("Magic record of invalid length: "
+          + Bytes.toStringBinary(buf, offset, length));
+    }
+
+    for (BlockType blockType : values())
+      if (Bytes.compareTo(blockType.magic, 0, MAGIC_LENGTH, buf, offset,
+          MAGIC_LENGTH) == 0)
+        return blockType;
+
+    throw new IOException("Invalid HFile block magic: "
+        + Bytes.toStringBinary(buf, offset, MAGIC_LENGTH));
+  }
+
+  public static BlockType read(DataInputStream in) throws IOException {
+    byte[] buf = new byte[MAGIC_LENGTH];
+    in.readFully(buf);
+    return parse(buf, 0, buf.length);
+  }
+
+  public static BlockType read(ByteBuffer buf) throws IOException {
+    BlockType blockType = parse(buf.array(),
+        buf.arrayOffset() + buf.position(),
+        Math.min(buf.limit() - buf.position(), MAGIC_LENGTH));
+
+    // If we got here, we have read exactly MAGIC_LENGTH bytes.
+    buf.position(buf.position() + MAGIC_LENGTH);
+    return blockType;
+  }
+
+  /**
+   * Put the magic record out to the specified byte array position.
+   *
+   * @param bytes the byte array
+   * @param offset position in the array
+   * @return incremented offset
+   */
+  public int put(byte[] bytes, int offset) {
+    System.arraycopy(magic, 0, bytes, offset, MAGIC_LENGTH);
+    return offset + MAGIC_LENGTH;
+  }
+
+  /**
+   * Reads a magic record of the length {@link #MAGIC_LENGTH} from the given
+   * stream and expects it to match this block type.
+   */
+  public void readAndCheck(DataInputStream in) throws IOException {
+    byte[] buf = new byte[MAGIC_LENGTH];
+    in.readFully(buf);
+    if (Bytes.compareTo(buf, magic) != 0) {
+      throw new IOException("Invalid magic: expected "
+          + Bytes.toStringBinary(magic) + ", got " + Bytes.toStringBinary(buf));
+    }
+  }
+
+  /**
+   * Reads a magic record of the length {@link #MAGIC_LENGTH} from the given
+   * byte buffer and expects it to match this block type.
+   */
+  public void readAndCheck(ByteBuffer in) throws IOException {
+    byte[] buf = new byte[MAGIC_LENGTH];
+    in.get(buf);
+    if (Bytes.compareTo(buf, magic) != 0) {
+      throw new IOException("Invalid magic: expected "
+          + Bytes.toStringBinary(magic) + ", got " + Bytes.toStringBinary(buf));
+    }
+  }
+
+}
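
A small round-trip sketch using the ByteBuffer helpers above: write the 8-byte DATA magic into a buffer and parse it back (any of the other constants works the same way).

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.hfile.BlockType;

public class BlockTypeSketch {
  public static void main(String[] args) throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(BlockType.MAGIC_LENGTH);
    BlockType.DATA.write(buf);
    buf.flip();
    BlockType parsed = BlockType.read(buf); // advances the buffer by 8 bytes
    System.out.println(parsed);             // DATA
  }
}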

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1153645&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Wed Aug  3 20:25:28 2011
@@ -0,0 +1,1441 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import org.apache.hadoop.hbase.io.DoubleOutputStream;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CompoundBloomFilter;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
+import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
+
+/**
+ * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
+ * <ul>
+ * <li>In version 1 all blocks are always compressed or uncompressed, as
+ * specified by the {@link HFile}'s compression algorithm, with a type-specific
+ * magic record stored in the beginning of the compressed data (i.e. one needs
+ * to uncompress the compressed block to determine the block type). There is
+ * only a single compression algorithm setting for all blocks. Offset and size
+ * information from the block index are required to read a block.
+ * <li>In version 2 a block is structured as follows:
+ * <ul>
+ * <li>Magic record identifying the block type (8 bytes)
+ * <li>Compressed block size, header not included (4 bytes)
+ * <li>Uncompressed block size, header not included (4 bytes)
+ * <li>The offset of the previous block of the same type (8 bytes). This is
+ * used to be able to navigate to the previous block without going to the block
+ * index.
+ * <li>Compressed data (or uncompressed data if compression is disabled). The
+ * compression algorithm is the same for all the blocks in the {@link HFile},
+ * similarly to what was done in version 1.
+ * </ul>
+ * </ul>
+ * The version 2 block representation in the block cache is the same as above,
+ * except that the data section is always uncompressed in the cache.
+ */
+public class HFileBlock implements HeapSize {
+
+  /** The size of a version 2 {@link HFile} block header */
+  public static final int HEADER_SIZE = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
+      + Bytes.SIZEOF_LONG;
+
+  /** Just an array of bytes of the right size. */
+  public static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE];
+
+  public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
+      ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
+
+  private BlockType blockType;
+  private final int onDiskSizeWithoutHeader;
+  private final int uncompressedSizeWithoutHeader;
+  private final long prevBlockOffset;
+  private ByteBuffer buf;
+
+  /**
+   * The offset of this block in the file. Populated by the reader for
+   * convenience of access. This offset is not part of the block header.
+   */
+  private long offset = -1;
+
+  /**
+   * The on-disk size of the next block, including the header, obtained by
+   * peeking into the first {@link #HEADER_SIZE} bytes of the next block's
+   * header, or -1 if unknown.
+   */
+  private int nextBlockOnDiskSizeWithHeader = -1;
+
+  /**
+   * Creates a new {@link HFile} block from the given fields. This constructor
+   * is mostly used when the block data has already been read and uncompressed,
+   * and is sitting in a byte buffer.
+   *
+   * @param blockType the type of this block, see {@link BlockType}
+   * @param onDiskSizeWithoutHeader compressed size of the block if compression
+   *          is used, otherwise uncompressed size, header size not included
+   * @param uncompressedSizeWithoutHeader uncompressed size of the block,
+   *          header size not included. Equals onDiskSizeWithoutHeader if
+   *          compression is disabled.
+   * @param prevBlockOffset the offset of the previous block in the
+   *          {@link HFile}
+   * @param buf block header ({@link #HEADER_SIZE} bytes) followed by
+   *          uncompressed data.
+   * @param fillHeader true to fill in the first {@link #HEADER_SIZE} bytes of
+   *          the buffer based on the header fields provided
+   * @param offset the file offset the block was read from
+   */
+  public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
+      int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf,
+      boolean fillHeader, long offset) {
+    this.blockType = blockType;
+    this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
+    this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
+    this.prevBlockOffset = prevBlockOffset;
+    this.buf = buf;
+    if (fillHeader)
+      overwriteHeader();
+    this.offset = offset;
+  }
+
+  /**
+   * Creates a block from an existing buffer starting with a header. Rewinds
+   * and takes ownership of the buffer. By definition of rewind, ignores the
+   * buffer position, but if you slice the buffer beforehand, it will rewind
+   * to that point.
+   */
+  private HFileBlock(ByteBuffer b) throws IOException {
+    b.rewind();
+    blockType = BlockType.read(b);
+    onDiskSizeWithoutHeader = b.getInt();
+    uncompressedSizeWithoutHeader = b.getInt();
+    prevBlockOffset = b.getLong();
+    buf = b;
+    buf.rewind();
+  }
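
To make the header layout concrete: with an 8-byte magic record, two 4-byte sizes and an 8-byte previous-block offset, HEADER_SIZE works out to 8 + 4 + 4 + 8 = 24 bytes, and the fields can be pulled out of a buffer exactly as the constructor above does. A sketch, assuming the buffer is positioned at a block start:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.hfile.BlockType;

public class HeaderLayoutSketch {
  static void dumpHeader(ByteBuffer b) throws IOException {
    BlockType type = BlockType.read(b);             // bytes 0..7:  magic record
    int onDiskSizeWithoutHeader = b.getInt();       // bytes 8..11
    int uncompressedSizeWithoutHeader = b.getInt(); // bytes 12..15
    long prevBlockOffset = b.getLong();             // bytes 16..23
    System.out.println(type + " onDisk=" + onDiskSizeWithoutHeader
        + " uncompressed=" + uncompressedSizeWithoutHeader
        + " prev=" + prevBlockOffset);
  }
}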
+
+  public BlockType getBlockType() {
+    return blockType;
+  }
+
+  /**
+   * @return the on-disk size of the block with header size included
+   */
+  public int getOnDiskSizeWithHeader() {
+    return onDiskSizeWithoutHeader + HEADER_SIZE;
+  }
+
+  /**
+   * Returns the size of the compressed part of the block in case compression
+   * is used, or the uncompressed size of the data part otherwise. Header size
+   * is not included.
+   *
+   * @return the on-disk size of the data part of the block, header not
+   *         included
+   */
+  public int getOnDiskSizeWithoutHeader() {
+    return onDiskSizeWithoutHeader;
+  }
+
+  /**
+   * @return the uncompressed size of the data part of the block, header not
+   *         included
+   */
+  public int getUncompressedSizeWithoutHeader() {
+    return uncompressedSizeWithoutHeader;
+  }
+
+  /**
+   * @return the offset of the previous block of the same type in the file, or
+   *         -1 if unknown
+   */
+  public long getPrevBlockOffset() {
+    return prevBlockOffset;
+  }
+
+  /**
+   * Writes header fields into the first {@link #HEADER_SIZE} bytes of the
+   * buffer. Resets the buffer position to the end of header as side effect.
+   */
+  private void overwriteHeader() {
+    buf.rewind();
+    blockType.write(buf);
+    buf.putInt(onDiskSizeWithoutHeader);
+    buf.putInt(uncompressedSizeWithoutHeader);
+    buf.putLong(prevBlockOffset);
+  }
+
+  /**
+   * Returns a buffer that does not include the header. The array offset points
+   * to the start of the block data right after the header. The underlying data
+   * array is not copied.
+   *
+   * @return the buffer with header skipped
+   */
+  public ByteBuffer getBufferWithoutHeader() {
+    return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + HEADER_SIZE,
+        buf.limit() - HEADER_SIZE).slice();
+  }
+
+  /**
+   * Returns the buffer this block stores internally. The clients must not
+   * modify the buffer object. This method has to be public because it is
+   * used in {@link CompoundBloomFilter} to avoid object creation on every
+   * Bloom filter lookup, but has to be used with caution.
+   *
+   * @return the buffer of this block for read-only operations
+   */
+  public ByteBuffer getBufferReadOnly() {
+    return buf;
+  }
+
+  /**
+   * Returns a byte buffer of this block, including header data, positioned at
+   * the beginning of header. The underlying data array is not copied.
+   *
+   * @return the byte buffer with header included
+   */
+  public ByteBuffer getBufferWithHeader() {
+    ByteBuffer dupBuf = buf.duplicate();
+    dupBuf.rewind();
+    return dupBuf;
+  }
+
+  /**
+   * Deserializes fields of the given writable using the data portion of this
+   * block. Does not check that all the block data has been read.
+   */
+  public void readInto(Writable w) throws IOException {
+    Preconditions.checkNotNull(w);
+
+    if (Writables.getWritable(buf.array(), buf.arrayOffset() + HEADER_SIZE,
+        buf.limit() - HEADER_SIZE, w) == null) {
+      throw new IOException("Failed to deserialize block " + this + " into a "
+          + w.getClass().getSimpleName());
+    }
+  }
+
+  private void sanityCheckAssertion(long valueFromBuf, long valueFromField,
+      String fieldName) throws IOException {
+    if (valueFromBuf != valueFromField) {
+      throw new AssertionError(fieldName + " in the buffer (" + valueFromBuf
+          + ") is different from that in the field (" + valueFromField + ")");
+    }
+  }
+
+  /**
+   * Checks if the block is internally consistent, i.e. the first
+   * {@link #HEADER_SIZE} bytes of the buffer contain a valid header consistent
+   * with the fields. This function is primarily for testing and debugging, and
+   * is not thread-safe, because it alters the internal buffer pointer.
+   */
+  void sanityCheck() throws IOException {
+    buf.rewind();
+
+    {
+      BlockType blockTypeFromBuf = BlockType.read(buf);
+      if (blockTypeFromBuf != blockType) {
+        throw new IOException("Block type stored in the buffer: " +
+            blockTypeFromBuf + ", block type field: " + blockType);
+      }
+    }
+
+    sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
+        "onDiskSizeWithoutHeader");
+
+    sanityCheckAssertion(buf.getInt(), uncompressedSizeWithoutHeader,
+        "uncompressedSizeWithoutHeader");
+
+    sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlockOffset");
+
+    int expectedBufLimit = uncompressedSizeWithoutHeader + HEADER_SIZE;
+    if (buf.limit() != expectedBufLimit) {
+      throw new AssertionError("Expected buffer limit " + expectedBufLimit
+          + ", got " + buf.limit());
+    }
+
+    // We might optionally allocate HEADER_SIZE more bytes to read the next
+    // block's header, so there are two sensible values for buffer capacity.
+    if (buf.capacity() != uncompressedSizeWithoutHeader + HEADER_SIZE &&
+        buf.capacity() != uncompressedSizeWithoutHeader + 2 * HEADER_SIZE) {
+      throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
+          ", expected " + (uncompressedSizeWithoutHeader + HEADER_SIZE) +
+          " or " + (uncompressedSizeWithoutHeader + 2 * HEADER_SIZE));
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "blockType="
+        + blockType
+        + ", onDiskSizeWithoutHeader="
+        + onDiskSizeWithoutHeader
+        + ", uncompressedSizeWithoutHeader="
+        + uncompressedSizeWithoutHeader
+        + ", prevBlockOffset="
+        + prevBlockOffset
+        + ", dataBeginsWith="
+        + Bytes.toStringBinary(buf.array(), buf.arrayOffset() + HEADER_SIZE,
+            Math.min(32, buf.limit() - buf.arrayOffset() - HEADER_SIZE))
+        + ", fileOffset=" + offset;
+  }
+
+  private void validateOnDiskSizeWithoutHeader(
+      int expectedOnDiskSizeWithoutHeader) throws IOException {
+    if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
+      String blockInfoMsg =
+        "Block offset: " + offset + ", data starts with: "
+          + Bytes.toStringBinary(buf.array(), buf.arrayOffset(),
+              Math.min(32, buf.limit()));
+      throw new IOException("On-disk size without header provided is "
+          + expectedOnDiskSizeWithoutHeader + ", but block "
+          + "header contains " + onDiskSizeWithoutHeader + ". " +
+          blockInfoMsg);
+    }
+  }
+
+  /**
+   * Always allocates a new buffer of the correct size. Copies header bytes
+   * from the existing buffer. Does not change header fields.
+   *
+   * @param extraBytes whether to reserve room in the buffer to read the next
+   *          block's header
+   */
+  private void allocateBuffer(boolean extraBytes) {
+    int capacityNeeded = HEADER_SIZE + uncompressedSizeWithoutHeader +
+        (extraBytes ? HEADER_SIZE : 0);
+
+    ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
+
+    // Copy header bytes.
+    System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(),
+        newBuf.arrayOffset(), HEADER_SIZE);
+
+    buf = newBuf;
+    buf.limit(HEADER_SIZE + uncompressedSizeWithoutHeader);
+  }
+
+  /** An additional sanity-check in case no compression is being used. */
+  public void assumeUncompressed() throws IOException {
+    if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader) {
+      throw new IOException("Using no compression but "
+          + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
+          + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader);
+    }
+  }
+
+  /**
+   * @param expectedType the expected type of this block
+   * @throws IOException if this block's type is different than expected
+   */
+  public void expectType(BlockType expectedType) throws IOException {
+    if (blockType != expectedType) {
+      throw new IOException("Invalid block type: expected=" + expectedType
+          + ", actual=" + blockType);
+    }
+  }
+
+  /** @return the offset of this block in the file it was read from */
+  public long getOffset() {
+    if (offset < 0) {
+      throw new IllegalStateException(
+          "HFile block offset not initialized properly");
+    }
+    return offset;
+  }
+
+  /**
+   * @return a byte stream reading the data section of this block
+   */
+  public DataInputStream getByteStream() {
+    return new DataInputStream(new ByteArrayInputStream(buf.array(),
+        buf.arrayOffset() + HEADER_SIZE, buf.limit() - HEADER_SIZE));
+  }
+
+  @Override
+  public long heapSize() {
+    // This object, block type and byte buffer reference, on-disk and
+    // uncompressed size, next block's on-disk size, offset and previous
+    // offset, byte buffer object, and its byte array. Might also need to add
+    // some fields inside the byte buffer.
+    return ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3
+        * Bytes.SIZEOF_INT + 2 * Bytes.SIZEOF_LONG + BYTE_BUFFER_HEAP_SIZE) +
+        ClassSize.align(buf.capacity());
+  }
+
+  /**
+   * Read from an input stream. Analogous to
+   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a
+   * number of "extra" bytes that would be desirable but not absolutely
+   * necessary to read.
+   *
+   * @param in the input stream to read from
+   * @param buf the buffer to read into
+   * @param bufOffset the destination offset in the buffer
+   * @param necessaryLen the number of bytes that are absolutely necessary to
+   *          read
+   * @param extraLen the number of extra bytes that would be nice to read
+   * @return true if succeeded reading the extra bytes
+   * @throws IOException if failed to read the necessary bytes
+   */
+  public static boolean readWithExtra(InputStream in, byte buf[],
+      int bufOffset, int necessaryLen, int extraLen) throws IOException {
+    int bytesRemaining = necessaryLen + extraLen;
+    while (bytesRemaining > 0) {
+      int ret = in.read(buf, bufOffset, bytesRemaining);
+      if (ret == -1 && bytesRemaining <= extraLen) {
+        // We could not read the "extra data", but that is OK.
+        break;
+      }
+
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream (read "
+            + "returned " + ret + ", was trying to read " + necessaryLen
+            + " necessary bytes and " + extraLen + " extra bytes, "
+            + "successfully read "
+            + (necessaryLen + extraLen - bytesRemaining));
+      }
+      bufOffset += ret;
+      bytesRemaining -= ret;
+    }
+    return bytesRemaining <= 0;
+  }
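
readWithExtra is the low-level read primitive: it insists on the necessary bytes and opportunistically grabs the extra ones (typically the next block's header). A sketch with made-up sizes:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.hbase.io.hfile.HFileBlock;

public class ReadWithExtraSketch {
  public static void main(String[] args) throws IOException {
    // Made-up layout: a 100-byte block followed by a 24-byte next header.
    byte[] onDisk = new byte[100 + HFileBlock.HEADER_SIZE];
    InputStream in = new ByteArrayInputStream(onDisk);

    byte[] dest = new byte[100 + HFileBlock.HEADER_SIZE];
    // True only if the extra HEADER_SIZE bytes were read as well; throws
    // if the 100 necessary bytes cannot be read in full.
    boolean nextHeaderRead =
        HFileBlock.readWithExtra(in, dest, 0, 100, HFileBlock.HEADER_SIZE);
    System.out.println("next header prefetched: " + nextHeaderRead);
  }
}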
+
+  /**
+   * @return the on-disk size of the next block (including the header size)
+   *         that was read by peeking into the next block's header
+   */
+  public int getNextBlockOnDiskSizeWithHeader() {
+    return nextBlockOnDiskSizeWithHeader;
+  }
+
+
+  /**
+   * Unified version 2 {@link HFile} block writer. The intended usage pattern
+   * is as follows:
+   * <ul>
+   * <li>Construct an {@link HFileBlock.Writer}, providing a compression
+   * algorithm
+   * <li>Call {@link Writer#startWriting(BlockType, boolean)} and get a data
+   * stream to write to
+   * <li>Write your data into the stream
+   * <li>Call {@link Writer#writeHeaderAndData(FSDataOutputStream)} as many
+   * times as you need to store the serialized block into an external stream,
+   * or call {@link Writer#getHeaderAndData()} to get it as a byte array.
+   * <li>Repeat to write more blocks
+   * </ul>
+   * <p>
+   */
+  public static class Writer {
+
+    private enum State {
+      INIT,
+      WRITING,
+      BLOCK_READY
+    };
+
+    /** Writer state. Used to ensure the correct usage protocol. */
+    private State state = State.INIT;
+
+    /** Compression algorithm for all blocks this instance writes. */
+    private final Compression.Algorithm compressAlgo;
+
+    /**
+     * The stream we use to accumulate data in the on-disk format for each
+     * block (i.e. compressed data, or uncompressed if using no compression).
+     * We reset this stream at the end of each block and reuse it. The header
+     * is written as the first {@link #HEADER_SIZE} bytes into this stream.
+     */
+    private ByteArrayOutputStream baosOnDisk;
+
+    /**
+     * The stream we use to accumulate uncompressed block data for
+     * cache-on-write. Null when cache-on-write is turned off.
+     */
+    private ByteArrayOutputStream baosInMemory;
+
+    /** Compressor, which is also reused between consecutive blocks. */
+    private Compressor compressor;
+
+    /** Current block type. Set in {@link #startWriting(BlockType, boolean)}. */
+    private BlockType blockType;
+
+    /**
+     * A stream that we write uncompressed bytes to, which compresses them and
+     * writes them to {@link #baosOnDisk}.
+     */
+    private DataOutputStream userDataStream;
+
+    /**
+     * Bytes to be written to the file system, including the header. Compressed
+     * if compression is turned on.
+     */
+    private byte[] onDiskBytesWithHeader;
+
+    /**
+     * The total number of uncompressed bytes written into the current block,
+     * with header size not included. Valid in the "block ready" state.
+     */
+    private int uncompressedSizeWithoutHeader;
+
+    /**
+     * Only used when we are using cache-on-write. Valid in the "block ready" state.
+     * Contains the header and the uncompressed bytes, so the length is
+     * {@link #uncompressedSizeWithoutHeader} + {@link HFileBlock#HEADER_SIZE}.
+     */
+    private byte[] uncompressedBytesWithHeader;
+
+    /**
+     * Current block's start offset in the {@link HFile}. Set in
+     * {@link #writeHeaderAndData(FSDataOutputStream)}.
+     */
+    private long startOffset;
+
+    /**
+     * Offset of previous block by block type. Updated when the next block is
+     * started.
+     */
+    private long[] prevOffsetByType;
+
+    /**
+     * Whether we are accumulating uncompressed bytes for the purpose of
+     * caching on write.
+     */
+    private boolean cacheOnWrite;
+
+    /** The offset of the previous block of the same type */
+    private long prevOffset;
+
+    /**
+     * @param compressionAlgorithm
+     *          compression algorithm to use
+     */
+    public Writer(Compression.Algorithm compressionAlgorithm) {
+      compressAlgo = compressionAlgorithm == null ? NONE
+          : compressionAlgorithm;
+
+      baosOnDisk = new ByteArrayOutputStream();
+      if (compressAlgo != NONE)
+        compressor = compressionAlgorithm.getCompressor();
+
+      prevOffsetByType = new long[BlockType.values().length];
+      for (int i = 0; i < prevOffsetByType.length; ++i)
+        prevOffsetByType[i] = -1;
+    }
+
+    /**
+     * Starts writing into the block. The previous block's data is discarded.
+     *
+     * @return the stream the user can write their data into
+     * @throws IOException
+     */
+    public DataOutputStream startWriting(BlockType newBlockType,
+        boolean cacheOnWrite) throws IOException {
+      if (state == State.BLOCK_READY && startOffset != -1) {
+        // We had a previous block that was written to a stream at a specific
+        // offset. Save that offset as the last offset of a block of that type.
+        prevOffsetByType[blockType.ordinal()] = startOffset;
+      }
+
+      this.cacheOnWrite = cacheOnWrite;
+
+      startOffset = -1;
+      blockType = newBlockType;
+
+      baosOnDisk.reset();
+      baosOnDisk.write(DUMMY_HEADER);
+
+      state = State.WRITING;
+      if (compressAlgo == NONE) {
+        // We do not need a compression stream or a second uncompressed stream
+        // for cache-on-write.
+        userDataStream = new DataOutputStream(baosOnDisk);
+      } else {
+        OutputStream compressingOutputStream =
+          compressAlgo.createCompressionStream(baosOnDisk, compressor, 0);
+
+        if (cacheOnWrite) {
+          // We save uncompressed data in a cache-on-write mode.
+          if (baosInMemory == null)
+            baosInMemory = new ByteArrayOutputStream();
+          baosInMemory.reset();
+          baosInMemory.write(DUMMY_HEADER);
+          userDataStream = new DataOutputStream(new DoubleOutputStream(
+              compressingOutputStream, baosInMemory));
+        } else {
+          userDataStream = new DataOutputStream(compressingOutputStream);
+        }
+      }
+
+      return userDataStream;
+    }
+
+    /**
+     * Returns the stream for the user to write to. The block writer takes care
+     * of handling compression and buffering for caching on write. Can only be
+     * called in the "writing" state.
+     *
+     * @return the data output stream for the user to write to
+     */
+    DataOutputStream getUserDataStream() {
+      expectState(State.WRITING);
+      return userDataStream;
+    }
+
+    /**
+     * Transitions the block writer from the "writing" state to the "block
+     * ready" state.  Does nothing if a block is already finished.
+     */
+    private void ensureBlockReady() throws IOException {
+      Preconditions.checkState(state != State.INIT,
+          "Unexpected state: " + state);
+
+      if (state == State.BLOCK_READY)
+        return;
+
+      finishBlock();
+      state = State.BLOCK_READY;
+    }
+
+    /**
+     * An internal method that flushes the compressing stream (if using
+     * compression), serializes the header, and takes care of the separate
+     * uncompressed stream for caching on write, if applicable. Block writer
+     * state transitions must be managed by the caller.
+     */
+    private void finishBlock() throws IOException {
+      userDataStream.flush();
+      uncompressedSizeWithoutHeader = userDataStream.size();
+
+      onDiskBytesWithHeader = baosOnDisk.toByteArray();
+      prevOffset = prevOffsetByType[blockType.ordinal()];
+      putHeader(onDiskBytesWithHeader, 0);
+
+      if (cacheOnWrite && compressAlgo != NONE) {
+        uncompressedBytesWithHeader = baosInMemory.toByteArray();
+
+        if (uncompressedSizeWithoutHeader !=
+            uncompressedBytesWithHeader.length - HEADER_SIZE) {
+          throw new IOException("Uncompressed size mismatch: "
+              + uncompressedSizeWithoutHeader + " vs. "
+              + (uncompressedBytesWithHeader.length - HEADER_SIZE));
+        }
+
+        // Write the header into the beginning of the uncompressed byte array.
+        putHeader(uncompressedBytesWithHeader, 0);
+      }
+    }
+
+    /** Put the header into the given byte array at the given offset. */
+    private void putHeader(byte[] dest, int offset) {
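+      // Header layout written below: the block type magic record, then the
+      // on-disk size without header (int), the uncompressed size without
+      // header (int), and the previous block offset (long), for a total of
+      // HEADER_SIZE bytes.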
+      offset = blockType.put(dest, offset);
+      offset = Bytes.putInt(dest, offset, onDiskBytesWithHeader.length
+          - HEADER_SIZE);
+      offset = Bytes.putInt(dest, offset, uncompressedSizeWithoutHeader);
+      Bytes.putLong(dest, offset, prevOffset);
+    }
+
+    /**
+     * Similar to {@link #writeHeaderAndData(DataOutputStream)}, but records
+     * the offset of this block so that it can be referenced in the next block
+     * of the same type.
+     *
+     * @param out the file system output stream to write the block to
+     * @throws IOException
+     */
+    public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
+      long offset = out.getPos();
+      if (startOffset != -1 && offset != startOffset) {
+        throw new IOException("A " + blockType + " block written to a "
+            + "stream twice, first at offset " + startOffset + ", then at "
+            + offset);
+      }
+      startOffset = offset;
+
+      writeHeaderAndData((DataOutputStream) out);
+    }
+
+    /**
+     * Writes the header and the compressed data of this block (or uncompressed
+     * data when not using compression) into the given stream. Can be called in
+     * the "writing" state or in the "block ready" state. If called in the
+     * "writing" state, transitions the writer to the "block ready" state.
+     *
+     * @param out the output stream to write the block's header and data to
+     * @throws IOException
+     */
+    private void writeHeaderAndData(DataOutputStream out) throws IOException {
+      ensureBlockReady();
+      out.write(onDiskBytesWithHeader);
+    }
+
+    /**
+     * Returns the header followed by the compressed data (or uncompressed data
+     * when not using compression) as a byte array. Can be called in the
+     * "writing" state or in the "block ready" state. If called in the "writing"
+     * state, transitions the writer to the "block ready" state.
+     *
+     * @return header and data as they would be stored on disk in a byte array
+     * @throws IOException
+     */
+    public byte[] getHeaderAndData() throws IOException {
+      ensureBlockReady();
+      return onDiskBytesWithHeader;
+    }
+
+    /**
+     * Releases the compressor this writer uses to compress blocks into the
+     * compressor pool. Needs to be called before the writer is discarded.
+     */
+    public void releaseCompressor() {
+      if (compressor != null) {
+        compressAlgo.returnCompressor(compressor);
+        compressor = null;
+      }
+    }
+
+    /**
+     * Returns the on-disk size of the data portion of the block. This is the
+     * compressed size if compression is enabled. Can only be called in the
+     * "block ready" state. Header is not compressed, and its size is not
+     * included in the return value.
+     *
+     * @return the on-disk size of the block, not including the header.
+     */
+    public int getOnDiskSizeWithoutHeader() {
+      expectState(State.BLOCK_READY);
+      return onDiskBytesWithHeader.length - HEADER_SIZE;
+    }
+
+    /**
+     * Returns the on-disk size of the block. Can only be called in the
+     * "block ready" state.
+     *
+     * @return the on-disk size of the block ready to be written, including the
+     *         header size
+     */
+    public int getOnDiskSizeWithHeader() {
+      expectState(State.BLOCK_READY);
+      return onDiskBytesWithHeader.length;
+    }
+
+    /**
+     * The uncompressed size of the block data. Does not include header size.
+     */
+    public int getUncompressedSizeWithoutHeader() {
+      expectState(State.BLOCK_READY);
+      return uncompressedSizeWithoutHeader;
+    }
+
+    /**
+     * The uncompressed size of the block data, including header size.
+     */
+    public int getUncompressedSizeWithHeader() {
+      expectState(State.BLOCK_READY);
+      return uncompressedSizeWithoutHeader + HEADER_SIZE;
+    }
+
+    /** @return true if a block is being written  */
+    public boolean isWriting() {
+      return state == State.WRITING;
+    }
+
+    /**
+     * Returns the number of bytes written into the current block so far, or
+     * zero if not writing the block at the moment. Note that this will return
+     * zero in the "block ready" state as well.
+     *
+     * @return the number of bytes written
+     */
+    public int blockSizeWritten() {
+      if (state != State.WRITING)
+        return 0;
+      return userDataStream.size();
+    }
+
+    /**
+     * Returns the header followed by the uncompressed data, even if using
+     * compression. This is needed for storing uncompressed blocks in the block
+     * cache. Can only be called in the "block ready" state.
+     *
+     * @return uncompressed block bytes for caching on write
+     */
+    private byte[] getUncompressedDataWithHeader() {
+      expectState(State.BLOCK_READY);
+
+      if (compressAlgo == NONE)
+        return onDiskBytesWithHeader;
+
+      if (!cacheOnWrite)
+        throw new IllegalStateException("Cache-on-write is turned off");
+
+      if (uncompressedBytesWithHeader == null)
+        throw new NullPointerException();
+
+      return uncompressedBytesWithHeader;
+    }
+
+    private void expectState(State expectedState) {
+      if (state != expectedState) {
+        throw new IllegalStateException("Expected state: " + expectedState +
+            ", actual state: " + state);
+      }
+    }
+
+    /**
+     * Similar to {@link #getUncompressedDataWithHeader()} but returns a byte
+     * buffer.
+     *
+     * @return uncompressed block for caching on write in the form of a buffer
+     */
+    public ByteBuffer getUncompressedBufferWithHeader() {
+      byte[] b = getUncompressedDataWithHeader();
+      return ByteBuffer.wrap(b, 0, b.length);
+    }
+
+    /**
+     * Takes the given {@link BlockWritable} instance, creates a new block of
+     * its appropriate type, writes the writable into this block, and flushes
+     * the block into the output stream. The writer is instructed not to buffer
+     * uncompressed bytes for cache-on-write.
+     *
+     * @param bw the block-writable object to write as a block
+     * @param out the file system output stream
+     * @throws IOException
+     */
+    public void writeBlock(BlockWritable bw, FSDataOutputStream out)
+        throws IOException {
+      bw.writeToBlock(startWriting(bw.getBlockType(), false));
+      writeHeaderAndData(out);
+    }
+
+    public HFileBlock getBlockForCaching() {
+      return new HFileBlock(blockType, onDiskBytesWithHeader.length
+          - HEADER_SIZE, uncompressedSizeWithoutHeader, prevOffset,
+          getUncompressedBufferWithHeader(), false, startOffset);
+    }
+
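+    /*
+     * A minimal usage sketch of this block writer (hypothetical caller code;
+     * the enclosing HFile writer does the real wiring). It assumes the writer
+     * class, here called Writer, is constructed with the compression
+     * algorithm, as the field initialization above suggests:
+     *
+     *   HFileBlock.Writer w = new HFileBlock.Writer(compressionAlgorithm);
+     *   DataOutputStream dos = w.startWriting(blockType, cacheThisBlock);
+     *   dos.write(userData);                  // user payload, no magic record
+     *   w.writeHeaderAndData(fsOutputStream); // finishes and flushes the block
+     *   HFileBlock forCache = w.getBlockForCaching(); // only if caching on write
+     *   ...
+     *   w.releaseCompressor();                // before discarding the writer
+     */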
+  }
+
+  /** Something that can be written into a block. */
+  public interface BlockWritable {
+
+    /** The type of block this data should use. */
+    BlockType getBlockType();
+
+    /**
+     * Writes the block to the provided stream. Must not write any magic
+     * records.
+     *
+     * @param out a stream to write uncompressed data into
+     */
+    void writeToBlock(DataOutput out) throws IOException;
+  }
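+
+  /*
+   * For illustration, a hypothetical BlockWritable implementation might look
+   * like the following sketch (all names here are made up):
+   *
+   *   class SimpleBlockWritable implements BlockWritable {
+   *     private final BlockType type;
+   *     private final byte[] payload;
+   *
+   *     SimpleBlockWritable(BlockType type, byte[] payload) {
+   *       this.type = type;
+   *       this.payload = payload;
+   *     }
+   *
+   *     public BlockType getBlockType() { return type; }
+   *
+   *     public void writeToBlock(DataOutput out) throws IOException {
+   *       out.write(payload); // uncompressed payload only, no magic record
+   *     }
+   *   }
+   *
+   * Such an object would then be written with the writer's
+   * writeBlock(bw, out) method above.
+   */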
+
+  // Block readers and writers
+
+  /** An interface for iterating over {@link HFileBlock}s. */
+  public interface BlockIterator {
+
+    /**
+     * Get the next block, or null if there are no more blocks to iterate.
+     */
+    HFileBlock nextBlock() throws IOException;
+
+    /**
+     * Similar to {@link #nextBlock()} but checks block type, throws an
+     * exception if incorrect, and returns the data portion of the block as
+     * an input stream.
+     */
+    DataInputStream nextBlockAsStream(BlockType blockType) throws IOException;
+  }
+
+  /**
+   * Just the basic ability to read blocks, providing optional hints of
+   * on-disk size and/or uncompressed size.
+   */
+  public interface BasicReader {
+    /**
+     * Reads the block at the given offset in the file with the given on-disk
+     * size and uncompressed size.
+     *
+     * @param offset the offset in the file to read the block from
+     * @param onDiskSize the on-disk size of the entire block, including all
+     *          applicable headers, or -1 if unknown
+     * @param uncompressedSize the uncompressed size of the compressed part of
+     *          the block, or -1 if unknown
+     * @return the newly read block
+     */
+    HFileBlock readBlockData(long offset, long onDiskSize,
+        int uncompressedSize, boolean pread) throws IOException;
+  }
+
+  /** A full-fledged reader with an iteration ability. */
+  public interface FSReader extends BasicReader {
+
+    /**
+     * Creates a block iterator over the given portion of the {@link HFile}.
+     * The iterator returns blocks starting at offsets such that
+     * startOffset <= offset < endOffset.
+     *
+     * @param startOffset the offset of the block to start iteration with
+     * @param endOffset the offset to end iteration at (exclusive)
+     * @return an iterator of blocks between the two given offsets
+     */
+    BlockIterator blockRange(long startOffset, long endOffset);
+  }
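+
+  /*
+   * A minimal sketch of iterating blocks with the interfaces above (variable
+   * names are hypothetical):
+   *
+   *   BlockIterator it = fsReader.blockRange(firstBlockOffset, endOffset);
+   *   HFileBlock block;
+   *   while ((block = it.nextBlock()) != null) {
+   *     // inspect block.getBlockType(), read block.getByteStream(), etc.
+   *   }
+   */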
+
+  /**
+   * A common implementation of some methods of {@link FSReader} and some
+   * tools for implementing HFile format version-specific block readers.
+   */
+  public abstract static class AbstractFSReader implements FSReader {
+
+    /** The file system stream of the underlying {@link HFile} */
+    protected FSDataInputStream istream;
+
+    /** Compression algorithm used by the {@link HFile} */
+    protected Compression.Algorithm compressAlgo;
+
+    /** The size of the file we are reading from, or -1 if unknown. */
+    protected long fileSize;
+
+    /** The default buffer size for our buffered streams */
+    public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
+
+    public AbstractFSReader(FSDataInputStream istream, Algorithm compressAlgo,
+        long fileSize) {
+      this.istream = istream;
+      this.compressAlgo = compressAlgo;
+      this.fileSize = fileSize;
+    }
+
+    @Override
+    public BlockIterator blockRange(final long startOffset,
+        final long endOffset) {
+      return new BlockIterator() {
+        private long offset = startOffset;
+
+        @Override
+        public HFileBlock nextBlock() throws IOException {
+          if (offset >= endOffset)
+            return null;
+          HFileBlock b = readBlockData(offset, -1, -1, false);
+          offset += b.getOnDiskSizeWithHeader();
+          return b;
+        }
+
+        @Override
+        public DataInputStream nextBlockAsStream(BlockType blockType)
+            throws IOException {
+          HFileBlock blk = nextBlock();
+          if (blk.getBlockType() != blockType) {
+            throw new IOException("Expected block of type " + blockType
+                + " but found " + blk.getBlockType());
+          }
+          return blk.getByteStream();
+        }
+      };
+    }
+
+    /**
+     * Does a positional read or a seek and read into the given buffer. Returns
+     * the on-disk size of the next block, or -1 if it could not be determined.
+     *
+     * @param dest destination buffer
+     * @param destOffset offset in the destination buffer
+     * @param size size of the block to be read
+     * @param peekIntoNextBlock whether to read the next block's on-disk size
+     * @param fileOffset position in the stream to read at
+     * @param pread whether we should do a positional read
+     * @return the on-disk size of the next block with header size included, or
+     *         -1 if it could not be determined
+     * @throws IOException
+     */
+    protected int readAtOffset(byte[] dest, int destOffset, int size,
+        boolean peekIntoNextBlock, long fileOffset, boolean pread)
+        throws IOException {
+      if (peekIntoNextBlock &&
+          destOffset + size + HEADER_SIZE > dest.length) {
+        // We are asked to read the next block's header as well, but there is
+        // not enough room in the array.
+        throw new IOException("Attempted to read " + size + " bytes and " +
+            HEADER_SIZE + " bytes of next header into a " + dest.length +
+            "-byte array at offset " + destOffset);
+      }
+
+      if (pread) {
+        // Positional read. Better for random reads.
+        int extraSize = peekIntoNextBlock ? HEADER_SIZE : 0;
+
+        int ret = istream.read(fileOffset, dest, destOffset, size + extraSize);
+        if (ret < size) {
+          throw new IOException("Positional read of " + size + " bytes " +
+              "failed at offset " + fileOffset + " (returned " + ret + ")");
+        }
+
+        if (ret == size || ret < size + extraSize) {
+          // Could not read the next block's header, or did not try.
+          return -1;
+        }
+      } else {
+        // Seek + read. Better for scanning.
+        synchronized (istream) {
+          istream.seek(fileOffset);
+
+          long realOffset = istream.getPos();
+          if (realOffset != fileOffset) {
+            throw new IOException("Tried to seek to " + fileOffset + " to "
+                + "read " + size + " bytes, but pos=" + realOffset
+                + " after seek");
+          }
+
+          if (!peekIntoNextBlock) {
+            IOUtils.readFully(istream, dest, destOffset, size);
+            return -1;
+          }
+
+          // Try to read the next block header.
+          if (!readWithExtra(istream, dest, destOffset, size, HEADER_SIZE))
+            return -1;
+        }
+      }
+
+      assert peekIntoNextBlock;
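+      // The next block's header now sits at dest[destOffset + size]. Its
+      // on-disk-size-without-header field follows the magic record, so skip
+      // MAGIC_LENGTH bytes and add HEADER_SIZE to report the full on-disk
+      // size of the next block, including its header.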
+      return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) +
+          HEADER_SIZE;
+    }
+
+    /**
+     * Decompresses data from the given stream using the configured compression
+     * algorithm and writes it into the given destination array.
+     *
+     * @param dest
+     *          the array to write the uncompressed data into
+     * @param destOffset
+     *          the offset in the destination array to start writing at
+     * @param bufferedBoundedStream
+     *          a stream to read compressed data from, bounded to the exact
+     *          amount of compressed data
+     * @param compressedSize
+     *          compressed data size, header not included
+     * @param uncompressedSize
+     *          uncompressed data size, header not included
+     * @throws IOException
+     */
+    protected void decompress(byte[] dest, int destOffset,
+        InputStream bufferedBoundedStream, int compressedSize,
+        int uncompressedSize) throws IOException {
+      Decompressor decompressor = null;
+      try {
+        decompressor = compressAlgo.getDecompressor();
+        InputStream is = compressAlgo.createDecompressionStream(
+            bufferedBoundedStream, decompressor, 0);
+
+        IOUtils.readFully(is, dest, destOffset, uncompressedSize);
+        is.close();
+      } finally {
+        if (decompressor != null) {
+          compressAlgo.returnDecompressor(decompressor);
+        }
+      }
+    }
+
+    /**
+     * Creates a buffered stream reading a certain slice of the file system
+     * input stream. We need this because the decompressors we use appear to
+     * expect the input stream to be bounded.
+     *
+     * @param offset the starting file offset the bounded stream reads from
+     * @param size the size of the segment of the file the stream should read
+     * @param pread whether to use position reads
+     * @return a stream restricted to the given portion of the file
+     */
+    protected InputStream createBufferedBoundedStream(long offset,
+        int size, boolean pread) {
+      return new BufferedInputStream(new BoundedRangeFileInputStream(istream,
+          offset, size, pread), Math.min(DEFAULT_BUFFER_SIZE, size));
+    }
+
+  }
+
+  /**
+   * Reads version 1 blocks from the file system. In a version 1 block,
+   * everything, including the magic record, is compressed when compression is
+   * enabled; when compression is disabled the block is stored as-is. This
+   * reader returns blocks represented in the uniform version 2 format in
+   * memory.
+   */
+  public static class FSReaderV1 extends AbstractFSReader {
+
+    /** Header size difference between version 1 and 2 */
+    private static final int HEADER_DELTA = HEADER_SIZE - MAGIC_LENGTH;
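+    // A version 1 block on disk starts directly with the magic record, while
+    // the in-memory version 2 representation starts with the full HEADER_SIZE
+    // header, so HEADER_DELTA extra bytes are reserved in front of the data.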
+
+    public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo,
+        long fileSize) {
+      super(istream, compressAlgo, fileSize);
+    }
+
+    /**
+     * Read a version 1 block. There is no uncompressed header, and the block
+     * type (the magic record) is part of the compressed data. This
+     * implementation assumes that the bounded range file input stream is
+     * needed to stop the decompressor from reading into the next block,
+     * because the decompressor reads ahead without regard to where the
+     * compressed section ends.
+     *
+     * The block returned is still a version 2 block, and in particular, its
+     * first {@link #HEADER_SIZE} bytes contain a valid version 2 header.
+     *
+     * @param offset the offset of the block to read in the file
+     * @param onDiskSizeWithMagic the on-disk size of the version 1 block,
+     *          including the magic record, which is part of the compressed
+     *          data if compression is used
+     * @param uncompressedSizeWithMagic uncompressed size of the version 1
+     *          block, including the magic record
+     */
+    @Override
+    public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic,
+        int uncompressedSizeWithMagic, boolean pread) throws IOException {
+      if (uncompressedSizeWithMagic <= 0) {
+        throw new IOException("Invalid uncompressedSize="
+            + uncompressedSizeWithMagic + " for a version 1 block");
+      }
+
+      if (onDiskSizeWithMagic <= 0 || onDiskSizeWithMagic >= Integer.MAX_VALUE)
+      {
+        throw new IOException("Invalid onDiskSize=" + onDiskSizeWithMagic
+            + " (maximum allowed: " + Integer.MAX_VALUE + ")");
+      }
+
+      int onDiskSize = (int) onDiskSizeWithMagic;
+
+      if (uncompressedSizeWithMagic < MAGIC_LENGTH) {
+        throw new IOException("Uncompressed size for a version 1 block is "
+            + uncompressedSizeWithMagic + " but must be at least "
+            + MAGIC_LENGTH);
+      }
+
+      // The existing size already includes magic size, and we are inserting
+      // a version 2 header.
+      ByteBuffer buf = ByteBuffer.allocate(uncompressedSizeWithMagic
+          + HEADER_DELTA);
+
+      int onDiskSizeWithoutHeader;
+      if (compressAlgo == Compression.Algorithm.NONE) {
+        // A special case when there is no compression.
+        if (onDiskSize != uncompressedSizeWithMagic) {
+          throw new IOException("onDiskSize=" + onDiskSize
+              + " and uncompressedSize=" + uncompressedSizeWithMagic
+              + " must be equal for version 1 with no compression");
+        }
+
+        // The first MAGIC_LENGTH bytes of the data read here will be
+        // overwritten.
+        readAtOffset(buf.array(), buf.arrayOffset() + HEADER_DELTA,
+            onDiskSize, false, offset, pread);
+
+        onDiskSizeWithoutHeader = uncompressedSizeWithMagic - MAGIC_LENGTH;
+      } else {
+        InputStream bufferedBoundedStream = createBufferedBoundedStream(
+            offset, onDiskSize, pread);
+        decompress(buf.array(), buf.arrayOffset() + HEADER_DELTA,
+            bufferedBoundedStream, onDiskSize, uncompressedSizeWithMagic);
+
+        // We don't really have a good way to exclude the "magic record" size
+        // from the compressed block's size, since it is compressed as well.
+        onDiskSizeWithoutHeader = onDiskSize;
+      }
+
+      BlockType newBlockType = BlockType.parse(buf.array(), buf.arrayOffset()
+          + HEADER_DELTA, MAGIC_LENGTH);
+
+      // We set the uncompressed size of the new HFile block we are creating
+      // to the size of the data portion of the block without the magic record,
+      // since the magic record gets moved to the header.
+      HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader,
+          uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, true, offset);
+      return b;
+    }
+  }
+
+  /**
+   * We always prefetch the header of the next block, so that we know its
+   * on-disk size in advance and can read it in one operation.
+   */
+  private static class PrefetchedHeader {
+    long offset = -1;
+    byte[] header = new byte[HEADER_SIZE];
+    ByteBuffer buf = ByteBuffer.wrap(header, 0, HEADER_SIZE);
+  }
+
+  /** Reads version 2 blocks from the filesystem. */
+  public static class FSReaderV2 extends AbstractFSReader {
+
+    private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
+        new ThreadLocal<PrefetchedHeader>() {
+          @Override
+          public PrefetchedHeader initialValue() {
+            return new PrefetchedHeader();
+          }
+        };
+
+    public FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo,
+        long fileSize) {
+      super(istream, compressAlgo, fileSize);
+    }
+
+    /**
+     * Reads a version 2 block. Tries to do as little memory allocation as
+     * possible, using the provided on-disk size.
+     *
+     * @param offset the offset in the stream to read at
+     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
+     *          the header, or -1 if unknown
+     * @param uncompressedSize the uncompressed size of the block. Always
+     *          expected to be -1. This parameter is only used in version 1.
+     * @param pread whether to use a positional read
+     */
+    @Override
+    public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL,
+        int uncompressedSize, boolean pread) throws IOException {
+      if (offset < 0) {
+        throw new IOException("Invalid offset=" + offset + " trying to read "
+            + "block (onDiskSize=" + onDiskSizeWithHeaderL
+            + ", uncompressedSize=" + uncompressedSize + ")");
+      }
+      if (uncompressedSize != -1) {
+        throw new IOException("Version 2 block reader API does not need " +
+            "the uncompressed size parameter");
+      }
+
+      if ((onDiskSizeWithHeaderL < HEADER_SIZE && onDiskSizeWithHeaderL != -1)
+          || onDiskSizeWithHeaderL >= Integer.MAX_VALUE) {
+        throw new IOException("Invalid onDisksize=" + onDiskSizeWithHeaderL
+            + ": expected to be at least " + HEADER_SIZE
+            + " and at most " + Integer.MAX_VALUE + ", or -1 (offset="
+            + offset + ", uncompressedSize=" + uncompressedSize + ")");
+      }
+
+      int onDiskSizeWithHeader = (int) onDiskSizeWithHeaderL;
+
+      HFileBlock b;
+      if (onDiskSizeWithHeader > 0) {
+        // We know the total on-disk size but not the uncompressed size. Read
+        // the entire block into memory, then parse the header and decompress
+        // from memory if using compression. This code path is used when
+        // doing a random read operation relying on the block index, as well as
+        // when the client knows the on-disk size from peeking into the next
+        // block's header (i.e. this block's header) when reading the previous
+        // block. This is the faster and preferable case.
+
+        int onDiskSizeWithoutHeader = onDiskSizeWithHeader - HEADER_SIZE;
+        assert onDiskSizeWithoutHeader >= 0;
+
+        // See if we can avoid reading the header. This is desirable, because
+        // we will not incur a seek operation to seek back if we have already
+        // read this block's header as part of the previous read's look-ahead.
+        PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
+        byte[] header = prefetchedHeader.offset == offset
+            ? prefetchedHeader.header : null;
+
+        // Size that we have to skip in case we have already read the header.
+        int preReadHeaderSize = header == null ? 0 : HEADER_SIZE;
+
+        if (compressAlgo == Compression.Algorithm.NONE) {
+          // Just read the whole thing. Allocate enough space to read the
+          // next block's header too.
+
+          ByteBuffer headerAndData = ByteBuffer.allocate(onDiskSizeWithHeader
+              + HEADER_SIZE);
+          headerAndData.limit(onDiskSizeWithHeader);
+
+          if (header != null) {
+            System.arraycopy(header, 0, headerAndData.array(), 0,
+                HEADER_SIZE);
+          }
+
+          int nextBlockOnDiskSizeWithHeader = readAtOffset(
+              headerAndData.array(), headerAndData.arrayOffset()
+                  + preReadHeaderSize, onDiskSizeWithHeader
+                  - preReadHeaderSize, true, offset + preReadHeaderSize,
+                  pread);
+
+          b = new HFileBlock(headerAndData);
+          b.assumeUncompressed();
+          b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
+          b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSizeWithHeader;
+
+          if (b.nextBlockOnDiskSizeWithHeader > 0)
+            setNextBlockHeader(offset, b);
+        } else {
+          // Allocate enough space to fit the next block's header too.
+          byte[] onDiskBlock = new byte[onDiskSizeWithHeader + HEADER_SIZE];
+
+          int nextBlockOnDiskSize = readAtOffset(onDiskBlock,
+              preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
+              true, offset + preReadHeaderSize, pread);
+
+          if (header == null)
+            header = onDiskBlock;
+
+          try {
+            b = new HFileBlock(ByteBuffer.wrap(header, 0, HEADER_SIZE));
+          } catch (IOException ex) {
+            // Seen in load testing. Provide comprehensive debug info.
+            throw new IOException("Failed to read compressed block at "
+                + offset + ", onDiskSizeWithoutHeader=" + onDiskSizeWithHeader
+                + ", preReadHeaderSize=" + preReadHeaderSize
+                + ", header.length=" + header.length + ", header bytes: "
+                + Bytes.toStringBinary(header, 0, HEADER_SIZE), ex);
+          }
+          b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
+          b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
+
+          DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
+              onDiskBlock, HEADER_SIZE, onDiskSizeWithoutHeader));
+
+          // This will allocate a new buffer but keep header bytes.
+          b.allocateBuffer(b.nextBlockOnDiskSizeWithHeader > 0);
+
+          decompress(b.buf.array(), b.buf.arrayOffset() + HEADER_SIZE, dis,
+              onDiskSizeWithoutHeader, b.uncompressedSizeWithoutHeader);
+
+          // Copy next block's header bytes into the new block if we have them.
+          if (nextBlockOnDiskSize > 0) {
+            System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(),
+                b.buf.arrayOffset() + HEADER_SIZE
+                    + b.uncompressedSizeWithoutHeader, HEADER_SIZE);
+
+            setNextBlockHeader(offset, b);
+          }
+        }
+
+      } else {
+        // We don't know the on-disk size. Read the header first, determine the
+        // on-disk size from it, and read the remaining data, thereby incurring
+        // two read operations. This might happen when we are doing the first
+        // read in a series of reads or a random read, and we don't have access
+        // to the block index. This is costly and should happen very rarely.
+
+        // Check if we have read this block's header as part of reading the
+        // previous block. If so, don't read the header again.
+        PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
+        ByteBuffer headerBuf = prefetchedHeader.offset == offset ?
+            prefetchedHeader.buf : null;
+
+        if (headerBuf == null) {
+          // Unfortunately, we still have to do a separate read operation to
+          // read the header.
+          headerBuf = ByteBuffer.allocate(HEADER_SIZE);
+          readAtOffset(headerBuf.array(), headerBuf.arrayOffset(), HEADER_SIZE,
+              false, offset, pread);
+        }
+
+        b = new HFileBlock(headerBuf);
+
+        // This will also allocate enough room for the next block's header.
+        b.allocateBuffer(true);
+
+        if (compressAlgo == Compression.Algorithm.NONE) {
+
+          // Avoid creating bounded streams and using a "codec" that does
+          // nothing.
+          b.assumeUncompressed();
+          b.nextBlockOnDiskSizeWithHeader = readAtOffset(b.buf.array(),
+              b.buf.arrayOffset() + HEADER_SIZE,
+              b.uncompressedSizeWithoutHeader, true, offset + HEADER_SIZE,
+              pread);
+
+          if (b.nextBlockOnDiskSizeWithHeader > 0) {
+            setNextBlockHeader(offset, b);
+          }
+        } else {
+          // Allocate enough space for the block's header and compressed data.
+          byte[] compressedBytes = new byte[b.getOnDiskSizeWithHeader()
+              + HEADER_SIZE];
+
+          b.nextBlockOnDiskSizeWithHeader = readAtOffset(compressedBytes,
+              HEADER_SIZE, b.onDiskSizeWithoutHeader, true, offset
+                  + HEADER_SIZE, pread);
+          DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
+              compressedBytes, HEADER_SIZE, b.onDiskSizeWithoutHeader));
+
+          decompress(b.buf.array(), b.buf.arrayOffset() + HEADER_SIZE, dis,
+              b.onDiskSizeWithoutHeader, b.uncompressedSizeWithoutHeader);
+
+          if (b.nextBlockOnDiskSizeWithHeader > 0) {
+            // Copy the next block's header into the new block.
+            int nextHeaderOffset = b.buf.arrayOffset() + HEADER_SIZE
+                + b.uncompressedSizeWithoutHeader;
+            System.arraycopy(compressedBytes,
+                compressedBytes.length - HEADER_SIZE,
+                b.buf.array(),
+                nextHeaderOffset,
+                HEADER_SIZE);
+
+            setNextBlockHeader(offset, b);
+          }
+        }
+      }
+
+      b.offset = offset;
+      return b;
+    }
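+
+    /*
+     * A minimal sketch of reading blocks with this reader (hypothetical
+     * caller code):
+     *
+     *   FSReaderV2 reader = new FSReaderV2(istream, compressAlgo, fileSize);
+     *   // Pass -1 for the on-disk size if it is not known from the block
+     *   // index; the reader then reads the header first to determine it.
+     *   HFileBlock b = reader.readBlockData(offset, -1, -1, pread);
+     *   long nextOffset = offset + b.getOnDiskSizeWithHeader();
+     */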
+
+    private void setNextBlockHeader(long offset, HFileBlock b) {
+      PrefetchedHeader prefetchedHeader = prefetchedHeaderForThread.get();
+      prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
+      int nextHeaderOffset = b.buf.arrayOffset() + HEADER_SIZE
+          + b.uncompressedSizeWithoutHeader;
+      System.arraycopy(b.buf.array(), nextHeaderOffset,
+          prefetchedHeader.header, 0, HEADER_SIZE);
+    }
+
+  }
+
+}