Posted to commits@hbase.apache.org by st...@apache.org on 2015/04/04 03:58:14 UTC

[1/4] hbase git commit: Revert "HBASE-13373 Squash HFileReaderV3 together with HFileReaderV2 and AbstractHFileReader; ditto for Scanners and BlockReader, etc.:" Revert because breaking migration tests.

Repository: hbase
Updated Branches:
  refs/heads/branch-1 5c19f9eac -> e65f43000


http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
deleted file mode 100644
index 0555363..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ /dev/null
@@ -1,641 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.io.hfile;
-
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KVComparator;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
-import org.apache.hadoop.hbase.security.EncryptionUtil;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.BloomFilterWriter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Common functionality needed by all versions of {@link HFile} writers.
- */
-@InterfaceAudience.Private
-public class HFileWriterImpl implements HFile.Writer {
-  private static final Log LOG = LogFactory.getLog(HFileWriterImpl.class);
-
-  /** The Cell previously appended. Becomes the last cell in the file.*/
-  protected Cell lastCell = null;
-
-  /** FileSystem stream to write into. */
-  protected FSDataOutputStream outputStream;
-
-  /** True if we opened the <code>outputStream</code> (and so will close it). */
-  protected final boolean closeOutputStream;
-
-  /** A "file info" block: a key-value map of file-wide metadata. */
-  protected FileInfo fileInfo = new HFile.FileInfo();
-
-  /** Total # of key/value entries, i.e. how many times add() was called. */
-  protected long entryCount = 0;
-
-  /** Used for calculating the average key length. */
-  protected long totalKeyLength = 0;
-
-  /** Used for calculating the average value length. */
-  protected long totalValueLength = 0;
-
-  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
-  protected long totalUncompressedBytes = 0;
-
-  /** Key comparator. Used to ensure we write in order. */
-  protected final KVComparator comparator;
-
-  /** Meta block names. */
-  protected List<byte[]> metaNames = new ArrayList<byte[]>();
-
-  /** {@link Writable}s representing meta block data. */
-  protected List<Writable> metaData = new ArrayList<Writable>();
-
-  /**
-   * First cell in a block.
-   * This reference should be short-lived since we write hfiles in a burst.
-   */
-  protected Cell firstCellInBlock = null;
-
-
-  /** May be null if we were passed a stream. */
-  protected final Path path;
-
-  /** Cache configuration for caching data on write. */
-  protected final CacheConfig cacheConf;
-
-  /**
-   * Name for this object used when logging or in toString. Is either
-   * the result of a toString on stream or else name of passed file Path.
-   */
-  protected final String name;
-
-  /**
-   * The data block encoding which will be used.
-   * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
-   */
-  protected final HFileDataBlockEncoder blockEncoder;
-
-  protected final HFileContext hFileContext;
-
-  private int maxTagsLength = 0;
-
-  /** KeyValue version in FileInfo */
-  public static final byte [] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");
-
-  /** Version for KeyValue which includes memstore timestamp */
-  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
-
-  /** Inline block writers for multi-level block index and compound Blooms. */
-  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<InlineBlockWriter>();
-
-  /** block writer */
-  protected HFileBlock.Writer fsBlockWriter;
-
-  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
-  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
-
-  /** The offset of the first data block or -1 if the file is empty. */
-  private long firstDataBlockOffset = -1;
-
-  /** The offset of the last data block or 0 if the file is empty. */
-  protected long lastDataBlockOffset;
-
-  /**
-   * The last (stop) Cell of the previous data block.
-   * This reference should be short-lived since we write hfiles in a burst.
-   */
-  private Cell lastCellOfPreviousBlock = null;
-
-  /** Additional data items to be written to the "load-on-open" section. */
-  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<BlockWritable>();
-
-  protected long maxMemstoreTS = 0;
-
-  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
-      FSDataOutputStream outputStream,
-      KVComparator comparator, HFileContext fileContext) {
-    this.outputStream = outputStream;
-    this.path = path;
-    this.name = path != null ? path.getName() : outputStream.toString();
-    this.hFileContext = fileContext;
-    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
-    if (encoding != DataBlockEncoding.NONE) {
-      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
-    } else {
-      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
-    }
-    this.comparator = comparator != null ? comparator
-        : KeyValue.COMPARATOR;
-
-    closeOutputStream = path != null;
-    this.cacheConf = cacheConf;
-    finishInit(conf);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Writer" + (path != null ? " for " + path : "") +
-        " initialized with cacheConf: " + cacheConf +
-        " comparator: " + comparator.getClass().getSimpleName() +
-        " fileContext: " + fileContext);
-    }
-  }
-
-  /**
-   * Add to the file info. All added key/value pairs can be obtained using
-   * {@link HFile.Reader#loadFileInfo()}.
-   *
-   * @param k Key
-   * @param v Value
-   * @throws IOException in case the key or the value are invalid
-   */
-  @Override
-  public void appendFileInfo(final byte[] k, final byte[] v)
-      throws IOException {
-    fileInfo.append(k, v, true);
-  }
-
-  /**
-   * Sets the file info offset in the trailer, finishes up populating fields in
-   * the file info, and writes the file info into the given data output. The
-   * reason the data output is not always {@link #outputStream} is that we store
-   * file info as a block in version 2.
-   *
-   * @param trailer fixed file trailer
-   * @param out the data output to write the file info to
-   * @throws IOException
-   */
-  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
-  throws IOException {
-    trailer.setFileInfoOffset(outputStream.getPos());
-    finishFileInfo();
-    fileInfo.write(out);
-  }
-
-  /**
-   * Checks that the given Cell's key does not violate the key order.
-   *
-   * @param cell Cell whose key to check.
-   * @return true if the key is duplicate
-   * @throws IOException if the key or the key order is wrong
-   */
-  protected boolean checkKey(final Cell cell) throws IOException {
-    boolean isDuplicateKey = false;
-
-    if (cell == null) {
-      throw new IOException("Key cannot be null or empty");
-    }
-    if (lastCell != null) {
-      int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell);
-
-      if (keyComp > 0) {
-        throw new IOException("Added a key not lexically larger than"
-            + " previous. Current cell = " + cell + ", lastCell = " + lastCell);
-      } else if (keyComp == 0) {
-        isDuplicateKey = true;
-      }
-    }
-    return isDuplicateKey;
-  }
-
-  /** Checks the given value for validity. */
-  protected void checkValue(final byte[] value, final int offset,
-      final int length) throws IOException {
-    if (value == null) {
-      throw new IOException("Value cannot be null");
-    }
-  }
-
-  /**
-   * @return Path or null if we were passed a stream rather than a Path.
-   */
-  @Override
-  public Path getPath() {
-    return path;
-  }
-
-  @Override
-  public String toString() {
-    return "writer=" + (path != null ? path.toString() : null) + ", name="
-        + name + ", compression=" + hFileContext.getCompression().getName();
-  }
-
-  public static Compression.Algorithm compressionByName(String algoName) {
-    if (algoName == null)
-      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
-    return Compression.getCompressionAlgorithmByName(algoName);
-  }
-
-  /** A helper method to create HFile output streams in constructors */
-  protected static FSDataOutputStream createOutputStream(Configuration conf,
-      FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
-    FsPermission perms = FSUtils.getFilePermissions(fs, conf,
-        HConstants.DATA_FILE_UMASK_KEY);
-    return FSUtils.create(fs, path, perms, favoredNodes);
-  }
-
-  /** Additional initialization steps */
-  protected void finishInit(final Configuration conf) {
-    if (fsBlockWriter != null) {
-      throw new IllegalStateException("finishInit called twice");
-    }
-
-    fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
-
-    // Data block index writer
-    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
-    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
-        cacheIndexesOnWrite ? cacheConf : null,
-        cacheIndexesOnWrite ? name : null);
-    dataBlockIndexWriter.setMaxChunkSize(
-        HFileBlockIndex.getMaxChunkSize(conf));
-    inlineBlockWriters.add(dataBlockIndexWriter);
-
-    // Meta data block index writer
-    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
-    if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
-  }
-
-  /**
-   * At a block boundary, write out all the inline blocks and open a new block.
-   *
-   * @throws IOException
-   */
-  protected void checkBlockBoundary() throws IOException {
-    if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize()) return;
-    finishBlock();
-    writeInlineBlocks(false);
-    newBlock();
-  }
-
-  /** Clean up the current data block */
-  private void finishBlock() throws IOException {
-    if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return;
-
-    // Update the first data block offset for scanning.
-    if (firstDataBlockOffset == -1) {
-      firstDataBlockOffset = outputStream.getPos();
-    }
-    // Update the last data block offset
-    lastDataBlockOffset = outputStream.getPos();
-    fsBlockWriter.writeHeaderAndData(outputStream);
-    int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
-    Cell indexEntry =
-      CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
-    dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
-      lastDataBlockOffset, onDiskSize);
-    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
-    if (cacheConf.shouldCacheDataOnWrite()) {
-      doCacheOnWrite(lastDataBlockOffset);
-    }
-  }
-
-  /** Gives inline block writers an opportunity to contribute blocks. */
-  private void writeInlineBlocks(boolean closing) throws IOException {
-    for (InlineBlockWriter ibw : inlineBlockWriters) {
-      while (ibw.shouldWriteBlock(closing)) {
-        long offset = outputStream.getPos();
-        boolean cacheThisBlock = ibw.getCacheOnWrite();
-        ibw.writeInlineBlock(fsBlockWriter.startWriting(
-            ibw.getInlineBlockType()));
-        fsBlockWriter.writeHeaderAndData(outputStream);
-        ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
-            fsBlockWriter.getUncompressedSizeWithoutHeader());
-        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
-
-        if (cacheThisBlock) {
-          doCacheOnWrite(offset);
-        }
-      }
-    }
-  }
-
-  /**
-   * Caches the last written HFile block.
-   * @param offset the offset of the block we want to cache. Used to determine
-   *          the cache key.
-   */
-  private void doCacheOnWrite(long offset) {
-    HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
-    cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(name, offset), cacheFormatBlock);
-  }
-
-  /**
-   * Ready a new block for writing.
-   *
-   * @throws IOException
-   */
-  protected void newBlock() throws IOException {
-    // This is where the next block begins.
-    fsBlockWriter.startWriting(BlockType.DATA);
-    firstCellInBlock = null;
-    if (lastCell != null) {
-      lastCellOfPreviousBlock = lastCell;
-    }
-  }
-
-  /**
-   * Add a meta block to the end of the file. Call before close(). Metadata
-   * blocks are expensive. Fill one with a bunch of serialized data rather than
-   * do a metadata block per metadata instance. If metadata is small, consider
-   * adding to file info using {@link #appendFileInfo(byte[], byte[])}
-   *
-   * @param metaBlockName
-   *          name of the block
-   * @param content
-   *          will call readFields to get data later (DO NOT REUSE)
-   */
-  @Override
-  public void appendMetaBlock(String metaBlockName, Writable content) {
-    byte[] key = Bytes.toBytes(metaBlockName);
-    int i;
-    for (i = 0; i < metaNames.size(); ++i) {
-      // stop when the current key is greater than our own
-      byte[] cur = metaNames.get(i);
-      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
-          key.length) > 0) {
-        break;
-      }
-    }
-    metaNames.add(i, key);
-    metaData.add(i, content);
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (outputStream == null) {
-      return;
-    }
-    // Save data block encoder metadata in the file info.
-    blockEncoder.saveMetadata(this);
-    // Write out the end of the data blocks, then write meta data blocks.
-    // followed by fileinfo, data block index and meta block index.
-
-    finishBlock();
-    writeInlineBlocks(true);
-
-    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());
-
-    // Write out the metadata blocks if any.
-    if (!metaNames.isEmpty()) {
-      for (int i = 0; i < metaNames.size(); ++i) {
-        // store the beginning offset
-        long offset = outputStream.getPos();
-        // write the metadata content
-        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
-        metaData.get(i).write(dos);
-
-        fsBlockWriter.writeHeaderAndData(outputStream);
-        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
-
-        // Add the new meta block to the meta index.
-        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
-            fsBlockWriter.getOnDiskSizeWithHeader());
-      }
-    }
-
-    // Load-on-open section.
-
-    // Data block index.
-    //
-    // In version 2, this section of the file starts with the root level data
-    // block index. We call a function that writes intermediate-level blocks
-    // first, then root level, and returns the offset of the root level block
-    // index.
-
-    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
-    trailer.setLoadOnOpenOffset(rootIndexOffset);
-
-    // Meta block index.
-    metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
-        BlockType.ROOT_INDEX), "meta");
-    fsBlockWriter.writeHeaderAndData(outputStream);
-    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
-
-    if (this.hFileContext.isIncludesMvcc()) {
-      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
-      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
-    }
-
-    // File info
-    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
-    fsBlockWriter.writeHeaderAndData(outputStream);
-    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
-
-    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
-    for (BlockWritable w : additionalLoadOnOpenData){
-      fsBlockWriter.writeBlock(w, outputStream);
-      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
-    }
-
-    // Now finish off the trailer.
-    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
-    trailer.setUncompressedDataIndexSize(
-        dataBlockIndexWriter.getTotalUncompressedSize());
-    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
-    trailer.setLastDataBlockOffset(lastDataBlockOffset);
-    trailer.setComparatorClass(comparator.getClass());
-    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
-
-
-    finishClose(trailer);
-
-    fsBlockWriter.release();
-  }
-
-  @Override
-  public void addInlineBlockWriter(InlineBlockWriter ibw) {
-    inlineBlockWriters.add(ibw);
-  }
-
-  @Override
-  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
-    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
-  }
-
-  @Override
-  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
-    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
-  }
-
-  private void addBloomFilter(final BloomFilterWriter bfw,
-      final BlockType blockType) {
-    if (bfw.getKeyCount() <= 0)
-      return;
-
-    if (blockType != BlockType.GENERAL_BLOOM_META &&
-        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
-      throw new RuntimeException("Block Type: " + blockType.toString() +
-          " is not supported");
-    }
-    additionalLoadOnOpenData.add(new BlockWritable() {
-      @Override
-      public BlockType getBlockType() {
-        return blockType;
-      }
-
-      @Override
-      public void writeToBlock(DataOutput out) throws IOException {
-        bfw.getMetaWriter().write(out);
-        Writable dataWriter = bfw.getDataWriter();
-        if (dataWriter != null)
-          dataWriter.write(out);
-      }
-    });
-  }
-
-  @Override
-  public HFileContext getFileContext() {
-    return hFileContext;
-  }
-
-  /**
-   * Add key/value to file. Keys must be added in an order that agrees with the
-   * Comparator passed on construction.
-   *
-   * @param cell
-   *          Cell to add. Cannot be empty nor null.
-   * @throws IOException
-   */
-  @Override
-  public void append(final Cell cell) throws IOException {
-    byte[] value = cell.getValueArray();
-    int voffset = cell.getValueOffset();
-    int vlength = cell.getValueLength();
-    // checkKey uses comparator to check we are writing in order.
-    boolean dupKey = checkKey(cell);
-    checkValue(value, voffset, vlength);
-    if (!dupKey) {
-      checkBlockBoundary();
-    }
-
-    if (!fsBlockWriter.isWriting()) {
-      newBlock();
-    }
-
-    fsBlockWriter.write(cell);
-
-    totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
-    totalValueLength += vlength;
-
-    // Are we the first key in this block?
-    if (firstCellInBlock == null) {
-      // If cell is big, block will be closed and this firstCellInBlock reference will only last
-      // a short while.
-      firstCellInBlock = cell;
-    }
-
-    // TODO: What if cell is 10MB and we write infrequently? We hold on to the cell here indefinitely?
-    lastCell = cell;
-    entryCount++;
-    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
-    int tagsLength = cell.getTagsLength();
-    if (tagsLength > this.maxTagsLength) {
-      this.maxTagsLength = tagsLength;
-    }
-  }
-
-  protected void finishFileInfo() throws IOException {
-    if (lastCell != null) {
-      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
-      // byte buffer. Won't take a tuple.
-      byte [] lastKey = CellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
-      fileInfo.append(FileInfo.LASTKEY, lastKey, false);
-    }
-
-    // Average key length.
-    int avgKeyLen =
-        entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
-    fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
-    fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
-      false);
-
-    // Average value length.
-    int avgValueLen =
-        entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
-    fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
-    if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) {
-      // In case of Prefix Tree encoding, we always write tags information into HFiles even if
-      // none of the KVs have any tags.
-      fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
-    } else if (hFileContext.isIncludesTags()) {
-      // Tags are being written to this file, so record MAX_TAGS_LEN; when no tags
-      // are written, MAX_TAGS_LEN is excluded from the FileInfo.
-      fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
-      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
-        && hFileContext.isCompressTags();
-      fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
-    }
-  }
-
-  protected int getMajorVersion() {
-    return 3;
-  }
-
-  protected int getMinorVersion() {
-    return HFileReaderImpl.MAX_MINOR_VERSION;
-  }
-
-  protected void finishClose(FixedFileTrailer trailer) throws IOException {
-    // Write out encryption metadata before finalizing if we have a valid crypto context
-    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
-    if (cryptoContext != Encryption.Context.NONE) {
-      // Wrap the context's key and write it as the encryption metadata, the wrapper includes
-      // all information needed for decryption
-      trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
-        cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
-          User.getCurrent().getShortName()),
-        cryptoContext.getKey()));
-    }
-    // Now we can finish the close
-    trailer.setMetaIndexCount(metaNames.size());
-    trailer.setTotalUncompressedBytes(totalUncompressedBytes+ trailer.getTrailerSize());
-    trailer.setEntryCount(entryCount);
-    trailer.setCompressionCodec(hFileContext.getCompression());
-
-    trailer.serialize(outputStream);
-
-    if (closeOutputStream) {
-      outputStream.close();
-      outputStream = null;
-    }
-  }
-}
\ No newline at end of file
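
For reference, callers normally obtain a writer through the HFile writer factory rather than constructing these classes directly, as the CompressionTest and HFilePerformanceEvaluation hunks later in this message show. A minimal usage sketch against that factory API (the output path, column names, and metadata key are placeholders, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileContext;
    import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HFileWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/tmp/example.hfile"); // placeholder path
        HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
        HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
            .withPath(fs, path)
            .withFileContext(context)
            .create();
        try {
          // Cells must be appended in an order that agrees with the comparator;
          // checkKey() above rejects out-of-order keys.
          writer.append(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
              Bytes.toBytes("q"), Bytes.toBytes("v")));
          // Small, file-wide metadata goes into the file info block rather than a
          // dedicated meta block (see the appendMetaBlock javadoc above).
          writer.appendFileInfo(Bytes.toBytes("MY_META_KEY"), Bytes.toBytes("my value"));
        } finally {
          writer.close();
        }
      }
    }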

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
new file mode 100644
index 0000000..28c4655
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
@@ -0,0 +1,424 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
+import org.apache.hadoop.hbase.util.BloomFilterWriter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writes HFile format version 2.
+ */
+@InterfaceAudience.Private
+public class HFileWriterV2 extends AbstractHFileWriter {
+  static final Log LOG = LogFactory.getLog(HFileWriterV2.class);
+
+  /** Max memstore (mvcc) timestamp in FileInfo */
+  public static final byte [] MAX_MEMSTORE_TS_KEY =
+      Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
+
+  /** KeyValue version in FileInfo */
+  public static final byte [] KEY_VALUE_VERSION =
+      Bytes.toBytes("KEY_VALUE_VERSION");
+
+  /** Version for KeyValue which includes memstore timestamp */
+  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
+
+  /** Inline block writers for multi-level block index and compound Blooms. */
+  private List<InlineBlockWriter> inlineBlockWriters =
+      new ArrayList<InlineBlockWriter>();
+
+  /** Unified version 2 block writer */
+  protected HFileBlock.Writer fsBlockWriter;
+
+  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
+  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
+
+  /** The offset of the first data block or -1 if the file is empty. */
+  private long firstDataBlockOffset = -1;
+
+  /** The offset of the last data block or 0 if the file is empty. */
+  protected long lastDataBlockOffset;
+
+  /**
+   * The last (stop) Cell of the previous data block.
+   * This reference should be short-lived since we write hfiles in a burst.
+   */
+  private Cell lastCellOfPreviousBlock = null;
+
+  /** Additional data items to be written to the "load-on-open" section. */
+  private List<BlockWritable> additionalLoadOnOpenData =
+    new ArrayList<BlockWritable>();
+
+  protected long maxMemstoreTS = 0;
+
+  static class WriterFactoryV2 extends HFile.WriterFactory {
+    WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
+      super(conf, cacheConf);
+    }
+
+    @Override
+    public Writer createWriter(FileSystem fs, Path path, 
+        FSDataOutputStream ostream,
+        KVComparator comparator, HFileContext context) throws IOException {
+      context.setIncludesTags(false);// HFile V2 does not deal with tags at all!
+      return new HFileWriterV2(conf, cacheConf, fs, path, ostream, 
+          comparator, context);
+      }
+    }
+
+  /** Constructor that takes a path, creates and closes the output stream. */
+  public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
+      FileSystem fs, Path path, FSDataOutputStream ostream, 
+      final KVComparator comparator, final HFileContext context) throws IOException {
+    super(cacheConf,
+        ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
+        path, comparator, context);
+    finishInit(conf);
+  }
+
+  /** Additional initialization steps */
+  protected void finishInit(final Configuration conf) {
+    if (fsBlockWriter != null)
+      throw new IllegalStateException("finishInit called twice");
+
+    fsBlockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
+
+    // Data block index writer
+    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
+    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
+        cacheIndexesOnWrite ? cacheConf : null,
+        cacheIndexesOnWrite ? name : null);
+    dataBlockIndexWriter.setMaxChunkSize(
+        HFileBlockIndex.getMaxChunkSize(conf));
+    inlineBlockWriters.add(dataBlockIndexWriter);
+
+    // Meta data block index writer
+    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
+    if (LOG.isTraceEnabled()) LOG.trace("Initialized with " + cacheConf);
+  }
+
+  /**
+   * At a block boundary, write out all the inline blocks and open a new block.
+   *
+   * @throws IOException
+   */
+  protected void checkBlockBoundary() throws IOException {
+    if (fsBlockWriter.blockSizeWritten() < hFileContext.getBlocksize())
+      return;
+
+    finishBlock();
+    writeInlineBlocks(false);
+    newBlock();
+  }
+
+  /** Clean up the current data block */
+  private void finishBlock() throws IOException {
+    if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
+      return;
+
+    // Update the first data block offset for scanning.
+    if (firstDataBlockOffset == -1) {
+      firstDataBlockOffset = outputStream.getPos();
+    }
+    // Update the last data block offset
+    lastDataBlockOffset = outputStream.getPos();
+    fsBlockWriter.writeHeaderAndData(outputStream);
+    int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
+
+    Cell indexEntry =
+      CellComparator.getMidpoint(this.comparator, lastCellOfPreviousBlock, firstCellInBlock);
+    dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
+      lastDataBlockOffset, onDiskSize);
+    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+    if (cacheConf.shouldCacheDataOnWrite()) {
+      doCacheOnWrite(lastDataBlockOffset);
+    }
+  }
+
+  /** Gives inline block writers an opportunity to contribute blocks. */
+  private void writeInlineBlocks(boolean closing) throws IOException {
+    for (InlineBlockWriter ibw : inlineBlockWriters) {
+      while (ibw.shouldWriteBlock(closing)) {
+        long offset = outputStream.getPos();
+        boolean cacheThisBlock = ibw.getCacheOnWrite();
+        ibw.writeInlineBlock(fsBlockWriter.startWriting(
+            ibw.getInlineBlockType()));
+        fsBlockWriter.writeHeaderAndData(outputStream);
+        ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
+            fsBlockWriter.getUncompressedSizeWithoutHeader());
+        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+
+        if (cacheThisBlock) {
+          doCacheOnWrite(offset);
+        }
+      }
+    }
+  }
+
+  /**
+   * Caches the last written HFile block.
+   * @param offset the offset of the block we want to cache. Used to determine
+   *          the cache key.
+   */
+  private void doCacheOnWrite(long offset) {
+    HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
+    cacheConf.getBlockCache().cacheBlock(
+        new BlockCacheKey(name, offset), cacheFormatBlock);
+  }
+
+  /**
+   * Ready a new block for writing.
+   *
+   * @throws IOException
+   */
+  protected void newBlock() throws IOException {
+    // This is where the next block begins.
+    fsBlockWriter.startWriting(BlockType.DATA);
+    firstCellInBlock = null;
+    if (lastCell != null) {
+      lastCellOfPreviousBlock = lastCell;
+    }
+  }
+
+  /**
+   * Add a meta block to the end of the file. Call before close(). Metadata
+   * blocks are expensive. Fill one with a bunch of serialized data rather than
+   * do a metadata block per metadata instance. If metadata is small, consider
+   * adding to file info using {@link #appendFileInfo(byte[], byte[])}
+   *
+   * @param metaBlockName
+   *          name of the block
+   * @param content
+   *          will call readFields to get data later (DO NOT REUSE)
+   */
+  @Override
+  public void appendMetaBlock(String metaBlockName, Writable content) {
+    byte[] key = Bytes.toBytes(metaBlockName);
+    int i;
+    for (i = 0; i < metaNames.size(); ++i) {
+      // stop when the current key is greater than our own
+      byte[] cur = metaNames.get(i);
+      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0,
+          key.length) > 0) {
+        break;
+      }
+    }
+    metaNames.add(i, key);
+    metaData.add(i, content);
+  }
+
+  /**
+   * Add key/value to file. Keys must be added in an order that agrees with the
+   * Comparator passed on construction.
+   *
+   * @param cell Cell to add. Cannot be empty nor null.
+   * @throws IOException
+   */
+  @Override
+  public void append(final Cell cell) throws IOException {
+    byte[] value = cell.getValueArray();
+    int voffset = cell.getValueOffset();
+    int vlength = cell.getValueLength();
+    // checkKey uses comparator to check we are writing in order.
+    boolean dupKey = checkKey(cell);
+    checkValue(value, voffset, vlength);
+    if (!dupKey) {
+      checkBlockBoundary();
+    }
+
+    if (!fsBlockWriter.isWriting()) {
+      newBlock();
+    }
+
+    fsBlockWriter.write(cell);
+
+    totalKeyLength += CellUtil.estimatedSerializedSizeOfKey(cell);
+    totalValueLength += vlength;
+
+    // Are we the first key in this block?
+    if (firstCellInBlock == null) {
+      // If cell is big, block will be closed and this firstCellInBlock reference will only last
+      // a short while.
+      firstCellInBlock = cell;
+    }
+
+    // TODO: What if cell is 10MB and we write infrequently?  We'll hold on to the cell here
+    // indefinitely?
+    lastCell = cell;
+    entryCount++;
+    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (outputStream == null) {
+      return;
+    }
+    // Save data block encoder metadata in the file info.
+    blockEncoder.saveMetadata(this);
+    // Write out the end of the data blocks, then write meta data blocks.
+    // followed by fileinfo, data block index and meta block index.
+
+    finishBlock();
+    writeInlineBlocks(true);
+
+    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());
+
+    // Write out the metadata blocks if any.
+    if (!metaNames.isEmpty()) {
+      for (int i = 0; i < metaNames.size(); ++i) {
+        // store the beginning offset
+        long offset = outputStream.getPos();
+        // write the metadata content
+        DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
+        metaData.get(i).write(dos);
+
+        fsBlockWriter.writeHeaderAndData(outputStream);
+        totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+
+        // Add the new meta block to the meta index.
+        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
+            fsBlockWriter.getOnDiskSizeWithHeader());
+      }
+    }
+
+    // Load-on-open section.
+
+    // Data block index.
+    //
+    // In version 2, this section of the file starts with the root level data
+    // block index. We call a function that writes intermediate-level blocks
+    // first, then root level, and returns the offset of the root level block
+    // index.
+
+    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
+    trailer.setLoadOnOpenOffset(rootIndexOffset);
+
+    // Meta block index.
+    metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
+        BlockType.ROOT_INDEX), "meta");
+    fsBlockWriter.writeHeaderAndData(outputStream);
+    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+
+    if (this.hFileContext.isIncludesMvcc()) {
+      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
+      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
+    }
+
+    // File info
+    writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
+    fsBlockWriter.writeHeaderAndData(outputStream);
+    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+
+    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
+    for (BlockWritable w : additionalLoadOnOpenData){
+      fsBlockWriter.writeBlock(w, outputStream);
+      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+    }
+
+    // Now finish off the trailer.
+    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
+    trailer.setUncompressedDataIndexSize(
+        dataBlockIndexWriter.getTotalUncompressedSize());
+    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
+    trailer.setLastDataBlockOffset(lastDataBlockOffset);
+    trailer.setComparatorClass(comparator.getClass());
+    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
+
+
+    finishClose(trailer);
+
+    fsBlockWriter.release();
+  }
+
+  @Override
+  public void addInlineBlockWriter(InlineBlockWriter ibw) {
+    inlineBlockWriters.add(ibw);
+  }
+
+  @Override
+  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
+    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
+  }
+
+  @Override
+  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
+    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
+  }
+
+  private void addBloomFilter(final BloomFilterWriter bfw,
+      final BlockType blockType) {
+    if (bfw.getKeyCount() <= 0)
+      return;
+
+    if (blockType != BlockType.GENERAL_BLOOM_META &&
+        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
+      throw new RuntimeException("Block Type: " + blockType.toString() +
+          " is not supported");
+    }
+    additionalLoadOnOpenData.add(new BlockWritable() {
+      @Override
+      public BlockType getBlockType() {
+        return blockType;
+      }
+
+      @Override
+      public void writeToBlock(DataOutput out) throws IOException {
+        bfw.getMetaWriter().write(out);
+        Writable dataWriter = bfw.getDataWriter();
+        if (dataWriter != null)
+          dataWriter.write(out);
+      }
+    });
+  }
+
+  protected int getMajorVersion() {
+    return 2;
+  }
+
+  protected int getMinorVersion() {
+    return HFileReaderV2.MAX_MINOR_VERSION;
+  }
+
+  @Override
+  public HFileContext getFileContext() {
+    return hFileContext;
+  }
+}
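
The MAX_MEMSTORE_TS_KEY and KEY_VALUE_VERSION entries written in close() land in the file info block, and the StoreFile and Compactor hunks later in this message read them back through that map. A minimal read-back sketch (placeholder path; assumes an existing hfile written by this branch):

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MaxMemstoreTsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/tmp/example.hfile"); // placeholder path
        HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
        try {
          // loadFileInfo() returns the key/value map populated by writeFileInfo().
          Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
          byte[] b = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
          long maxMemstoreTS = (b != null) ? Bytes.toLong(b) : 0;
          System.out.println("maxMemstoreTS=" + maxMemstoreTS);
        } finally {
          reader.close();
        }
      }
    }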

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
new file mode 100644
index 0000000..086395c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * {@link HFile} writer for version 3.
+ */
+@InterfaceAudience.Private
+public class HFileWriterV3 extends HFileWriterV2 {
+
+  private static final Log LOG = LogFactory.getLog(HFileWriterV3.class);
+
+  private int maxTagsLength = 0;
+
+  static class WriterFactoryV3 extends HFile.WriterFactory {
+    WriterFactoryV3(Configuration conf, CacheConfig cacheConf) {
+      super(conf, cacheConf);
+    }
+
+    @Override
+    public Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream,
+        final KVComparator comparator, HFileContext fileContext)
+        throws IOException {
+      return new HFileWriterV3(conf, cacheConf, fs, path, ostream, comparator, fileContext);
+    }
+  }
+
+  /** Constructor that takes a path, creates and closes the output stream. */
+  public HFileWriterV3(Configuration conf, CacheConfig cacheConf, FileSystem fs, Path path,
+      FSDataOutputStream ostream, final KVComparator comparator,
+      final HFileContext fileContext) throws IOException {
+    super(conf, cacheConf, fs, path, ostream, comparator, fileContext);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Writer" + (path != null ? " for " + path : "") +
+        " initialized with cacheConf: " + cacheConf +
+        " comparator: " + comparator.getClass().getSimpleName() +
+        " fileContext: " + fileContext);
+    }
+  }
+
+  /**
+   * Add key/value to file. Keys must be added in an order that agrees with the
+   * Comparator passed on construction.
+   * 
+   * @param cell
+   *          Cell to add. Cannot be empty nor null.
+   * @throws IOException
+   */
+  @Override
+  public void append(final Cell cell) throws IOException {
+    // Currently get the complete arrays
+    super.append(cell);
+    int tagsLength = cell.getTagsLength();
+    if (tagsLength > this.maxTagsLength) {
+      this.maxTagsLength = tagsLength;
+    }
+  }
+
+  protected void finishFileInfo() throws IOException {
+    super.finishFileInfo();
+    if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) {
+      // In case of Prefix Tree encoding, we always write tags information into HFiles even if
+      // none of the KVs have any tags.
+      fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
+    } else if (hFileContext.isIncludesTags()) {
+      // Tags are being written to this file, so record MAX_TAGS_LEN; when no tags
+      // are written, MAX_TAGS_LEN is excluded from the FileInfo.
+      fileInfo.append(FileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
+      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
+        && hFileContext.isCompressTags();
+      fileInfo.append(FileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
+    }
+  }
+
+  @Override
+  protected int getMajorVersion() {
+    return 3;
+  }
+
+  @Override
+  protected int getMinorVersion() {
+    return HFileReaderV3.MAX_MINOR_VERSION;
+  }
+
+  @Override
+  protected void finishClose(FixedFileTrailer trailer) throws IOException {
+    // Write out encryption metadata before finalizing if we have a valid crypto context
+    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
+    if (cryptoContext != Encryption.Context.NONE) {
+      // Wrap the context's key and write it as the encryption metadata, the wrapper includes
+      // all information needed for decryption
+      trailer.setEncryptionKey(EncryptionUtil.wrapKey(cryptoContext.getConf(),
+        cryptoContext.getConf().get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
+          User.getCurrent().getShortName()),
+        cryptoContext.getKey()));
+    }
+    // Now we can finish the close
+    super.finishClose(trailer);
+  }
+
+}
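
Which writer the factory hands back is driven by the "hfile.format.version" setting (HFile.FORMAT_VERSION_KEY): 2 for HFileWriterV2 (no tags), 3 for HFileWriterV3 (tags plus encryption metadata in the trailer). The TestCacheOnWrite changes later in this message flip that key per test. A minimal sketch of the knob (assuming this branch's defaults):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.HFile;

    public class FormatVersionSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Write v3 files (tag support); use 2 to stay on the v2 layout.
        conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
        System.out.println("hfile.format.version = "
            + conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION));
      }
    }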

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 17465ea..465be56 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -129,7 +129,7 @@ public class HFileOutputFormat2
     // Invented config.  Add to hbase-*.xml if other than default compression.
     final String defaultCompressionStr = conf.get("hfile.compression",
         Compression.Algorithm.NONE.getName());
-    final Algorithm defaultCompression = HFileWriterImpl
+    final Algorithm defaultCompression = AbstractHFileWriter
         .compressionByName(defaultCompressionStr);
     final boolean compactionExclude = conf.getBoolean(
         "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
@@ -245,7 +245,7 @@ public class HFileOutputFormat2
                                     .withBlockSize(blockSize);
         contextBuilder.withDataBlockEncoding(encoding);
         HFileContext hFileContext = contextBuilder.build();
-
+                                    
         wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
             .withOutputDir(familydir).withBloomType(bloomType)
             .withComparator(KeyValue.COMPARATOR)
@@ -358,7 +358,7 @@ public class HFileOutputFormat2
    * </ul>
    * The user should be sure to set the map output value class to either KeyValue or Put before
    * running this function.
-   *
+   * 
    * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
    */
   @Deprecated
@@ -448,7 +448,7 @@ public class HFileOutputFormat2
     TableMapReduceUtil.initCredentials(job);
     LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
   }
-
+  
   public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
     Configuration conf = job.getConfiguration();
 
@@ -483,7 +483,8 @@ public class HFileOutputFormat2
     Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],
         Algorithm>(Bytes.BYTES_COMPARATOR);
     for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
+      Algorithm algorithm = AbstractHFileWriter.compressionByName
+          (e.getValue());
       compressionMap.put(e.getKey(), algorithm);
     }
     return compressionMap;
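
Both HFileOutputFormat2 call sites in these hunks rely on the same helper to turn a codec name from configuration into a Compression.Algorithm, falling back to the HFile default when the name is null. A minimal sketch of that mapping (the "hfile.compression" key is the invented config mentioned in the comment above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.compress.Compression;
    import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
    import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;

    public class CompressionByNameSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Same lookup HFileOutputFormat2 performs for its default compression.
        String codec = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
        Algorithm algo = AbstractHFileWriter.compressionByName(codec);
        System.out.println("default compression: " + algo.getName());
      }
    }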

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index de4b8c4..af457ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -95,7 +95,6 @@ import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.http.InfoServer;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -491,7 +490,6 @@ public class HRegionServer extends HasThread implements
       throws IOException, InterruptedException {
     this.fsOk = true;
     this.conf = conf;
-    HFile.checkHFileVersion(this.conf);
     checkCodecs(this.conf);
     this.userProvider = UserProvider.instantiate(conf);
     FSUtils.setupShortCircuitRead(this.conf);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 9a33b64..8910042 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -588,7 +588,7 @@ public interface Region extends ConfigurationObserver {
       byte[] now) throws IOException;
 
   /**
-   * Replace any cell timestamps set to {@link org.apache.hadoop.hbase.HConstants#LATEST_TIMESTAMP}
+   * Replace any cell timestamps set to HConstants#LATEST_TIMESTAMP with the
    * provided current timestamp.
    * @param values
    * @param now

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 345dd9b..c1a6b76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
@@ -408,7 +409,7 @@ public class StoreFile {
     }
     this.reader.setSequenceID(this.sequenceid);
 
-    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
+    b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
     if (b != null) {
       this.maxMemstoreTS = Bytes.toLong(b);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index ae820b5..3c3ea6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
@@ -142,7 +142,7 @@ public abstract class Compactor {
         fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
       }
       else {
-        tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
+        tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
         if (tmp != null) {
           fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
index 0058eaa..ba3cbe3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -120,7 +120,7 @@ public class CompressionTest {
   throws Exception {
     Configuration conf = HBaseConfiguration.create();
     HFileContext context = new HFileContextBuilder()
-                           .withCompression(HFileWriterImpl.compressionByName(codec)).build();
+                           .withCompression(AbstractHFileWriter.compressionByName(codec)).build();
     HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
         .withPath(fs, path)
         .withFileContext(context)

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
index cb12bea..ea10f60 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.io.crypto.aes.AES;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -326,7 +326,7 @@ public class HFilePerformanceEvaluation {
     void setUp() throws Exception {
 
       HFileContextBuilder builder = new HFileContextBuilder()
-          .withCompression(HFileWriterImpl.compressionByName(codec))
+          .withCompression(AbstractHFileWriter.compressionByName(codec))
           .withBlockSize(RFILE_BLOCKSIZE);
       
       if (cipher == "aes") {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 523a72a..a025e6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -241,6 +241,7 @@ public class TestCacheOnWrite {
   public void setUp() throws IOException {
     conf = TEST_UTIL.getConfiguration();
     this.conf.set("dfs.datanode.data.dir.perm", "700");
+    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
     conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
     conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
         BLOOM_BLOCK_SIZE);
@@ -270,7 +271,12 @@ public class TestCacheOnWrite {
   }
 
   private void readStoreFile(boolean useTags) throws IOException {
-    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, conf);
+    AbstractHFileReader reader;
+    if (useTags) {
+        reader = (HFileReaderV3) HFile.createReader(fs, storeFilePath, cacheConf, conf);
+    } else {
+        reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf);
+    }
     LOG.info("HFile information: " + reader);
     HFileContext meta = new HFileContextBuilder().withCompression(compress)
       .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
@@ -371,6 +377,11 @@ public class TestCacheOnWrite {
   }
 
   private void writeStoreFile(boolean useTags) throws IOException {
+    if(useTags) {
+      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    } else {
+      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
+    }
     Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
         "test_cache_on_write");
     HFileContext meta = new HFileContextBuilder().withCompression(compress)
@@ -410,6 +421,11 @@ public class TestCacheOnWrite {
 
   private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
       throws IOException, InterruptedException {
+    if (useTags) {
+      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    } else {
+      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
+    }
     // TODO: need to change this test if we add a cache size threshold for
     // compactions, or if we implement some other kind of intelligent logic for
     // deciding what blocks to cache-on-write on compaction.
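
Both restored helpers above gate the on-disk format on tag usage: tags need HFile version 3, everything else sticks with version 2. A condensed sketch of that toggle (the configuration key is the one used in the patch; the helper name is illustrative):

    import org.apache.hadoop.conf.Configuration;

    public final class FormatVersionToggle {
      private FormatVersionToggle() {}

      // Tags are only supported by HFile v3, so derive the format version
      // from the test's useTags flag before any files are written.
      static void applyFormatVersion(Configuration conf, boolean useTags) {
        conf.setInt("hfile.format.version", useTags ? 3 : 2);
      }
    }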

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java
index 7d2fce4..29adb9c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestFixedFileTrailer.java
@@ -54,7 +54,7 @@ public class TestFixedFileTrailer {
   private static final int MAX_COMPARATOR_NAME_LENGTH = 128;
 
   /**
-   * The number of used fields by version. Indexed by version minus two.
+   * The number of used fields by version. Indexed by version minus two. 
    * Min version that we support is V2
    */
   private static final int[] NUM_FIELDS_BY_VERSION = new int[] { 14, 15 };
@@ -88,8 +88,8 @@ public class TestFixedFileTrailer {
 
   @Test
   public void testTrailer() throws IOException {
-    FixedFileTrailer t = new FixedFileTrailer(version,
-        HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION);
+    FixedFileTrailer t = new FixedFileTrailer(version, 
+        HFileReaderV2.PBUF_TRAILER_MINOR_VERSION);
     t.setDataIndexCount(3);
     t.setEntryCount(((long) Integer.MAX_VALUE) + 1);
 
@@ -121,8 +121,8 @@ public class TestFixedFileTrailer {
     // Finished writing, trying to read.
     {
       DataInputStream dis = new DataInputStream(bais);
-      FixedFileTrailer t2 = new FixedFileTrailer(version,
-          HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION);
+      FixedFileTrailer t2 = new FixedFileTrailer(version, 
+          HFileReaderV2.PBUF_TRAILER_MINOR_VERSION);
       t2.deserialize(dis);
       assertEquals(-1, bais.read()); // Ensure we have read everything.
       checkLoadedTrailer(version, t, t2);
@@ -166,12 +166,12 @@ public class TestFixedFileTrailer {
         trailerStr.split(", ").length);
     assertEquals(trailerStr, t4.toString());
   }
-
+  
   @Test
   public void testTrailerForV2NonPBCompatibility() throws Exception {
     if (version == 2) {
       FixedFileTrailer t = new FixedFileTrailer(version,
-          HFileReaderImpl.MINOR_VERSION_NO_CHECKSUM);
+          HFileReaderV2.MINOR_VERSION_NO_CHECKSUM);
       t.setDataIndexCount(3);
       t.setEntryCount(((long) Integer.MAX_VALUE) + 1);
       t.setLastDataBlockOffset(291);
@@ -198,7 +198,7 @@ public class TestFixedFileTrailer {
       {
         DataInputStream dis = new DataInputStream(bais);
         FixedFileTrailer t2 = new FixedFileTrailer(version,
-            HFileReaderImpl.MINOR_VERSION_NO_CHECKSUM);
+            HFileReaderV2.MINOR_VERSION_NO_CHECKSUM);
         t2.deserialize(dis);
         assertEquals(-1, bais.read()); // Ensure we have read everything.
         checkLoadedTrailer(version, t, t2);
@@ -227,7 +227,7 @@ public class TestFixedFileTrailer {
     output.writeInt(FixedFileTrailer.materializeVersion(fft.getMajorVersion(),
         fft.getMinorVersion()));
   }
-
+ 
 
   private FixedFileTrailer readTrailer(Path trailerPath) throws IOException {
     FSDataInputStream fsdis = fs.open(trailerPath);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
index b91cdd2..64f46c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
@@ -81,6 +81,8 @@ public class TestForceCacheImportantBlocks {
   public static Collection<Object[]> parameters() {
     // HFile versions
     return Arrays.asList(
+      new Object[] { 2, true },
+      new Object[] { 2, false },
       new Object[] { 3, true },
       new Object[] { 3, false }
     );

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index f325451..d198274 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -239,7 +239,7 @@ public class TestHFile extends HBaseTestCase {
     FSDataOutputStream fout = createFSOutput(ncTFile);
     HFileContext meta = new HFileContextBuilder()
                         .withBlockSize(minBlockSize)
-                        .withCompression(HFileWriterImpl.compressionByName(codec))
+                        .withCompression(AbstractHFileWriter.compressionByName(codec))
                         .build();
     Writer writer = HFile.getWriterFactory(conf, cacheConf)
         .withOutputStream(fout)
@@ -330,7 +330,7 @@ public class TestHFile extends HBaseTestCase {
     Path mFile = new Path(ROOT_DIR, "meta.hfile");
     FSDataOutputStream fout = createFSOutput(mFile);
     HFileContext meta = new HFileContextBuilder()
-                        .withCompression(HFileWriterImpl.compressionByName(compress))
+                        .withCompression(AbstractHFileWriter.compressionByName(compress))
                         .withBlockSize(minBlockSize).build();
     Writer writer = HFile.getWriterFactory(conf, cacheConf)
         .withOutputStream(fout)

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index dc8c2fc..7c9c45a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -589,7 +589,7 @@ public class TestHFileBlockIndex {
       }
 
       // Manually compute the mid-key and validate it.
-      HFile.Reader reader2 = reader;
+      HFileReaderV2 reader2 = (HFileReaderV2) reader;
       HFileBlock.FSReader fsReader = reader2.getUncachedBlockReader();
 
       HFileBlock.BlockIterator iter = fsReader.blockRange(0,

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
index 5b3e712..d07a6de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
@@ -54,7 +54,8 @@ public class TestHFileInlineToRootChunkConversion {
     CacheConfig cacheConf = new CacheConfig(conf);
     conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, maxChunkSize);
     HFileContext context = new HFileContextBuilder().withBlockSize(16).build();
-    HFile.Writer hfw = new HFileWriterFactory(conf, cacheConf)
+    HFileWriterV2 hfw =
+        (HFileWriterV2) new HFileWriterV2.WriterFactoryV2(conf, cacheConf)
             .withFileContext(context)
             .withPath(fs, hfPath).create();
     List<byte[]> keys = new ArrayList<byte[]>();
@@ -76,7 +77,7 @@ public class TestHFileInlineToRootChunkConversion {
     }
     hfw.close();
 
-    HFile.Reader reader = HFile.createReader(fs, hfPath, cacheConf, conf);
+    HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs, hfPath, cacheConf, conf);
     // Scanner doesn't do Cells yet.  Fix.
     HFileScanner scanner = reader.getScanner(true, true);
     for (int i = 0; i < keys.size(); ++i) {
@@ -84,4 +85,4 @@ public class TestHFileInlineToRootChunkConversion {
     }
     reader.close();
   }
-}
\ No newline at end of file
+}
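
The writer-side pattern this file goes back to is explicit about the format version: ask HFileWriterV2.WriterFactoryV2 for a writer and narrow the type. A minimal sketch of that construction, declared in the org.apache.hadoop.hbase.io.hfile package on the assumption that, like the test, it needs package-level access to the factory:

    package org.apache.hadoop.hbase.io.hfile;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class WriterFactoryV2Sketch {
      // Build a small-block context, then obtain a concrete v2 writer the way
      // the restored test does; the cast mirrors the test's own usage.
      static HFileWriterV2 openWriter(FileSystem fs, Path path) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HFileContext context = new HFileContextBuilder().withBlockSize(16).build();
        return (HFileWriterV2) new HFileWriterV2.WriterFactoryV2(conf, new CacheConfig(conf))
            .withPath(fs, path)
            .withFileContext(context)
            .create();
      }
    }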

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
index accf7ad..e502eb6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
@@ -129,7 +129,7 @@ public class TestHFileSeek extends TestCase {
     try {
       HFileContext context = new HFileContextBuilder()
                             .withBlockSize(options.minBlockSize)
-                            .withCompression(HFileWriterImpl.compressionByName(options.compress))
+                            .withCompression(AbstractHFileWriter.compressionByName(options.compress))
                             .build();
       Writer writer = HFile.getWriterFactoryNoCache(conf)
           .withOutputStream(fout)

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
index 9155fd6..bdf2ecc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
@@ -55,7 +55,7 @@ import org.junit.experimental.categories.Category;
 
 /**
  * Testing writing a version 2 {@link HFile}. This is a low-level test written
- * during the development of {@link HFileWriterImpl}.
+ * during the development of {@link HFileWriterV2}.
  */
 @Category(SmallTests.class)
 public class TestHFileWriterV2 {
@@ -98,7 +98,8 @@ public class TestHFileWriterV2 {
                            .withBlockSize(4096)
                            .withCompression(compressAlgo)
                            .build();
-    HFile.Writer writer = new HFileWriterFactory(conf, new CacheConfig(conf))
+    HFileWriterV2 writer = (HFileWriterV2)
+        new HFileWriterV2.WriterFactoryV2(conf, new CacheConfig(conf))
             .withPath(fs, hfilePath)
             .withFileContext(context)
             .create();
@@ -134,6 +135,7 @@ public class TestHFileWriterV2 {
     FixedFileTrailer trailer =
         FixedFileTrailer.readFromStream(fsdis, fileSize);
 
+    assertEquals(2, trailer.getMajorVersion());
     assertEquals(entryCount, trailer.getEntryCount());
 
     HFileContext meta = new HFileContextBuilder()
@@ -174,7 +176,8 @@ public class TestHFileWriterV2 {
     // File info
     FileInfo fileInfo = new FileInfo();
     fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
-    byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
+    byte [] keyValueFormatVersion = fileInfo.get(
+        HFileWriterV2.KEY_VALUE_VERSION);
     boolean includeMemstoreTS = keyValueFormatVersion != null &&
         Bytes.toInt(keyValueFormatVersion) > 0;
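
The assertion block above ends by re-deriving the memstore-timestamp flag from file info. The rule is small enough to state on its own; a sketch with an illustrative helper name (the KEY_VALUE_VERSION constant lives on HFileWriterV2 after this revert):

    import org.apache.hadoop.hbase.util.Bytes;

    public final class MemstoreTsFlagSketch {
      private MemstoreTsFlagSketch() {}

      // A file carries memstore timestamps when the stored key/value format
      // version entry exists and is greater than zero.
      static boolean includesMemstoreTS(byte[] keyValueFormatVersion) {
        return keyValueFormatVersion != null && Bytes.toInt(keyValueFormatVersion) > 0;
      }
    }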
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
index 627829f..76da924 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
@@ -59,7 +59,8 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 /**
- * Testing writing a version 3 {@link HFile}.
+ * Testing writing a version 3 {@link HFile}. This is a low-level test written
+ * during the development of {@link HFileWriterV3}.
  */
 @RunWith(Parameterized.class)
 @Category(SmallTests.class)
@@ -118,7 +119,8 @@ public class TestHFileWriterV3 {
                            .withBlockSize(4096)
                            .withIncludesTags(useTags)
                            .withCompression(compressAlgo).build();
-    HFile.Writer writer = new HFileWriterFactory(conf, new CacheConfig(conf))
+    HFileWriterV3 writer = (HFileWriterV3)
+        new HFileWriterV3.WriterFactoryV3(conf, new CacheConfig(conf))
             .withPath(fs, hfilePath)
             .withFileContext(context)
             .withComparator(KeyValue.COMPARATOR)
@@ -203,7 +205,8 @@ public class TestHFileWriterV3 {
     // File info
     FileInfo fileInfo = new FileInfo();
     fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
-    byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
+    byte [] keyValueFormatVersion = fileInfo.get(
+        HFileWriterV3.KEY_VALUE_VERSION);
     boolean includeMemstoreTS = keyValueFormatVersion != null &&
         Bytes.toInt(keyValueFormatVersion) > 0;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
index 731bb33..b7f7fa1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
@@ -88,7 +88,8 @@ public class TestLazyDataBlockDecompression {
    */
   private static void writeHFile(Configuration conf, CacheConfig cc, FileSystem fs, Path path,
       HFileContext cxt, int entryCount) throws IOException {
-    HFile.Writer writer = new HFileWriterFactory(conf, cc)
+    HFileWriterV2 writer = (HFileWriterV2)
+      new HFileWriterV2.WriterFactoryV2(conf, cc)
         .withPath(fs, path)
         .withFileContext(cxt)
         .create();
@@ -116,7 +117,7 @@ public class TestLazyDataBlockDecompression {
     long fileSize = fs.getFileStatus(path).getLen();
     FixedFileTrailer trailer =
       FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
-    HFile.Reader reader = new HFileReaderImpl(path, trailer, fsdis, fileSize, cacheConfig,
+    HFileReaderV2 reader = new HFileReaderV2(path, trailer, fsdis, fileSize, cacheConfig,
       fsdis.getHfs(), conf);
     reader.loadFileInfo();
     long offset = trailer.getFirstDataBlockOffset(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 0e05180..a70e5ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -54,6 +54,7 @@ public class TestPrefetch {
   @Before
   public void setUp() throws IOException {
     conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
     conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
     fs = HFileSystem.get(conf);
     CacheConfig.blockCacheDisabled = false;
@@ -68,9 +69,10 @@ public class TestPrefetch {
 
   private void readStoreFile(Path storeFilePath) throws Exception {
     // Open the file
-    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, conf);
+    HFileReaderV2 reader = (HFileReaderV2) HFile.createReader(fs,
+      storeFilePath, cacheConf, conf);
 
-    while (!reader.prefetchComplete()) {
+    while (!((HFileReaderV3)reader).prefetchComplete()) {
       // Sleep for a bit
       Thread.sleep(1000);
     }
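
With prefetch-on-open enabled earlier in setUp, the reader starts loading blocks in the background and the test simply polls for completion. A trimmed sketch of that wait loop, placed in the reader's package like the test and assuming a reader that exposes prefetchComplete() as the v3 reader does in this hunk:

    package org.apache.hadoop.hbase.io.hfile;

    public final class PrefetchWaitSketch {
      private PrefetchWaitSketch() {}

      // Poll until the background prefetch kicked off at open time finishes;
      // the one-second sleep matches the restored test's pacing.
      static void waitForPrefetch(HFileReaderV3 reader) throws InterruptedException {
        while (!reader.prefetchComplete()) {
          Thread.sleep(1000);
        }
      }
    }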

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
index 0947ecb..54801e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
@@ -36,7 +36,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
- * Test {@link HFileScanner#reseekTo(org.apache.hadoop.hbase.Cell)}
+ * Test {@link HFileScanner#reseekTo(byte[])}
  */
 @Category(SmallTests.class)
 public class TestReseekTo {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
index e810529..a642e8d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestSeekTo.java
@@ -70,7 +70,12 @@ public class TestSeekTo extends HBaseTestCase {
   }
 
   Path makeNewFile(TagUsage tagUsage) throws IOException {
-    Path ncTFile = new Path(testDir, "basic.hfile");
+    Path ncTFile = new Path(this.testDir, "basic.hfile");
+    if (tagUsage != TagUsage.NO_TAG) {
+      conf.setInt("hfile.format.version", 3);
+    } else {
+      conf.setInt("hfile.format.version", 2);
+    }
     FSDataOutputStream fout = this.fs.create(ncTFile);
     int blocksize = toKV("a", tagUsage).getLength() * 3;
     HFileContext context = new HFileContextBuilder().withBlockSize(blocksize)
@@ -133,7 +138,7 @@ public class TestSeekTo extends HBaseTestCase {
   }
 
   public void testSeekBeforeWithReSeekTo() throws Exception {
-    testSeekBeforeInternals(TagUsage.NO_TAG);
+    testSeekBeforeWithReSeekToInternals(TagUsage.NO_TAG);
     testSeekBeforeWithReSeekToInternals(TagUsage.ONLY_TAG);
     testSeekBeforeWithReSeekToInternals(TagUsage.PARTIAL_TAG);
   }
@@ -222,7 +227,7 @@ public class TestSeekTo extends HBaseTestCase {
   }
 
   public void testSeekTo() throws Exception {
-    testSeekBeforeInternals(TagUsage.NO_TAG);
+    testSeekToInternals(TagUsage.NO_TAG);
     testSeekToInternals(TagUsage.ONLY_TAG);
     testSeekToInternals(TagUsage.PARTIAL_TAG);
   }
@@ -250,7 +255,7 @@ public class TestSeekTo extends HBaseTestCase {
     reader.close();
   }
   public void testBlockContainingKey() throws Exception {
-    testSeekBeforeInternals(TagUsage.NO_TAG);
+    testBlockContainingKeyInternals(TagUsage.NO_TAG);
     testBlockContainingKeyInternals(TagUsage.ONLY_TAG);
     testBlockContainingKeyInternals(TagUsage.PARTIAL_TAG);
   }
@@ -259,7 +264,7 @@ public class TestSeekTo extends HBaseTestCase {
     Path p = makeNewFile(tagUsage);
     HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf);
     reader.loadFileInfo();
-    HFileBlockIndex.BlockIndexReader blockIndexReader =
+    HFileBlockIndex.BlockIndexReader blockIndexReader = 
       reader.getDataBlockIndexReader();
     System.out.println(blockIndexReader.toString());
     // falls before the start of the file.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
index 6544c72..1927334 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
@@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl;
+import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
@@ -602,9 +602,8 @@ public class DataBlockEncodingTool {
     // run the utilities
     DataBlockEncodingTool comp = new DataBlockEncodingTool(compressionName);
     int majorVersion = reader.getHFileVersion();
-    comp.useHBaseChecksum = majorVersion > 2 ||
-      (majorVersion == 2 &&
-       reader.getHFileMinorVersion() >= HFileReaderImpl.MINOR_VERSION_WITH_CHECKSUM);
+    comp.useHBaseChecksum = majorVersion > 2
+        || (majorVersion == 2 && reader.getHFileMinorVersion() >= HFileReaderV2.MINOR_VERSION_WITH_CHECKSUM);
     comp.checkStatistics(scanner, kvLimit);
     if (doVerify) {
       comp.verifyCodecs(scanner, kvLimit);
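
The checksum decision restored here is purely version based: every major version above 2 has HBase-level checksums, and major version 2 has them only from MINOR_VERSION_WITH_CHECKSUM onward. A one-method sketch of that predicate (the constant is the public one on HFileReaderV2, as referenced in the hunk):

    import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;

    public final class ChecksumSupportSketch {
      private ChecksumSupportSketch() {}

      // True when the file's version is new enough to carry HBase checksums.
      static boolean usesHBaseChecksum(int majorVersion, int minorVersion) {
        return majorVersion > 2
            || (majorVersion == 2 && minorVersion >= HFileReaderV2.MINOR_VERSION_WITH_CHECKSUM);
      }
    }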

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
index 576e323..9a227ab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
@@ -219,7 +220,7 @@ public class TestCacheOnWriteInSchema {
     BlockCache cache = cacheConf.getBlockCache();
     StoreFile sf = new StoreFile(fs, path, conf, cacheConf,
       BloomType.ROWCOL);
-    HFile.Reader reader = sf.createReader().getHFileReader();
+    HFileReaderV2 reader = (HFileReaderV2) sf.createReader().getHFileReader();
     try {
       // Open a scanner with (on read) caching disabled
       HFileScanner scanner = reader.getScanner(false, false);


[2/4] hbase git commit: Revert "HBASE-13373 Squash HFileReaderV3 together with HFileReaderV2 and AbstractHFileReader; ditto for Scanners and BlockReader, etc.:" Revert because breaking migration tests.

Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
new file mode 100644
index 0000000..c0e3e91
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -0,0 +1,1323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link HFile} reader for version 2.
+ */
+@InterfaceAudience.Private
+public class HFileReaderV2 extends AbstractHFileReader {
+
+  private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);
+
+  /** Minor versions in HFile V2 starting with this number have hbase checksums */
+  public static final int MINOR_VERSION_WITH_CHECKSUM = 1;
+  /** The HFile V2 minor version that does not support checksums */
+  public static final int MINOR_VERSION_NO_CHECKSUM = 0;
+
+  /** HFile minor version that introduced pbuf filetrailer */
+  public static final int PBUF_TRAILER_MINOR_VERSION = 2;
+
+  /**
+   * The size of a (key length, value length) tuple that prefixes each entry in
+   * a data block.
+   */
+  public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
+
+  protected boolean includesMemstoreTS = false;
+  protected boolean decodeMemstoreTS = false;
+  protected boolean shouldIncludeMemstoreTS() {
+    return includesMemstoreTS;
+  }
+
+  /** Filesystem-level block reader. */
+  protected HFileBlock.FSReader fsBlockReader;
+
+  /**
+   * A "sparse lock" implementation allowing us to lock on a particular block
+   * identified by offset. The purpose of this is to avoid two clients loading
+   * the same block, and to have all but one client wait to get the block from
+   * the cache.
+   */
+  private IdLock offsetLock = new IdLock();
+
+  /**
+   * Blocks read from the load-on-open section, excluding data root index, meta
+   * index, and file info.
+   */
+  private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
+
+  /** Minimum minor version supported by this HFile format */
+  static final int MIN_MINOR_VERSION = 0;
+
+  /** Maximum minor version supported by this HFile format */
+  // We went to version 2 when we moved to pb'ing fileinfo and the trailer on
+  // the file. This version can read Writables version 1.
+  static final int MAX_MINOR_VERSION = 3;
+
+  /** Minor versions starting with this number have faked index key */
+  static final int MINOR_VERSION_WITH_FAKED_KEY = 3;
+
+  protected HFileContext hfileContext;
+
+  /**
+   * Opens a HFile. You must load the index before you can use it by calling
+   * {@link #loadFileInfo()}.
+   *
+   * @param path Path to HFile.
+   * @param trailer File trailer.
+   * @param fsdis input stream.
+   * @param size Length of the stream.
+   * @param cacheConf Cache configuration.
+   * @param hfs
+   * @param conf
+   */
+  public HFileReaderV2(final Path path, final FixedFileTrailer trailer,
+      final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
+      final HFileSystem hfs, final Configuration conf) throws IOException {
+    super(path, trailer, size, cacheConf, hfs, conf);
+    this.conf = conf;
+    trailer.expectMajorVersion(getMajorVersion());
+    validateMinorVersion(path, trailer.getMinorVersion());
+    this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
+    HFileBlock.FSReaderImpl fsBlockReaderV2 =
+      new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext);
+    this.fsBlockReader = fsBlockReaderV2; // upcast
+
+    // Comparator class name is stored in the trailer in version 2.
+    comparator = trailer.createComparator();
+    dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
+        trailer.getNumDataIndexLevels(), this);
+    metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
+        KeyValue.RAW_COMPARATOR, 1);
+
+    // Parse load-on-open data.
+
+    HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
+        trailer.getLoadOnOpenDataOffset(),
+        fileSize - trailer.getTrailerSize());
+
+    // Data index. We also read statistics about the block index written after
+    // the root level.
+    dataBlockIndexReader.readMultiLevelIndexRoot(
+        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
+        trailer.getDataIndexCount());
+
+    // Meta index.
+    metaBlockIndexReader.readRootIndex(
+        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
+        trailer.getMetaIndexCount());
+
+    // File info
+    fileInfo = new FileInfo();
+    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
+    byte[] creationTimeBytes = fileInfo.get(FileInfo.CREATE_TIME_TS);
+    this.hfileContext.setFileCreateTime(creationTimeBytes == null ? 0 : Bytes.toLong(creationTimeBytes));
+    lastKey = fileInfo.get(FileInfo.LASTKEY);
+    avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
+    avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
+    byte [] keyValueFormatVersion =
+        fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
+    includesMemstoreTS = keyValueFormatVersion != null &&
+        Bytes.toInt(keyValueFormatVersion) ==
+            HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
+    fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
+    if (includesMemstoreTS) {
+      decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
+    }
+
+    // Read data block encoding algorithm name from file info.
+    dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
+    fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);
+
+    // Store all other load-on-open blocks for further consumption.
+    HFileBlock b;
+    while ((b = blockIter.nextBlock()) != null) {
+      loadOnOpenBlocks.add(b);
+    }
+
+    // Prefetch file blocks upon open if requested
+    if (cacheConf.shouldPrefetchOnOpen()) {
+      PrefetchExecutor.request(path, new Runnable() {
+        public void run() {
+          try {
+            long offset = 0;
+            long end = fileSize - getTrailer().getTrailerSize();
+            HFileBlock prevBlock = null;
+            while (offset < end) {
+              if (Thread.interrupted()) {
+                break;
+              }
+              long onDiskSize = -1;
+              if (prevBlock != null) {
+                onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
+              }
+              HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
+                null, null);
+              prevBlock = block;
+              offset += block.getOnDiskSizeWithHeader();
+            }
+          } catch (IOException e) {
+            // IOExceptions are probably due to region closes (relocation, etc.)
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Exception encountered while prefetching " + path + ":", e);
+            }
+          } catch (Exception e) {
+            // Other exceptions are interesting
+            LOG.warn("Exception encountered while prefetching " + path + ":", e);
+          } finally {
+            PrefetchExecutor.complete(path);
+          }
+        }
+      });
+    }
+  }
+
+  protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
+      HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
+    return new HFileContextBuilder()
+      .withIncludesMvcc(this.includesMemstoreTS)
+      .withCompression(this.compressAlgo)
+      .withHBaseCheckSum(trailer.getMinorVersion() >= MINOR_VERSION_WITH_CHECKSUM)
+      .build();
+  }
+
+  /**
+   * Create a Scanner on this file. No seeks or reads are done on creation. Call
+   * {@link HFileScanner#seekTo(byte[])} to position and start the read. There is
+   * nothing to clean up in a Scanner. Letting go of your references to the
+   * scanner is sufficient.
+   *
+   * @param cacheBlocks True if we should cache blocks read in by this scanner.
+   * @param pread Use positional read rather than seek+read if true (pread is
+   *          better for random reads, seek+read is better scanning).
+   * @param isCompaction is scanner being used for a compaction?
+   * @return Scanner on this file.
+   */
+   @Override
+   public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
+      final boolean isCompaction) {
+    if (dataBlockEncoder.useEncodedScanner()) {
+      return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
+          hfileContext);
+    }
+
+    return new ScannerV2(this, cacheBlocks, pread, isCompaction);
+  }
+
+  /**
+   * Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType}
+   * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary.
+   */
+   private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
+       boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
+       DataBlockEncoding expectedDataBlockEncoding) throws IOException {
+     // Check cache for block. If found return.
+     if (cacheConf.isBlockCacheEnabled()) {
+       BlockCache cache = cacheConf.getBlockCache();
+       HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock,
+         updateCacheMetrics);
+       if (cachedBlock != null) {
+         if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
+           cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader);
+         }
+         validateBlockType(cachedBlock, expectedBlockType);
+
+         if (expectedDataBlockEncoding == null) {
+           return cachedBlock;
+         }
+         DataBlockEncoding actualDataBlockEncoding =
+                 cachedBlock.getDataBlockEncoding();
+         // Block types other than data blocks always have
+         // DataBlockEncoding.NONE. To avoid false negative cache misses, only
+         // perform this check if cached block is a data block.
+         if (cachedBlock.getBlockType().isData() &&
+                 !actualDataBlockEncoding.equals(expectedDataBlockEncoding)) {
+           // This mismatch may happen if a ScannerV2, which is used for say a
+           // compaction, tries to read an encoded block from the block cache.
+           // The reverse might happen when an EncodedScannerV2 tries to read
+           // un-encoded blocks which were cached earlier.
+           //
+           // Because returning a data block with an implicit BlockType mismatch
+           // would cause the requesting scanner to throw, a disk read should be
+           // forced here. This will potentially cause a significant number of
+           // cache misses, so we should keep track of this, as it might justify
+           // the work on a CompoundScannerV2.
+           if (!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) &&
+                   !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)) {
+             // If the block is encoded but the encoding does not match the
+             // expected encoding it is likely the encoding was changed but the
+             // block was not yet evicted. Evictions on file close happen async
+             // so blocks with the old encoding still linger in cache for some
+             // period of time. This event should be rare as it only happens on
+             // schema definition change.
+             LOG.info("Evicting cached block with key " + cacheKey +
+                     " because of a data block encoding mismatch" +
+                     "; expected: " + expectedDataBlockEncoding +
+                     ", actual: " + actualDataBlockEncoding);
+             cache.evictBlock(cacheKey);
+           }
+           return null;
+         }
+         return cachedBlock;
+       }
+     }
+     return null;
+   }
+  /**
+   * @param metaBlockName
+   * @param cacheBlock Add block to cache, if found
+   * @return block wrapped in a ByteBuffer, with header skipped
+   * @throws IOException
+   */
+  @Override
+  public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
+      throws IOException {
+    if (trailer.getMetaIndexCount() == 0) {
+      return null; // there are no meta blocks
+    }
+    if (metaBlockIndexReader == null) {
+      throw new IOException("Meta index not loaded");
+    }
+
+    byte[] mbname = Bytes.toBytes(metaBlockName);
+    int block = metaBlockIndexReader.rootBlockContainingKey(mbname,
+        0, mbname.length);
+    if (block == -1)
+      return null;
+    long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
+
+    // Per meta key from any given file, synchronize reads for said block. This
+    // is OK to do for meta blocks because the meta block index is always
+    // single-level.
+    synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
+      // Check cache for block. If found return.
+      long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
+      BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset);
+
+      cacheBlock &= cacheConf.shouldCacheDataOnRead();
+      if (cacheConf.isBlockCacheEnabled()) {
+        HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true,
+          BlockType.META, null);
+        if (cachedBlock != null) {
+          assert cachedBlock.isUnpacked() : "Packed block leak.";
+          // Return a distinct 'shallow copy' of the block,
+          // so pos does not get messed by the scanner
+          return cachedBlock.getBufferWithoutHeader();
+        }
+        // Cache Miss, please load.
+      }
+
+      HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
+          blockSize, -1, true).unpack(hfileContext, fsBlockReader);
+
+      // Cache the block
+      if (cacheBlock) {
+        cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
+            cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
+      }
+
+      return metaBlock.getBufferWithoutHeader();
+    }
+  }
+
+  @Override
+  public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
+      final boolean cacheBlock, boolean pread, final boolean isCompaction,
+      boolean updateCacheMetrics, BlockType expectedBlockType,
+      DataBlockEncoding expectedDataBlockEncoding)
+      throws IOException {
+    if (dataBlockIndexReader == null) {
+      throw new IOException("Block index not loaded");
+    }
+    if (dataBlockOffset < 0 || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
+      throw new IOException("Requested block is out of range: " + dataBlockOffset +
+        ", lastDataBlockOffset: " + trailer.getLastDataBlockOffset());
+    }
+
+    // For any given block from any given file, synchronize reads for said block.
+    // Without a cache, this synchronizing is needless overhead, but really
+    // the other choice is to duplicate work (which the cache would prevent you
+    // from doing).
+    BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset);
+    boolean useLock = false;
+    IdLock.Entry lockEntry = null;
+    TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock");
+    try {
+      while (true) {
+        if (useLock) {
+          lockEntry = offsetLock.getLockEntry(dataBlockOffset);
+        }
+
+        // Check cache for block. If found return.
+        if (cacheConf.isBlockCacheEnabled()) {
+          // Try and get the block from the block cache. If the useLock variable is true then this
+          // is the second time through the loop and it should not be counted as a block cache miss.
+          HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
+            updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
+          if (cachedBlock != null) {
+            if (Trace.isTracing()) {
+              traceScope.getSpan().addTimelineAnnotation("blockCacheHit");
+            }
+            assert cachedBlock.isUnpacked() : "Packed block leak.";
+            if (cachedBlock.getBlockType().isData()) {
+              if (updateCacheMetrics) {
+                HFile.dataBlockReadCnt.incrementAndGet();
+              }
+              // Validate encoding type for data blocks. We include encoding
+              // type in the cache key, and we expect it to match on a cache hit.
+              if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
+                throw new IOException("Cached block under key " + cacheKey + " "
+                  + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
+                  + dataBlockEncoder.getDataBlockEncoding() + ")");
+              }
+            }
+            // Cache-hit. Return!
+            return cachedBlock;
+          }
+          // Carry on, please load.
+        }
+        if (!useLock) {
+          // check cache again with lock
+          useLock = true;
+          continue;
+        }
+        if (Trace.isTracing()) {
+          traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
+        }
+        // Load block from filesystem.
+        HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
+            pread);
+        validateBlockType(hfileBlock, expectedBlockType);
+        HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
+        BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
+
+        // Cache the block if necessary
+        if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+          cacheConf.getBlockCache().cacheBlock(cacheKey,
+            cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
+            cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
+        }
+
+        if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
+          HFile.dataBlockReadCnt.incrementAndGet();
+        }
+
+        return unpacked;
+      }
+    } finally {
+      traceScope.close();
+      if (lockEntry != null) {
+        offsetLock.releaseLockEntry(lockEntry);
+      }
+    }
+  }
+
+  @Override
+  public boolean hasMVCCInfo() {
+    return includesMemstoreTS && decodeMemstoreTS;
+  }
+
+  /**
+   * Compares the actual type of a block retrieved from cache or disk with its
+   * expected type and throws an exception in case of a mismatch. Expected
+   * block type of {@link BlockType#DATA} is considered to match the actual
+   * block type {@link BlockType#ENCODED_DATA} as well.
+   * @param block a block retrieved from cache or disk
+   * @param expectedBlockType the expected block type, or null to skip the
+   *          check
+   */
+  private void validateBlockType(HFileBlock block,
+      BlockType expectedBlockType) throws IOException {
+    if (expectedBlockType == null) {
+      return;
+    }
+    BlockType actualBlockType = block.getBlockType();
+    if (expectedBlockType.isData() && actualBlockType.isData()) {
+      // We consider DATA to match ENCODED_DATA for the purpose of this
+      // verification.
+      return;
+    }
+    if (actualBlockType != expectedBlockType) {
+      throw new IOException("Expected block type " + expectedBlockType + ", " +
+          "but got " + actualBlockType + ": " + block);
+    }
+  }
+
+  /**
+   * @return Last key in the file. May be null if file has no entries. Note that
+   *         this is not the last row key, but rather the byte form of the last
+   *         KeyValue.
+   */
+  @Override
+  public byte[] getLastKey() {
+    return dataBlockIndexReader.isEmpty() ? null : lastKey;
+  }
+
+  /**
+   * @return Midkey for this file. We work with block boundaries only so
+   *         returned midkey is an approximation only.
+   * @throws IOException
+   */
+  @Override
+  public byte[] midkey() throws IOException {
+    return dataBlockIndexReader.midkey();
+  }
+
+  @Override
+  public void close() throws IOException {
+    close(cacheConf.shouldEvictOnClose());
+  }
+
+  public void close(boolean evictOnClose) throws IOException {
+    PrefetchExecutor.cancel(path);
+    if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
+      int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("On close, file=" + name + " evicted=" + numEvicted
+          + " block(s)");
+      }
+    }
+    fsBlockReader.closeStreams();
+  }
+
+  public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) {
+    return dataBlockEncoder.getEffectiveEncodingInCache(isCompaction);
+  }
+
+  /** For testing */
+  @Override
+  HFileBlock.FSReader getUncachedBlockReader() {
+    return fsBlockReader;
+  }
+
+
+  protected abstract static class AbstractScannerV2
+      extends AbstractHFileReader.Scanner {
+    protected HFileBlock block;
+
+    @Override
+    public Cell getNextIndexedKey() {
+      return nextIndexedKey;
+    }
+    /**
+     * The next indexed key keeps track of the indexed key of the next data block.
+     * If the nextIndexedKey is HConstants.NO_NEXT_INDEXED_KEY, it means that the
+     * current data block is the last data block.
+     *
+     * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet.
+     */
+    protected Cell nextIndexedKey;
+
+    public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction) {
+      super(r, cacheBlocks, pread, isCompaction);
+    }
+
+    protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
+
+    protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
+        boolean rewind, Cell key, boolean seekBefore) throws IOException;
+
+    @Override
+    public int seekTo(byte[] key, int offset, int length) throws IOException {
+      // Always rewind to the first key of the block, because the given key
+      // might be before or after the current key.
+      return seekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
+    }
+
+    @Override
+    public int reseekTo(byte[] key, int offset, int length) throws IOException {
+      return reseekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
+    }
+
+    @Override
+    public int seekTo(Cell key) throws IOException {
+      return seekTo(key, true);
+    }
+
+    @Override
+    public int reseekTo(Cell key) throws IOException {
+      int compared;
+      if (isSeeked()) {
+        compared = compareKey(reader.getComparator(), key);
+        if (compared < 1) {
+          // If the required key is less than or equal to current key, then
+          // don't do anything.
+          return compared;
+        } else {
+          // The comparison with no_next_index_key has to be checked
+          if (this.nextIndexedKey != null &&
+              (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader
+              .getComparator()
+                  .compareOnlyKeyPortion(key, nextIndexedKey) < 0)) {
+            // The reader shall continue to scan the current data block instead
+            // of querying the block index, as long as it knows the target key
+            // is strictly smaller than the next indexed key or the current data
+            // block is the last data block.
+            return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false);
+          }
+        }
+      }
+      // Don't rewind on a reseek operation, because reseek implies that we are
+      // always going forward in the file.
+      return seekTo(key, false);
+    }
+
+
+    /**
+     * An internal API function. Seek to the given key, optionally rewinding to
+     * the first key of the block before doing the seek.
+     *
+     * @param key - a cell representing the key that we need to fetch
+     * @param rewind whether to rewind to the first key of the block before
+     *        doing the seek. If this is false, we are assuming we never go
+     *        back, otherwise the result is undefined.
+     * @return -1 if the key is earlier than the first key of the file,
+     *         0 if we are at the given key, 1 if we are past the given key,
+     *         -2 if the key is earlier than the first key of the file while
+     *         using a faked index key
+     * @throws IOException
+     */
+    public int seekTo(Cell key, boolean rewind) throws IOException {
+      HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader();
+      BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block,
+          cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding());
+      if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
+        // This happens if the key e.g. falls before the beginning of the file.
+        return -1;
+      }
+      return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
+          blockWithScanInfo.getNextIndexedKey(), rewind, key, false);
+    }
+
+    @Override
+    public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
+      return seekBefore(new KeyValue.KeyOnlyKeyValue(key, offset, length));
+    }
+
+    @Override
+    public boolean seekBefore(Cell key) throws IOException {
+      HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block,
+          cacheBlocks, pread, isCompaction,
+          ((HFileReaderV2) reader).getEffectiveEncodingInCache(isCompaction));
+      if (seekToBlock == null) {
+        return false;
+      }
+      ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
+
+      if (reader.getComparator()
+          .compareOnlyKeyPortion(
+              new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(),
+                  firstKey.limit()), key) >= 0) {
+        long previousBlockOffset = seekToBlock.getPrevBlockOffset();
+        // The key we are interested in
+        if (previousBlockOffset == -1) {
+          // we have a 'problem', the key we want is the first of the file.
+          return false;
+        }
+
+        // It is important that we compute and pass onDiskSize to the block
+        // reader so that it does not have to read the header separately to
+        // figure out the size.
+        seekToBlock = reader.readBlock(previousBlockOffset,
+            seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
+            pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
+        // TODO shortcut: seek forward in this block to the last key of the
+        // block.
+      }
+      Cell firstKeyInCurrentBlock = new KeyValue.KeyOnlyKeyValue(Bytes.getBytes(firstKey));
+      loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true);
+      return true;
+    }
+
+    /**
+     * Scans blocks in the "scanned" section of the {@link HFile} until the next
+     * data block is found.
+     *
+     * @return the next block, or null if there are no more data blocks
+     * @throws IOException
+     */
+    protected HFileBlock readNextDataBlock() throws IOException {
+      long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
+      if (block == null)
+        return null;
+
+      HFileBlock curBlock = block;
+
+      do {
+        if (curBlock.getOffset() >= lastDataBlockOffset)
+          return null;
+
+        if (curBlock.getOffset() < 0) {
+          throw new IOException("Invalid block file offset: " + block);
+        }
+
+        // We are reading the next block without block type validation, because
+        // it might turn out to be a non-data block.
+        curBlock = reader.readBlock(curBlock.getOffset()
+            + curBlock.getOnDiskSizeWithHeader(),
+            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
+            isCompaction, true, null, getEffectiveDataBlockEncoding());
+      } while (!curBlock.getBlockType().isData());
+
+      return curBlock;
+    }
+
+    public DataBlockEncoding getEffectiveDataBlockEncoding() {
+      return ((HFileReaderV2)reader).getEffectiveEncodingInCache(isCompaction);
+    }
+    /**
+     * Compare the given key against the current key
+     * @param comparator
+     * @param key
+     * @param offset
+     * @param length
+     * @return -1 if the passed key is smaller than the current key, 0 if equal, and 1 if greater
+     */
+    public abstract int compareKey(KVComparator comparator, byte[] key, int offset,
+        int length);
+
+    public abstract int compareKey(KVComparator comparator, Cell kv);
+  }
+
+  /**
+   * Implementation of {@link HFileScanner} interface.
+   */
+  protected static class ScannerV2 extends AbstractScannerV2 {
+    private HFileReaderV2 reader;
+
+    public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction) {
+      super(r, cacheBlocks, pread, isCompaction);
+      this.reader = r;
+    }
+
+    @Override
+    public Cell getKeyValue() {
+      if (!isSeeked())
+        return null;
+
+      KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+          + blockBuffer.position(), getCellBufSize());
+      if (this.reader.shouldIncludeMemstoreTS()) {
+        ret.setSequenceId(currMemstoreTS);
+      }
+      return ret;
+    }
+
+    protected int getCellBufSize() {
+      return KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
+    }
+
+    @Override
+    public ByteBuffer getKey() {
+      assertSeeked();
+      return ByteBuffer.wrap(
+          blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position()
+              + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
+    }
+
+    @Override
+    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
+      return comparator.compareFlatKey(key, offset, length, blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
+    }
+
+    @Override
+    public ByteBuffer getValue() {
+      assertSeeked();
+      return ByteBuffer.wrap(
+          blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position()
+              + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
+    }
+
+    protected void setNonSeekedState() {
+      block = null;
+      blockBuffer = null;
+      currKeyLen = 0;
+      currValueLen = 0;
+      currMemstoreTS = 0;
+      currMemstoreTSLen = 0;
+    }
+
+    /**
+     * Go to the next key/value in the block section. Loads the next block if
+     * necessary. If successful, {@link #getKey()} and {@link #getValue()} can
+     * be called.
+     *
+     * @return true if successfully navigated to the next key/value
+     */
+    @Override
+    public boolean next() throws IOException {
+      assertSeeked();
+
+      try {
+        blockBuffer.position(getNextCellStartPosition());
+      } catch (IllegalArgumentException e) {
+        LOG.error("Current pos = " + blockBuffer.position()
+            + "; currKeyLen = " + currKeyLen + "; currValLen = "
+            + currValueLen + "; block limit = " + blockBuffer.limit()
+            + "; HFile name = " + reader.getName()
+            + "; currBlock currBlockOffset = " + block.getOffset());
+        throw e;
+      }
+
+      if (blockBuffer.remaining() <= 0) {
+        long lastDataBlockOffset =
+            reader.getTrailer().getLastDataBlockOffset();
+
+        if (block.getOffset() >= lastDataBlockOffset) {
+          setNonSeekedState();
+          return false;
+        }
+
+        // read the next block
+        HFileBlock nextBlock = readNextDataBlock();
+        if (nextBlock == null) {
+          setNonSeekedState();
+          return false;
+        }
+
+        updateCurrBlock(nextBlock);
+        return true;
+      }
+
+      // We are still in the same block.
+      readKeyValueLen();
+      return true;
+    }
+
+    protected int getNextCellStartPosition() {
+      return blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
+          + currMemstoreTSLen;
+    }
+
+    /**
+     * Positions this scanner at the start of the file.
+     *
+     * @return false if empty file; i.e. a call to next would return false and
+     *         the current key and value are undefined.
+     * @throws IOException
+     */
+    @Override
+    public boolean seekTo() throws IOException {
+      if (reader == null) {
+        return false;
+      }
+
+      if (reader.getTrailer().getEntryCount() == 0) {
+        // No data blocks.
+        return false;
+      }
+
+      long firstDataBlockOffset =
+          reader.getTrailer().getFirstDataBlockOffset();
+      if (block != null && block.getOffset() == firstDataBlockOffset) {
+        blockBuffer.rewind();
+        readKeyValueLen();
+        return true;
+      }
+
+      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
+          isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
+      if (block.getOffset() < 0) {
+        throw new IOException("Invalid block offset: " + block.getOffset());
+      }
+      updateCurrBlock(block);
+      return true;
+    }
+
+    @Override
+    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
+        boolean rewind, Cell key, boolean seekBefore) throws IOException {
+      if (block == null || block.getOffset() != seekToBlock.getOffset()) {
+        updateCurrBlock(seekToBlock);
+      } else if (rewind) {
+        blockBuffer.rewind();
+      }
+
+      // Update the nextIndexedKey
+      this.nextIndexedKey = nextIndexedKey;
+      return blockSeek(key, seekBefore);
+    }
+
+    /**
+     * Updates the current block to be the given {@link HFileBlock}. Seeks to
+     * the first key/value pair.
+     *
+     * @param newBlock the block to make current
+     */
+    protected void updateCurrBlock(HFileBlock newBlock) {
+      block = newBlock;
+
+      // sanity check
+      if (block.getBlockType() != BlockType.DATA) {
+        throw new IllegalStateException("ScannerV2 works only on data " +
+            "blocks, got " + block.getBlockType() + "; " +
+            "fileName=" + reader.name + ", " +
+            "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
+            "isCompaction=" + isCompaction);
+      }
+
+      blockBuffer = block.getBufferWithoutHeader();
+      readKeyValueLen();
+      blockFetches++;
+
+      // Reset the next indexed key
+      this.nextIndexedKey = null;
+    }
+
+    protected void readKeyValueLen() {
+      blockBuffer.mark();
+      currKeyLen = blockBuffer.getInt();
+      currValueLen = blockBuffer.getInt();
+      ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen);
+      readMvccVersion();
+      if (currKeyLen < 0 || currValueLen < 0
+          || currKeyLen > blockBuffer.limit()
+          || currValueLen > blockBuffer.limit()) {
+        throw new IllegalStateException("Invalid currKeyLen " + currKeyLen
+            + " or currValueLen " + currValueLen + ". Block offset: "
+            + block.getOffset() + ", block length: " + blockBuffer.limit()
+            + ", position: " + blockBuffer.position() + " (without header).");
+      }
+      blockBuffer.reset();
+    }
+
+    protected void readMvccVersion() {
+      if (this.reader.shouldIncludeMemstoreTS()) {
+        if (this.reader.decodeMemstoreTS) {
+          try {
+            currMemstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset()
+                + blockBuffer.position());
+            currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading memstore timestamp", e);
+          }
+        } else {
+          currMemstoreTS = 0;
+          currMemstoreTSLen = 1;
+        }
+      }
+    }
+
+    /**
+     * Within a loaded block, seek to the last key that is smaller than or
+     * equal to the key we are interested in.
+     *
+     * A note on the seekBefore: if you have seekBefore = true, AND the first
+     * key in the block = key, then you'll get thrown exceptions. The caller has
+     * to check for that case and load the previous block as appropriate.
+     *
+     * @param key
+     *          the key to find
+     * @param seekBefore
+     *          find the key before the given key in case of exact match.
+     * @return 0 in case of an exact key match, 1 in case of an inexact match,
+     *         and -2 in case of an inexact match where, additionally, the input
+     *         key is less than the first key of the current block (e.g. when
+     *         using a faked index key)
+     */
+    protected int blockSeek(Cell key, boolean seekBefore) {
+      int klen, vlen;
+      long memstoreTS = 0;
+      int memstoreTSLen = 0;
+      int lastKeyValueSize = -1;
+      KeyValue.KeyOnlyKeyValue keyOnlykv = new KeyValue.KeyOnlyKeyValue();
+      do {
+        blockBuffer.mark();
+        klen = blockBuffer.getInt();
+        vlen = blockBuffer.getInt();
+        blockBuffer.reset();
+        if (this.reader.shouldIncludeMemstoreTS()) {
+          if (this.reader.decodeMemstoreTS) {
+            try {
+              int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
+                  + KEY_VALUE_LEN_SIZE + klen + vlen;
+              memstoreTS = Bytes.readVLong(blockBuffer.array(), memstoreTSOffset);
+              memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
+            } catch (Exception e) {
+              throw new RuntimeException("Error reading memstore timestamp", e);
+            }
+          } else {
+            memstoreTS = 0;
+            memstoreTSLen = 1;
+          }
+        }
+
+        int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE;
+        keyOnlykv.setKey(blockBuffer.array(), keyOffset, klen);
+        int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlykv);
+
+        if (comp == 0) {
+          if (seekBefore) {
+            if (lastKeyValueSize < 0) {
+              throw new IllegalStateException("blockSeek with seekBefore "
+                  + "at the first key of the block: key="
+                  + CellUtil.getCellKeyAsString(key)
+                  + ", blockOffset=" + block.getOffset() + ", onDiskSize="
+                  + block.getOnDiskSizeWithHeader());
+            }
+            blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+            readKeyValueLen();
+            return 1; // non exact match.
+          }
+          currKeyLen = klen;
+          currValueLen = vlen;
+          if (this.reader.shouldIncludeMemstoreTS()) {
+            currMemstoreTS = memstoreTS;
+            currMemstoreTSLen = memstoreTSLen;
+          }
+          return 0; // indicate exact match
+        } else if (comp < 0) {
+          if (lastKeyValueSize > 0)
+            blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+          readKeyValueLen();
+          if (lastKeyValueSize == -1 && blockBuffer.position() == 0
+              && this.reader.trailer.getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) {
+            return HConstants.INDEX_KEY_MAGIC;
+          }
+          return 1;
+        }
+
+        // The size of this key/value tuple, including key/value length fields.
+        lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
+        blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
+      } while (blockBuffer.remaining() > 0);
+
+      // Seek to the last key we successfully read. This will happen if this is
+      // the last key/value pair in the file, in which case the following call
+      // to next() has to return false.
+      blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+      readKeyValueLen();
+      return 1; // didn't exactly find it.
+    }
+
+    @Override
+    protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+      ByteBuffer buffer = curBlock.getBufferWithoutHeader();
+      // It is safe to manipulate this buffer because we own the buffer object.
+      buffer.rewind();
+      int klen = buffer.getInt();
+      buffer.getInt();
+      ByteBuffer keyBuff = buffer.slice();
+      keyBuff.limit(klen);
+      keyBuff.rewind();
+      return keyBuff;
+    }
+
+    @Override
+    public String getKeyString() {
+      return Bytes.toStringBinary(blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position()
+              + KEY_VALUE_LEN_SIZE, currKeyLen);
+    }
+
+    @Override
+    public String getValueString() {
+      return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
+          + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
+          currValueLen);
+    }
+
+    @Override
+    public int compareKey(KVComparator comparator, Cell key) {
+      return comparator.compareOnlyKeyPortion(
+          key,
+          new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+              + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen));
+    }
+  }
+
+  /**
+   * ScannerV2 that operates on encoded data blocks.
+   */
+  protected static class EncodedScannerV2 extends AbstractScannerV2 {
+    private final HFileBlockDecodingContext decodingCtx;
+    private final DataBlockEncoder.EncodedSeeker seeker;
+    private final DataBlockEncoder dataBlockEncoder;
+    protected final HFileContext meta;
+
+    public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
+        boolean pread, boolean isCompaction, HFileContext meta) {
+      super(reader, cacheBlocks, pread, isCompaction);
+      DataBlockEncoding encoding = reader.dataBlockEncoder.getDataBlockEncoding();
+      dataBlockEncoder = encoding.getEncoder();
+      decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(meta);
+      seeker = dataBlockEncoder.createSeeker(
+        reader.getComparator(), decodingCtx);
+      this.meta = meta;
+    }
+
+    @Override
+    public boolean isSeeked(){
+      return this.block != null;
+    }
+
+    /**
+     * Updates the current block to be the given {@link HFileBlock}. Seeks to
+     * the first key/value pair.
+     *
+     * @param newBlock the block to make current
+     * @throws CorruptHFileException
+     */
+    private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
+      block = newBlock;
+
+      // sanity checks
+      if (block.getBlockType() != BlockType.ENCODED_DATA) {
+        throw new IllegalStateException(
+            "EncodedScanner works only on encoded data blocks");
+      }
+      short dataBlockEncoderId = block.getDataBlockEncodingId();
+      if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
+        String encoderCls = dataBlockEncoder.getClass().getName();
+        throw new CorruptHFileException("Encoder " + encoderCls
+          + " doesn't support data block encoding "
+          + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
+      }
+
+      seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
+      blockFetches++;
+
+      // Reset the next indexed key
+      this.nextIndexedKey = null;
+    }
+
+    private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
+      ByteBuffer origBlock = newBlock.getBufferReadOnly();
+      ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
+          origBlock.arrayOffset() + newBlock.headerSize() +
+          DataBlockEncoding.ID_SIZE,
+          newBlock.getUncompressedSizeWithoutHeader() -
+          DataBlockEncoding.ID_SIZE).slice();
+      return encodedBlock;
+    }
+
+    @Override
+    public boolean seekTo() throws IOException {
+      if (reader == null) {
+        return false;
+      }
+
+      if (reader.getTrailer().getEntryCount() == 0) {
+        // No data blocks.
+        return false;
+      }
+
+      long firstDataBlockOffset =
+          reader.getTrailer().getFirstDataBlockOffset();
+      if (block != null && block.getOffset() == firstDataBlockOffset) {
+        seeker.rewind();
+        return true;
+      }
+
+      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
+          isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
+      if (block.getOffset() < 0) {
+        throw new IOException("Invalid block offset: " + block.getOffset());
+      }
+      updateCurrentBlock(block);
+      return true;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      boolean isValid = seeker.next();
+      if (!isValid) {
+        block = readNextDataBlock();
+        isValid = block != null;
+        if (isValid) {
+          updateCurrentBlock(block);
+        }
+      }
+      return isValid;
+    }
+
+    @Override
+    public ByteBuffer getKey() {
+      assertValidSeek();
+      return seeker.getKeyDeepCopy();
+    }
+
+    @Override
+    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
+      return seeker.compareKey(comparator, key, offset, length);
+    }
+
+    @Override
+    public ByteBuffer getValue() {
+      assertValidSeek();
+      return seeker.getValueShallowCopy();
+    }
+
+    @Override
+    public Cell getKeyValue() {
+      if (block == null) {
+        return null;
+      }
+      return seeker.getKeyValue();
+    }
+
+    @Override
+    public String getKeyString() {
+      ByteBuffer keyBuffer = getKey();
+      return Bytes.toStringBinary(keyBuffer.array(),
+          keyBuffer.arrayOffset(), keyBuffer.limit());
+    }
+
+    @Override
+    public String getValueString() {
+      ByteBuffer valueBuffer = getValue();
+      return Bytes.toStringBinary(valueBuffer.array(),
+          valueBuffer.arrayOffset(), valueBuffer.limit());
+    }
+
+    private void assertValidSeek() {
+      if (block == null) {
+        throw new NotSeekedException();
+      }
+    }
+
+    @Override
+    protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+      return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
+    }
+
+    @Override
+    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
+        boolean rewind, Cell key, boolean seekBefore) throws IOException {
+      if (block == null || block.getOffset() != seekToBlock.getOffset()) {
+        updateCurrentBlock(seekToBlock);
+      } else if (rewind) {
+        seeker.rewind();
+      }
+      this.nextIndexedKey = nextIndexedKey;
+      return seeker.seekToKeyInBlock(key, seekBefore);
+    }
+
+    @Override
+    public int compareKey(KVComparator comparator, Cell key) {
+      return seeker.compareKey(comparator, key);
+    }
+  }
+
+  /**
+   * Returns a buffer with the Bloom filter metadata. The caller takes
+   * ownership of the buffer.
+   */
+  @Override
+  public DataInput getGeneralBloomFilterMetadata() throws IOException {
+    return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META);
+  }
+
+  @Override
+  public DataInput getDeleteBloomFilterMetadata() throws IOException {
+    return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META);
+  }
+
+  private DataInput getBloomFilterMetadata(BlockType blockType)
+  throws IOException {
+    if (blockType != BlockType.GENERAL_BLOOM_META &&
+        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
+      throw new RuntimeException("Block Type: " + blockType.toString() +
+          " is not supported") ;
+    }
+
+    for (HFileBlock b : loadOnOpenBlocks)
+      if (b.getBlockType() == blockType)
+        return b.getByteStream();
+    return null;
+  }
+
+  @Override
+  public boolean isFileInfoLoaded() {
+    return true; // We load file info in constructor in version 2.
+  }
+
+  /**
+   * Validates that the minor version is within acceptable limits.
+   * Otherwise throws a RuntimeException.
+   */
+  private void validateMinorVersion(Path path, int minorVersion) {
+    if (minorVersion < MIN_MINOR_VERSION ||
+        minorVersion > MAX_MINOR_VERSION) {
+      String msg = "Minor version for path " + path +
+                   " is expected to be between " +
+                   MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
+                   " but is found to be " + minorVersion;
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
+  }
+
+  @Override
+  public int getMajorVersion() {
+    return 2;
+  }
+
+  @Override
+  public HFileContext getFileContext() {
+    return hfileContext;
+  }
+
+  /**
+   * Returns false if block prefetching was requested for this file and has
+   * not completed, true otherwise
+   */
+  @VisibleForTesting
+  boolean prefetchComplete() {
+    return PrefetchExecutor.isCompleted(path);
+  }
+}
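
As an aside, not part of the patch: the scanner contract restored above (seekTo(byte[]) returns
-1 when the key precedes the file, 0 on an exact match and 1 otherwise; reseekTo never rewinds;
next() crosses data-block boundaries transparently) is easiest to see from a small driver. Below
is a minimal sketch assuming an already-opened HFile.Reader; the names ScanSketch, scanFrom and
keyBytes are hypothetical.

    import java.io.IOException;

    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileScanner;

    public class ScanSketch {
      /**
       * Positions the scanner at keyBytes (or, on an inexact match, at the last
       * cell before it) and walks forward to the end of the file, printing keys.
       */
      static void scanFrom(HFile.Reader reader, byte[] keyBytes) throws IOException {
        // cacheBlocks=true, pread=true, isCompaction=false: an ordinary read path.
        HFileScanner scanner = reader.getScanner(true, true, false);
        int result = scanner.seekTo(keyBytes); // -1: before first key, 0: exact, 1: inexact
        if (result == -1 && !scanner.seekTo()) { // fall back to the start of the file
          return; // empty file
        }
        do {
          System.out.println(scanner.getKeyString());
        } while (scanner.next()); // next() loads the following data block when needed
      }
    }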

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
new file mode 100644
index 0000000..b28d8c1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV3.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.security.Key;
+import java.security.KeyException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.crypto.Cipher;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * {@link HFile} reader for version 3.
+ */
+@InterfaceAudience.Private
+public class HFileReaderV3 extends HFileReaderV2 {
+
+  private static final Log LOG = LogFactory.getLog(HFileReaderV3.class);
+
+  public static final int MAX_MINOR_VERSION = 0;
+
+  /**
+   * Opens an HFile. You must load the index before you can use it by calling
+   * {@link #loadFileInfo()}.
+   * @param path
+   *          Path to HFile.
+   * @param trailer
+   *          File trailer.
+   * @param fsdis
+   *          input stream.
+   * @param size
+   *          Length of the stream.
+   * @param cacheConf
+   *          Cache configuration.
+   * @param hfs
+   *          The file system.
+   * @param conf
+   *          Configuration
+   */
+  public HFileReaderV3(final Path path, FixedFileTrailer trailer, final FSDataInputStreamWrapper fsdis,
+      final long size, final CacheConfig cacheConf, final HFileSystem hfs,
+      final Configuration conf) throws IOException {
+    super(path, trailer, fsdis, size, cacheConf, hfs, conf);
+    byte[] tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
+    // If the max tag length is not present in the HFile, tags were not written to the file at all.
+    if (tmp != null) {
+      hfileContext.setIncludesTags(true);
+      tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED);
+      if (tmp != null && Bytes.toBoolean(tmp)) {
+        hfileContext.setCompressTags(true);
+      }
+    }
+  }
+
+  @Override
+  protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
+      HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
+    trailer.expectMajorVersion(3);
+    HFileContextBuilder builder = new HFileContextBuilder()
+      .withIncludesMvcc(this.includesMemstoreTS)
+      .withHBaseCheckSum(true)
+      .withCompression(this.compressAlgo);
+
+    // Check for any key material available
+    byte[] keyBytes = trailer.getEncryptionKey();
+    if (keyBytes != null) {
+      Encryption.Context cryptoContext = Encryption.newContext(conf);
+      Key key;
+      String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
+        User.getCurrent().getShortName());
+      try {
+        // First try the master key
+        key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
+      } catch (KeyException e) {
+        // If the current master key fails to unwrap, try the alternate, if
+        // one is configured
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
+        }
+        String alternateKeyName =
+          conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
+        if (alternateKeyName != null) {
+          try {
+            key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
+          } catch (KeyException ex) {
+            throw new IOException(ex);
+          }
+        } else {
+          throw new IOException(e);
+        }
+      }
+      // Use the algorithm the key wants
+      Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
+      if (cipher == null) {
+        throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
+      }
+      cryptoContext.setCipher(cipher);
+      cryptoContext.setKey(key);
+      builder.withEncryptionContext(cryptoContext);
+    }
+
+    HFileContext context = builder.build();
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reader" + (path != null ? " for " + path : "" ) +
+        " initialized with cacheConf: " + cacheConf +
+        " comparator: " + comparator.getClass().getSimpleName() +
+        " fileContext: " + context);
+    }
+
+    return context;
+  }
+
+  /**
+   * Create a Scanner on this file. No seeks or reads are done on creation. Call
+   * {@link HFileScanner#seekTo(byte[])} to position and start the read. There is
+   * nothing to clean up in a Scanner. Letting go of your references to the
+   * scanner is sufficient.
+   * @param cacheBlocks
+   *          True if we should cache blocks read in by this scanner.
+   * @param pread
+   *          Use positional read rather than seek+read if true (pread is better
+   *          for random reads, seek+read is better for scanning).
+   * @param isCompaction
+   *          is the scanner being used for a compaction?
+   * @return Scanner on this file.
+   */
+  @Override
+  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
+      final boolean isCompaction) {
+    if (dataBlockEncoder.useEncodedScanner()) {
+      return new EncodedScannerV3(this, cacheBlocks, pread, isCompaction, this.hfileContext);
+    }
+    return new ScannerV3(this, cacheBlocks, pread, isCompaction);
+  }
+
+  /**
+   * Implementation of {@link HFileScanner} interface.
+   */
+  protected static class ScannerV3 extends ScannerV2 {
+
+    private HFileReaderV3 reader;
+    private int currTagsLen;
+
+    public ScannerV3(HFileReaderV3 r, boolean cacheBlocks, final boolean pread,
+        final boolean isCompaction) {
+      super(r, cacheBlocks, pread, isCompaction);
+      this.reader = r;
+    }
+
+    @Override
+    protected int getCellBufSize() {
+      int kvBufSize = super.getCellBufSize();
+      if (reader.hfileContext.isIncludesTags()) {
+        kvBufSize += Bytes.SIZEOF_SHORT + currTagsLen;
+      }
+      return kvBufSize;
+    }
+
+    protected void setNonSeekedState() {
+      super.setNonSeekedState();
+      currTagsLen = 0;
+    }
+
+    @Override
+    protected int getNextCellStartPosition() {
+      int nextKvPos = super.getNextCellStartPosition();
+      if (reader.hfileContext.isIncludesTags()) {
+        nextKvPos += Bytes.SIZEOF_SHORT + currTagsLen;
+      }
+      return nextKvPos;
+    }
+
+    protected void readKeyValueLen() {
+      blockBuffer.mark();
+      currKeyLen = blockBuffer.getInt();
+      currValueLen = blockBuffer.getInt();
+      if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit()
+          || currValueLen > blockBuffer.limit()) {
+        throw new IllegalStateException("Invalid currKeyLen " + currKeyLen + " or currValueLen "
+            + currValueLen + ". Block offset: "
+            + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+            + blockBuffer.position() + " (without header).");
+      }
+      ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen);
+      if (reader.hfileContext.isIncludesTags()) {
+        // Read short as unsigned, high byte first
+        currTagsLen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
+        if (currTagsLen < 0 || currTagsLen > blockBuffer.limit()) {
+          throw new IllegalStateException("Invalid currTagsLen " + currTagsLen + ". Block offset: "
+              + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+              + blockBuffer.position() + " (without header).");
+        }
+        ByteBufferUtils.skip(blockBuffer, currTagsLen);
+      }
+      readMvccVersion();
+      blockBuffer.reset();
+    }
+
+    /**
+     * Within a loaded block, seek to the last key that is smaller than or
+     * equal to the key we are interested in.
+     * A note on the seekBefore: if you have seekBefore = true, AND the first
+     * key in the block = key, then you'll get thrown exceptions. The caller has
+     * to check for that case and load the previous block as appropriate.
+     * @param key
+     *          the key to find
+     * @param seekBefore
+     *          find the key before the given key in case of exact match.
+     * @return 0 in case of an exact key match, 1 in case of an inexact match,
+     *         and -2 in case of an inexact match where, additionally, the input
+     *         key is less than the first key of the current block (e.g. when
+     *         using a faked index key)
+     */
+    @Override
+    protected int blockSeek(Cell key, boolean seekBefore) {
+      int klen, vlen, tlen = 0;
+      long memstoreTS = 0;
+      int memstoreTSLen = 0;
+      int lastKeyValueSize = -1;
+      KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue();
+      do {
+        blockBuffer.mark();
+        klen = blockBuffer.getInt();
+        vlen = blockBuffer.getInt();
+        if (klen < 0 || vlen < 0 || klen > blockBuffer.limit()
+            || vlen > blockBuffer.limit()) {
+          throw new IllegalStateException("Invalid klen " + klen + " or vlen "
+              + vlen + ". Block offset: "
+              + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+              + blockBuffer.position() + " (without header).");
+        }
+        ByteBufferUtils.skip(blockBuffer, klen + vlen);
+        if (reader.hfileContext.isIncludesTags()) {
+          // Read short as unsigned, high byte first
+          tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
+          if (tlen < 0 || tlen > blockBuffer.limit()) {
+            throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: "
+                + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
+                + blockBuffer.position() + " (without header).");
+          }
+          ByteBufferUtils.skip(blockBuffer, tlen);
+        }
+        if (this.reader.shouldIncludeMemstoreTS()) {
+          if (this.reader.decodeMemstoreTS) {
+            try {
+              memstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset()
+                  + blockBuffer.position());
+              memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
+            } catch (Exception e) {
+              throw new RuntimeException("Error reading memstore timestamp", e);
+            }
+          } else {
+            memstoreTS = 0;
+            memstoreTSLen = 1;
+          }
+        }
+        blockBuffer.reset();
+        int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + (Bytes.SIZEOF_INT * 2);
+        keyOnlyKv.setKey(blockBuffer.array(), keyOffset, klen);
+        int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlyKv);
+
+        if (comp == 0) {
+          if (seekBefore) {
+            if (lastKeyValueSize < 0) {
+              throw new IllegalStateException("blockSeek with seekBefore "
+                  + "at the first key of the block: key="
+                  + CellUtil.getCellKeyAsString(key)
+                  + ", blockOffset=" + block.getOffset() + ", onDiskSize="
+                  + block.getOnDiskSizeWithHeader());
+            }
+            blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+            readKeyValueLen();
+            return 1; // non exact match.
+          }
+          currKeyLen = klen;
+          currValueLen = vlen;
+          currTagsLen = tlen;
+          if (this.reader.shouldIncludeMemstoreTS()) {
+            currMemstoreTS = memstoreTS;
+            currMemstoreTSLen = memstoreTSLen;
+          }
+          return 0; // indicate exact match
+        } else if (comp < 0) {
+          if (lastKeyValueSize > 0)
+            blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+          readKeyValueLen();
+          if (lastKeyValueSize == -1 && blockBuffer.position() == 0) {
+            return HConstants.INDEX_KEY_MAGIC;
+          }
+          return 1;
+        }
+
+        // The size of this key/value tuple, including key/value length fields.
+        lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
+        // include tag length also if tags included with KV
+        if (reader.hfileContext.isIncludesTags()) {
+          lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT;
+        }
+        blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
+      } while (blockBuffer.remaining() > 0);
+
+      // Seek to the last key we successfully read. This will happen if this is
+      // the last key/value pair in the file, in which case the following call
+      // to next() has to return false.
+      blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+      readKeyValueLen();
+      return 1; // didn't exactly find it.
+    }
+  }
+
+  /**
+   * ScannerV3 that operates on encoded data blocks.
+   */
+  protected static class EncodedScannerV3 extends EncodedScannerV2 {
+    public EncodedScannerV3(HFileReaderV3 reader, boolean cacheBlocks, boolean pread,
+        boolean isCompaction, HFileContext context) {
+      super(reader, cacheBlocks, pread, isCompaction, context);
+    }
+  }
+
+  @Override
+  public int getMajorVersion() {
+    return 3;
+  }
+}
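
Another aside, also outside the patch: the cell layout that ScannerV3.readKeyValueLen() and
blockSeek() above step over is a 4-byte key length, a 4-byte value length, the key bytes, the
value bytes, then (only when the file context includes tags) an unsigned 2-byte tags length
followed by the tag bytes, and finally the vint-encoded memstore timestamp when MVCC is stored.
A minimal sketch of that size arithmetic, using the hypothetical helper V3CellLayout.cellSize:

    import org.apache.hadoop.hbase.util.Bytes;

    public final class V3CellLayout {
      /** Serialized size of one cell, mirroring what ScannerV3 adds to lastKeyValueSize. */
      static int cellSize(int keyLen, int valueLen, boolean includesTags, int tagsLen,
          int mvccVintLen) {
        int size = 2 * Bytes.SIZEOF_INT // key-length and value-length prefixes
            + keyLen + valueLen;        // key bytes, then value bytes
        if (includesTags) {
          size += Bytes.SIZEOF_SHORT + tagsLen; // 2-byte tags length, then tag bytes
        }
        return size + mvccVintLen;      // trailing memstore timestamp, vint-encoded
      }
    }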

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterFactory.java
deleted file mode 100644
index 047022d..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.hfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue.KVComparator;
-
-public class HFileWriterFactory extends HFile.WriterFactory {
-  HFileWriterFactory(Configuration conf, CacheConfig cacheConf) {
-    super(conf, cacheConf);
-  }
-
-  @Override
-  public HFile.Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream,
-      KVComparator comparator, HFileContext context)
-  throws IOException {
-    return new HFileWriterImpl(conf, cacheConf, path, ostream, comparator, context);
-  }
-}
\ No newline at end of file
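
A final aside, again not part of the patch: whichever concrete writer backs it (the
HFileWriterImpl removed by this revert, or the version-specific writers it restores), callers
create writers through HFile.WriterFactory. A hedged sketch of that usage follows; the builder
methods shown (getWriterFactory, withPath, withFileContext, create) are assumed to match this
branch, and WriteSketch/writeOne are hypothetical names.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileContext;
    import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

    public class WriteSketch {
      /** Writes a single cell to a new HFile at the given path. */
      static void writeOne(Configuration conf, FileSystem fs, Path path, Cell cell)
          throws IOException {
        HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
        HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
            .withPath(fs, path)
            .withFileContext(context)
            .create();
        try {
          writer.append(cell); // cells must be appended in ascending key order
        } finally {
          writer.close();
        }
      }
    }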


[3/4] hbase git commit: Revert "HBASE-13373 Squash HFileReaderV3 together with HFileReaderV2 and AbstractHFileReader; ditto for Scanners and BlockReader, etc.:" Revert because breaking migration tests.

Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
deleted file mode 100644
index a007f37..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ /dev/null
@@ -1,1688 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io.hfile;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.Key;
-import java.security.KeyException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KVComparator;
-import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.crypto.Cipher;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
-import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.security.EncryptionUtil;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.IdLock;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Implementation that can handle all hfile versions of {@link HFile.Reader}.
- */
-@InterfaceAudience.Private
-@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
-public class HFileReaderImpl implements HFile.Reader, Configurable {
-  // This class is HFileReaderV3 + HFileReaderV2 + AbstractHFileReader all squashed together into
-  // one file.  Ditto for all the HFileReader.ScannerV? implementations. I was running up against
-  // the MaxInlineLevel limit because too many tiers were involved in reading from an hfile. It was
-  // also hard to navigate the source code with so many classes participating in a read.
-  private static final Log LOG = LogFactory.getLog(HFileReaderImpl.class);
-
-  /** Data block index reader keeping the root data index in memory */
-  private HFileBlockIndex.BlockIndexReader dataBlockIndexReader;
-
-  /** Meta block index reader -- always single level */
-  private HFileBlockIndex.BlockIndexReader metaBlockIndexReader;
-
-  private final FixedFileTrailer trailer;
-
-  /** Filled when we read in the trailer. */
-  private final Compression.Algorithm compressAlgo;
-
-  /**
-   * What kind of data block encoding should be used while reading, writing,
-   * and handling cache.
-   */
-  private HFileDataBlockEncoder dataBlockEncoder = NoOpDataBlockEncoder.INSTANCE;
-
-  /** Last key in the file. Filled in when we read in the file info */
-  private byte [] lastKey = null;
-
-  /** Average key length read from file info */
-  private int avgKeyLen = -1;
-
-  /** Average value length read from file info */
-  private int avgValueLen = -1;
-
-  /** Key comparator */
-  private KVComparator comparator = new KVComparator();
-
-  /** Size of this file. */
-  private final long fileSize;
-
-  /** Block cache configuration. */
-  private final CacheConfig cacheConf;
-
-  /** Path of file */
-  private final Path path;
-
-  /** File name to be used for block names */
-  private final String name;
-
-  private FileInfo fileInfo;
-
-  private Configuration conf;
-
-  private HFileContext hfileContext;
-
-  /** Filesystem-level block reader. */
-  protected HFileBlock.FSReader fsBlockReader;
-
-  /**
-   * A "sparse lock" implementation allowing to lock on a particular block
-   * identified by offset. The purpose of this is to avoid two clients loading
-   * the same block, and have all but one client wait to get the block from the
-   * cache.
-   */
-  private IdLock offsetLock = new IdLock();
-
-  /**
-   * Blocks read from the load-on-open section, excluding data root index, meta
-   * index, and file info.
-   */
-  private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
-
-  /** Minimum minor version supported by this HFile format */
-  static final int MIN_MINOR_VERSION = 0;
-
-  /** Maximum minor version supported by this HFile format */
-  // We went to version 2 when we moved to pb'ing fileinfo and the trailer on
-  // the file. This version can read Writables version 1.
-  static final int MAX_MINOR_VERSION = 3;
-
-  /**
-   * We can read files whose major version is v2 IFF their minor version is at least 3.
-   */
-  private static final int MIN_V2_MINOR_VERSION_WITH_PB = 3;
-
-  /** Minor versions starting with this number have faked index key */
-  static final int MINOR_VERSION_WITH_FAKED_KEY = 3;
-
-  /**
-   * Opens a HFile. You must load the index before you can use it by calling
-   * {@link #loadFileInfo()}.
-   * @param path
-   *          Path to HFile.
-   * @param trailer
-   *          File trailer.
-   * @param fsdis
-   *          input stream.
-   * @param fileSize
-   *          Length of the stream.
-   * @param cacheConf
-   *          Cache configuration.
-   * @param hfs
-   *          The file system.
-   * @param conf
-   *          Configuration
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
-  public HFileReaderImpl(final Path path, FixedFileTrailer trailer,
-      final FSDataInputStreamWrapper fsdis,
-      final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs,
-      final Configuration conf)
-  throws IOException {
-    this.trailer = trailer;
-    this.compressAlgo = trailer.getCompressionCodec();
-    this.cacheConf = cacheConf;
-    this.fileSize = fileSize;
-    this.path = path;
-    this.name = path.getName();
-    this.conf = conf;
-    checkFileVersion();
-    this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
-    this.fsBlockReader = new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext);
-
-    // Comparator class name is stored in the trailer in version 2.
-    comparator = trailer.createComparator();
-    dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
-        trailer.getNumDataIndexLevels(), this);
-    metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
-        KeyValue.RAW_COMPARATOR, 1);
-
-    // Parse load-on-open data.
-
-    HFileBlock.BlockIterator blockIter = fsBlockReader.blockRange(
-        trailer.getLoadOnOpenDataOffset(),
-        fileSize - trailer.getTrailerSize());
-
-    // Data index. We also read statistics about the block index written after
-    // the root level.
-    dataBlockIndexReader.readMultiLevelIndexRoot(
-        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
-        trailer.getDataIndexCount());
-
-    // Meta index.
-    metaBlockIndexReader.readRootIndex(
-        blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
-        trailer.getMetaIndexCount());
-
-    // File info
-    fileInfo = new FileInfo();
-    fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
-    byte[] creationTimeBytes = fileInfo.get(FileInfo.CREATE_TIME_TS);
-    this.hfileContext.setFileCreateTime(creationTimeBytes == null ?
-      0 : Bytes.toLong(creationTimeBytes));
-    lastKey = fileInfo.get(FileInfo.LASTKEY);
-    avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
-    avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
-    byte [] keyValueFormatVersion = fileInfo.get(HFileWriterImpl.KEY_VALUE_VERSION);
-    includesMemstoreTS = keyValueFormatVersion != null &&
-        Bytes.toInt(keyValueFormatVersion) == HFileWriterImpl.KEY_VALUE_VER_WITH_MEMSTORE;
-    fsBlockReader.setIncludesMemstoreTS(includesMemstoreTS);
-    if (includesMemstoreTS) {
-      decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterImpl.MAX_MEMSTORE_TS_KEY)) > 0;
-    }
-
-    // Read data block encoding algorithm name from file info.
-    dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
-    fsBlockReader.setDataBlockEncoder(dataBlockEncoder);
-
-    // Store all other load-on-open blocks for further consumption.
-    HFileBlock b;
-    while ((b = blockIter.nextBlock()) != null) {
-      loadOnOpenBlocks.add(b);
-    }
-
-    // Prefetch file blocks upon open if requested
-    if (cacheConf.shouldPrefetchOnOpen()) {
-      PrefetchExecutor.request(path, new Runnable() {
-        public void run() {
-          try {
-            long offset = 0;
-            long end = fileSize - getTrailer().getTrailerSize();
-            HFileBlock prevBlock = null;
-            while (offset < end) {
-              if (Thread.interrupted()) {
-                break;
-              }
-              long onDiskSize = -1;
-              if (prevBlock != null) {
-                onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
-              }
-              HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
-                null, null);
-              prevBlock = block;
-              offset += block.getOnDiskSizeWithHeader();
-            }
-          } catch (IOException e) {
-            // IOExceptions are probably due to region closes (relocation, etc.)
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Exception encountered while prefetching " + path + ":", e);
-            }
-          } catch (Exception e) {
-            // Other exceptions are interesting
-            LOG.warn("Exception encountered while prefetching " + path + ":", e);
-          } finally {
-            PrefetchExecutor.complete(path);
-          }
-        }
-      });
-    }
-
-    byte[] tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
-    // max tag length is not present in the HFile means tags were not at all written to file.
-    if (tmp != null) {
-      hfileContext.setIncludesTags(true);
-      tmp = fileInfo.get(FileInfo.TAGS_COMPRESSED);
-      if (tmp != null && Bytes.toBoolean(tmp)) {
-        hfileContext.setCompressTags(true);
-      }
-    }
-  }
-
-  /**
-   * File version check is a little sloppy. We read v3 files but can also read v2 files if their
-   * content has been pb'd; files written with 0.98.
-   */
-  private void checkFileVersion() {
-    int majorVersion = trailer.getMajorVersion();
-    if (majorVersion == getMajorVersion()) return;
-    int minorVersion = trailer.getMinorVersion();
-    if (majorVersion == 2 && minorVersion >= MIN_V2_MINOR_VERSION_WITH_PB) return;
-    // We can read v3 or v2 versions of hfile.
-    throw new IllegalArgumentException("Invalid HFile version: major=" +
-      trailer.getMajorVersion() + ", minor=" + trailer.getMinorVersion() + ": expected at least " +
-      "major=2 and minor=" + MAX_MINOR_VERSION);
-  }
-
-  @SuppressWarnings("serial")
-  public static class BlockIndexNotLoadedException extends IllegalStateException {
-    public BlockIndexNotLoadedException() {
-      // Add a message in case anyone relies on it as opposed to class name.
-      super("Block index not loaded");
-    }
-  }
-
-  private String toStringFirstKey() {
-    return KeyValue.keyToString(getFirstKey());
-  }
-
-  private String toStringLastKey() {
-    return KeyValue.keyToString(getLastKey());
-  }
-
-  @Override
-  public String toString() {
-    return "reader=" + path.toString() +
-        (!isFileInfoLoaded()? "":
-          ", compression=" + compressAlgo.getName() +
-          ", cacheConf=" + cacheConf +
-          ", firstKey=" + toStringFirstKey() +
-          ", lastKey=" + toStringLastKey()) +
-          ", avgKeyLen=" + avgKeyLen +
-          ", avgValueLen=" + avgValueLen +
-          ", entries=" + trailer.getEntryCount() +
-          ", length=" + fileSize;
-  }
-
-  @Override
-  public long length() {
-    return fileSize;
-  }
-
-  /**
-   * @return the first key in the file. May be null if file has no entries. Note
-   *         that this is not the first row key, but rather the byte form of the
-   *         first KeyValue.
-   */
-  @Override
-  public byte [] getFirstKey() {
-    if (dataBlockIndexReader == null) {
-      throw new BlockIndexNotLoadedException();
-    }
-    return dataBlockIndexReader.isEmpty() ? null
-        : dataBlockIndexReader.getRootBlockKey(0);
-  }
-
-  /**
-   * TODO left from {@link HFile} version 1: move this to StoreFile after Ryan's
-   * patch goes in to eliminate {@link KeyValue} here.
-   *
-   * @return the first row key, or null if the file is empty.
-   */
-  @Override
-  public byte[] getFirstRowKey() {
-    byte[] firstKey = getFirstKey();
-    return firstKey == null? null: KeyValue.createKeyValueFromKey(firstKey).getRow();
-  }
-
-  /**
-   * TODO left from {@link HFile} version 1: move this to StoreFile after
-   * Ryan's patch goes in to eliminate {@link KeyValue} here.
-   *
-   * @return the last row key, or null if the file is empty.
-   */
-  @Override
-  public byte[] getLastRowKey() {
-    byte[] lastKey = getLastKey();
-    return lastKey == null? null: KeyValue.createKeyValueFromKey(lastKey).getRow();
-  }
-
-  /** @return number of KV entries in this HFile */
-  @Override
-  public long getEntries() {
-    return trailer.getEntryCount();
-  }
-
-  /** @return comparator */
-  @Override
-  public KVComparator getComparator() {
-    return comparator;
-  }
-
-  /** @return compression algorithm */
-  @Override
-  public Compression.Algorithm getCompressionAlgorithm() {
-    return compressAlgo;
-  }
-
-  /**
-   * @return the total heap size of data and meta block indexes in bytes. Does
-   *         not take into account non-root blocks of a multilevel data index.
-   */
-  public long indexSize() {
-    return (dataBlockIndexReader != null ? dataBlockIndexReader.heapSize() : 0)
-        + ((metaBlockIndexReader != null) ? metaBlockIndexReader.heapSize()
-            : 0);
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public HFileBlockIndex.BlockIndexReader getDataBlockIndexReader() {
-    return dataBlockIndexReader;
-  }
-
-  @Override
-  public FixedFileTrailer getTrailer() {
-    return trailer;
-  }
-
-  @Override
-  public FileInfo loadFileInfo() throws IOException {
-    return fileInfo;
-  }
-
-  /**
-   * An exception thrown when an operation requiring a scanner to be seeked
-   * is invoked on a scanner that is not seeked.
-   */
-  @SuppressWarnings("serial")
-  public static class NotSeekedException extends IllegalStateException {
-    public NotSeekedException() {
-      super("Not seeked to a key/value");
-    }
-  }
-
-  protected static class HFileScannerImpl implements HFileScanner {
-    private ByteBuffer blockBuffer;
-    protected final boolean cacheBlocks;
-    protected final boolean pread;
-    protected final boolean isCompaction;
-    private int currKeyLen;
-    private int currValueLen;
-    private int currMemstoreTSLen;
-    private long currMemstoreTS;
-    // Updated but never read?
-    protected volatile int blockFetches;
-    protected final HFile.Reader reader;
-    private int currTagsLen;
-
-    protected HFileBlock block;
-
-    /**
-     * Keeps track of the indexed key of the next data block. If nextIndexedKey
-     * is HConstants.NO_NEXT_INDEXED_KEY, the current data block is the last
-     * data block.
-     *
-     * If nextIndexedKey is null, it has not been loaded yet.
-     */
-    protected Cell nextIndexedKey;
-
-    public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks,
-        final boolean pread, final boolean isCompaction) {
-      this.reader = reader;
-      this.cacheBlocks = cacheBlocks;
-      this.pread = pread;
-      this.isCompaction = isCompaction;
-    }
-
-    @Override
-    public boolean isSeeked(){
-      return blockBuffer != null;
-    }
-
-    @Override
-    public String toString() {
-      return "HFileScanner for reader " + String.valueOf(getReader());
-    }
-
-    protected void assertSeeked() {
-      if (!isSeeked())
-        throw new NotSeekedException();
-    }
-
-    @Override
-    public int seekTo(byte[] key) throws IOException {
-      return seekTo(key, 0, key.length);
-    }
-
-    @Override
-    public boolean seekBefore(byte[] key) throws IOException {
-      return seekBefore(key, 0, key.length);
-    }
-
-    @Override
-    public int reseekTo(byte[] key) throws IOException {
-      return reseekTo(key, 0, key.length);
-    }
-
-    @Override
-    public HFile.Reader getReader() {
-      return reader;
-    }
-
-    protected int getCellBufSize() {
-      int kvBufSize = KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
-      if (this.reader.getFileContext().isIncludesTags()) {
-        kvBufSize += Bytes.SIZEOF_SHORT + currTagsLen;
-      }
-      return kvBufSize;
-    }
-
-    protected int getNextCellStartPosition() {
-      int nextKvPos =  blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
-          + currMemstoreTSLen;
-      if (this.reader.getFileContext().isIncludesTags()) {
-        nextKvPos += Bytes.SIZEOF_SHORT + currTagsLen;
-      }
-      return nextKvPos;
-    }
-
-    protected void readKeyValueLen() {
-      blockBuffer.mark();
-      currKeyLen = blockBuffer.getInt();
-      currValueLen = blockBuffer.getInt();
-      if (currKeyLen < 0 || currValueLen < 0 || currKeyLen > blockBuffer.limit()
-          || currValueLen > blockBuffer.limit()) {
-        throw new IllegalStateException("Invalid currKeyLen " + currKeyLen + " or currValueLen "
-            + currValueLen + ". Block offset: "
-            + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
-            + blockBuffer.position() + " (without header).");
-      }
-      ByteBufferUtils.skip(blockBuffer, currKeyLen + currValueLen);
-      if (this.reader.getFileContext().isIncludesTags()) {
-        // Read short as unsigned, high byte first
-        currTagsLen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
-        if (currTagsLen < 0 || currTagsLen > blockBuffer.limit()) {
-          throw new IllegalStateException("Invalid currTagsLen " + currTagsLen + ". Block offset: "
-              + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
-              + blockBuffer.position() + " (without header).");
-        }
-        ByteBufferUtils.skip(blockBuffer, currTagsLen);
-      }
-      readMvccVersion();
-      blockBuffer.reset();
-    }
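// A minimal illustration of the cell layout that readKeyValueLen() above walks:
// 4-byte key length, 4-byte value length, key bytes, value bytes, then an
// unsigned-short tags length plus tag bytes when tags are enabled, then a vlong
// memstore timestamp when MVCC info is included. The class and method names
// below are hypothetical; only the field order comes from the code above.
import java.nio.ByteBuffer;

final class CellLayoutSketch {
  /** Advances the buffer past one cell and returns its key length. */
  static int skipOneCell(ByteBuffer block, boolean includesTags) {
    int keyLen = block.getInt();
    int valueLen = block.getInt();
    block.position(block.position() + keyLen + valueLen);
    if (includesTags) {
      // Tags length is stored as an unsigned short, high byte first.
      int tagsLen = ((block.get() & 0xff) << 8) | (block.get() & 0xff);
      block.position(block.position() + tagsLen);
    }
    // When MVCC info is present, a vlong memstore timestamp follows; decoding it
    // needs WritableUtils.readVLong and is omitted to keep this sketch small.
    return keyLen;
  }
}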
-
-    /**
-     * Within a loaded block, seek looking for the last key that is smaller than,
-     * or equal to, the key we are interested in.
-     * A note on the seekBefore: if you have seekBefore = true, AND the first
-     * key in the block = key, then you'll get thrown exceptions. The caller has
-     * to check for that case and load the previous block as appropriate.
-     * @param key
-     *          the key to find
-     * @param seekBefore
-     *          find the key before the given key in case of exact match.
-     * @return 0 in case of an exact key match, 1 in case of an inexact match,
-     *         and -2 in case of an inexact match where, furthermore, the input
-     *         key is less than the first key of the current block (e.g. when
-     *         using a faked index key)
-     */
-    protected int blockSeek(Cell key, boolean seekBefore) {
-      int klen, vlen, tlen = 0;
-      long memstoreTS = 0;
-      int memstoreTSLen = 0;
-      int lastKeyValueSize = -1;
-      KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue();
-      do {
-        blockBuffer.mark();
-        klen = blockBuffer.getInt();
-        vlen = blockBuffer.getInt();
-        if (klen < 0 || vlen < 0 || klen > blockBuffer.limit()
-            || vlen > blockBuffer.limit()) {
-          throw new IllegalStateException("Invalid klen " + klen + " or vlen "
-              + vlen + ". Block offset: "
-              + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
-              + blockBuffer.position() + " (without header).");
-        }
-        ByteBufferUtils.skip(blockBuffer, klen + vlen);
-        if (this.reader.getFileContext().isIncludesTags()) {
-          // Read short as unsigned, high byte first
-          tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
-          if (tlen < 0 || tlen > blockBuffer.limit()) {
-            throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: "
-                + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
-                + blockBuffer.position() + " (without header).");
-          }
-          ByteBufferUtils.skip(blockBuffer, tlen);
-        }
-        if (this.reader.shouldIncludeMemstoreTS()) {
-          if (this.reader.isDecodeMemstoreTS()) {
-            try {
-              memstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset()
-                  + blockBuffer.position());
-              memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
-            } catch (Exception e) {
-              throw new RuntimeException("Error reading memstore timestamp", e);
-            }
-          } else {
-            memstoreTS = 0;
-            memstoreTSLen = 1;
-          }
-        }
-        blockBuffer.reset();
-        int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + (Bytes.SIZEOF_INT * 2);
-        keyOnlyKv.setKey(blockBuffer.array(), keyOffset, klen);
-        int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlyKv);
-
-        if (comp == 0) {
-          if (seekBefore) {
-            if (lastKeyValueSize < 0) {
-              throw new IllegalStateException("blockSeek with seekBefore "
-                  + "at the first key of the block: key="
-                  + CellUtil.getCellKeyAsString(key)
-                  + ", blockOffset=" + block.getOffset() + ", onDiskSize="
-                  + block.getOnDiskSizeWithHeader());
-            }
-            blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
-            readKeyValueLen();
-            return 1; // non exact match.
-          }
-          currKeyLen = klen;
-          currValueLen = vlen;
-          currTagsLen = tlen;
-          if (this.reader.shouldIncludeMemstoreTS()) {
-            currMemstoreTS = memstoreTS;
-            currMemstoreTSLen = memstoreTSLen;
-          }
-          return 0; // indicate exact match
-        } else if (comp < 0) {
-          if (lastKeyValueSize > 0)
-            blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
-          readKeyValueLen();
-          if (lastKeyValueSize == -1 && blockBuffer.position() == 0) {
-            return HConstants.INDEX_KEY_MAGIC;
-          }
-          return 1;
-        }
-
-        // The size of this key/value tuple, including key/value length fields.
-        lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
-        // include tag length also if tags included with KV
-        if (this.reader.getFileContext().isIncludesTags()) {
-          lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT;
-        }
-        blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
-      } while (blockBuffer.remaining() > 0);
-
-      // Seek to the last key we successfully read. This will happen if this is
-      // the last key/value pair in the file, in which case the following call
-      // to next() has to return false.
-      blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
-      readKeyValueLen();
-      return 1; // didn't exactly find it.
-    }
-
-    @Override
-    public Cell getNextIndexedKey() {
-      return nextIndexedKey;
-    }
-
-    @Override
-    public int seekTo(byte[] key, int offset, int length) throws IOException {
-      // Always rewind to the first key of the block, because the given key
-      // might be before or after the current key.
-      return seekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
-    }
-
-    @Override
-    public int reseekTo(byte[] key, int offset, int length) throws IOException {
-      return reseekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
-    }
-
-    @Override
-    public int seekTo(Cell key) throws IOException {
-      return seekTo(key, true);
-    }
-
-    @Override
-    public int reseekTo(Cell key) throws IOException {
-      int compared;
-      if (isSeeked()) {
-        compared = compareKey(reader.getComparator(), key);
-        if (compared < 1) {
-          // If the required key is less than or equal to current key, then
-          // don't do anything.
-          return compared;
-        } else {
-          // The comparison with NO_NEXT_INDEXED_KEY has to be checked first.
-          if (this.nextIndexedKey != null &&
-              (this.nextIndexedKey == HConstants.NO_NEXT_INDEXED_KEY || reader
-              .getComparator().compareOnlyKeyPortion(key, nextIndexedKey) < 0)) {
-            // The reader shall continue to scan the current data block instead
-            // of querying the block index, as long as it knows the target key
-            // is strictly smaller than the next indexed key or the current data
-            // block is the last data block.
-            return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false);
-          }
-        }
-      }
-      // Don't rewind on a reseek operation, because reseek implies that we are
-      // always going forward in the file.
-      return seekTo(key, false);
-    }
-
-    /**
-     * An internal API function. Seek to the given key, optionally rewinding to
-     * the first key of the block before doing the seek.
-     *
-     * @param key - a cell representing the key that we need to fetch
-     * @param rewind whether to rewind to the first key of the block before
-     *        doing the seek. If this is false, we are assuming we never go
-     *        back, otherwise the result is undefined.
-     * @return -1 if the key is earlier than the first key of the file,
-     *         0 if we are at the given key, 1 if we are past the given key, and
-     *         -2 if the key is earlier than the first key of the file while
-     *         using a faked index key
-     * @throws IOException
-     */
-    public int seekTo(Cell key, boolean rewind) throws IOException {
-      HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader();
-      BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block,
-          cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding());
-      if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
-        // This happens if the key e.g. falls before the beginning of the file.
-        return -1;
-      }
-      return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
-          blockWithScanInfo.getNextIndexedKey(), rewind, key, false);
-    }
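// A minimal illustration of how a caller might interpret the seekTo() return
// values documented above: -1 means the key precedes the whole file, 0 is an
// exact match, 1 an inexact match, and HConstants.INDEX_KEY_MAGIC (-2) means
// the key precedes the faked index key of the current block. The class and
// method names below are hypothetical.
import org.apache.hadoop.hbase.HConstants;

final class SeekResultSketch {
  /** Returns true only when the scanner is positioned exactly at the requested key. */
  static boolean isExactMatch(int seekResult) {
    if (seekResult == -1 || seekResult == HConstants.INDEX_KEY_MAGIC) {
      return false;                 // requested key is before the file or the block
    }
    return seekResult == 0;         // 0 is an exact match, 1 an inexact one
  }
}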
-
-    @Override
-    public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
-      return seekBefore(new KeyValue.KeyOnlyKeyValue(key, offset, length));
-    }
-
-    @Override
-    public boolean seekBefore(Cell key) throws IOException {
-      HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block,
-          cacheBlocks, pread, isCompaction, reader.getEffectiveEncodingInCache(isCompaction));
-      if (seekToBlock == null) {
-        return false;
-      }
-      ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
-
-      if (reader.getComparator()
-          .compareOnlyKeyPortion(
-              new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(),
-                  firstKey.limit()), key) >= 0) {
-        long previousBlockOffset = seekToBlock.getPrevBlockOffset();
-        // The key we are interested in
-        if (previousBlockOffset == -1) {
-          // we have a 'problem', the key we want is the first of the file.
-          return false;
-        }
-
-        // It is important that we compute and pass onDiskSize to the block
-        // reader so that it does not have to read the header separately to
-        // figure out the size.
-        seekToBlock = reader.readBlock(previousBlockOffset,
-            seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
-            pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
-        // TODO shortcut: seek forward in this block to the last key of the
-        // block.
-      }
-      Cell firstKeyInCurrentBlock = new KeyValue.KeyOnlyKeyValue(Bytes.getBytes(firstKey));
-      loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true);
-      return true;
-    }
-
-    /**
-     * Scans blocks in the "scanned" section of the {@link HFile} until the next
-     * data block is found.
-     *
-     * @return the next block, or null if there are no more data blocks
-     * @throws IOException
-     */
-    protected HFileBlock readNextDataBlock() throws IOException {
-      long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
-      if (block == null)
-        return null;
-
-      HFileBlock curBlock = block;
-
-      do {
-        if (curBlock.getOffset() >= lastDataBlockOffset)
-          return null;
-
-        if (curBlock.getOffset() < 0) {
-          throw new IOException("Invalid block file offset: " + block);
-        }
-
-        // We are reading the next block without block type validation, because
-        // it might turn out to be a non-data block.
-        curBlock = reader.readBlock(curBlock.getOffset()
-            + curBlock.getOnDiskSizeWithHeader(),
-            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
-            isCompaction, true, null, getEffectiveDataBlockEncoding());
-      } while (!curBlock.getBlockType().isData());
-
-      return curBlock;
-    }
-
-    public DataBlockEncoding getEffectiveDataBlockEncoding() {
-      return this.reader.getEffectiveEncodingInCache(isCompaction);
-    }
-
-    @Override
-    public Cell getKeyValue() {
-      if (!isSeeked())
-        return null;
-
-      KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
-          + blockBuffer.position(), getCellBufSize());
-      if (this.reader.shouldIncludeMemstoreTS()) {
-        ret.setSequenceId(currMemstoreTS);
-      }
-      return ret;
-    }
-
-    @Override
-    public ByteBuffer getKey() {
-      assertSeeked();
-      return ByteBuffer.wrap(
-          blockBuffer.array(),
-          blockBuffer.arrayOffset() + blockBuffer.position()
-              + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
-    }
-
-    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
-      return comparator.compareFlatKey(key, offset, length, blockBuffer.array(),
-          blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
-    }
-
-    @Override
-    public ByteBuffer getValue() {
-      assertSeeked();
-      return ByteBuffer.wrap(
-          blockBuffer.array(),
-          blockBuffer.arrayOffset() + blockBuffer.position()
-              + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
-    }
-
-    protected void setNonSeekedState() {
-      block = null;
-      blockBuffer = null;
-      currKeyLen = 0;
-      currValueLen = 0;
-      currMemstoreTS = 0;
-      currMemstoreTSLen = 0;
-      currTagsLen = 0;
-    }
-
-    /**
-     * Go to the next key/value in the block section. Loads the next block if
-     * necessary. If successful, {@link #getKey()} and {@link #getValue()} can
-     * be called.
-     *
-     * @return true if successfully navigated to the next key/value
-     */
-    @Override
-    public boolean next() throws IOException {
-      assertSeeked();
-
-      try {
-        blockBuffer.position(getNextCellStartPosition());
-      } catch (IllegalArgumentException e) {
-        LOG.error("Current pos = " + blockBuffer.position()
-            + "; currKeyLen = " + currKeyLen + "; currValLen = "
-            + currValueLen + "; block limit = " + blockBuffer.limit()
-            + "; HFile name = " + reader.getName()
-            + "; currBlock currBlockOffset = " + block.getOffset());
-        throw e;
-      }
-
-      if (blockBuffer.remaining() <= 0) {
-        long lastDataBlockOffset =
-            reader.getTrailer().getLastDataBlockOffset();
-
-        if (block.getOffset() >= lastDataBlockOffset) {
-          setNonSeekedState();
-          return false;
-        }
-
-        // read the next block
-        HFileBlock nextBlock = readNextDataBlock();
-        if (nextBlock == null) {
-          setNonSeekedState();
-          return false;
-        }
-
-        updateCurrBlock(nextBlock);
-        return true;
-      }
-
-      // We are still in the same block.
-      readKeyValueLen();
-      return true;
-    }
-
-    /**
-     * Positions this scanner at the start of the file.
-     *
-     * @return false if empty file; i.e. a call to next would return false and
-     *         the current key and value are undefined.
-     * @throws IOException
-     */
-    @Override
-    public boolean seekTo() throws IOException {
-      if (reader == null) {
-        return false;
-      }
-
-      if (reader.getTrailer().getEntryCount() == 0) {
-        // No data blocks.
-        return false;
-      }
-
-      long firstDataBlockOffset =
-          reader.getTrailer().getFirstDataBlockOffset();
-      if (block != null && block.getOffset() == firstDataBlockOffset) {
-        blockBuffer.rewind();
-        readKeyValueLen();
-        return true;
-      }
-
-      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
-          isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
-      if (block.getOffset() < 0) {
-        throw new IOException("Invalid block offset: " + block.getOffset());
-      }
-      updateCurrBlock(block);
-      return true;
-    }
-
-    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
-        boolean rewind, Cell key, boolean seekBefore) throws IOException {
-      if (block == null || block.getOffset() != seekToBlock.getOffset()) {
-        updateCurrBlock(seekToBlock);
-      } else if (rewind) {
-        blockBuffer.rewind();
-      }
-
-      // Update the nextIndexedKey
-      this.nextIndexedKey = nextIndexedKey;
-      return blockSeek(key, seekBefore);
-    }
-
-    /**
-     * Updates the current block to be the given {@link HFileBlock}. Seeks to
-     * the first key/value pair.
-     *
-     * @param newBlock the block to make current
-     */
-    protected void updateCurrBlock(HFileBlock newBlock) {
-      block = newBlock;
-
-      // sanity check
-      if (block.getBlockType() != BlockType.DATA) {
-        throw new IllegalStateException("Scanner works only on data " +
-            "blocks, got " + block.getBlockType() + "; " +
-            "fileName=" + reader.getName() + ", " +
-            "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " +
-            "isCompaction=" + isCompaction);
-      }
-
-      blockBuffer = block.getBufferWithoutHeader();
-      readKeyValueLen();
-      blockFetches++;
-
-      // Reset the next indexed key
-      this.nextIndexedKey = null;
-    }
-
-    protected void readMvccVersion() {
-      if (this.reader.shouldIncludeMemstoreTS()) {
-        if (this.reader.isDecodeMemstoreTS()) {
-          try {
-            currMemstoreTS = Bytes.readVLong(blockBuffer.array(), blockBuffer.arrayOffset()
-                + blockBuffer.position());
-            currMemstoreTSLen = WritableUtils.getVIntSize(currMemstoreTS);
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading memstore timestamp", e);
-          }
-        } else {
-          currMemstoreTS = 0;
-          currMemstoreTSLen = 1;
-        }
-      }
-    }
-
-    protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
-      ByteBuffer buffer = curBlock.getBufferWithoutHeader();
-      // It is safe to manipulate this buffer because we own the buffer object.
-      buffer.rewind();
-      int klen = buffer.getInt();
-      buffer.getInt();
-      ByteBuffer keyBuff = buffer.slice();
-      keyBuff.limit(klen);
-      keyBuff.rewind();
-      return keyBuff;
-    }
-
-    @Override
-    public String getKeyString() {
-      return Bytes.toStringBinary(blockBuffer.array(),
-          blockBuffer.arrayOffset() + blockBuffer.position()
-              + KEY_VALUE_LEN_SIZE, currKeyLen);
-    }
-
-    @Override
-    public String getValueString() {
-      return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
-          + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
-          currValueLen);
-    }
-
-    public int compareKey(KVComparator comparator, Cell key) {
-      return comparator.compareOnlyKeyPortion(
-          key,
-          new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
-              + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen));
-    }
-  }
-
-  public Path getPath() {
-    return path;
-  }
-
-  @Override
-  public DataBlockEncoding getDataBlockEncoding() {
-    return dataBlockEncoder.getDataBlockEncoding();
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /** Minor versions in HFile starting with this number have hbase checksums */
-  public static final int MINOR_VERSION_WITH_CHECKSUM = 1;
-  /** The HFile minor version that does not support checksums */
-  public static final int MINOR_VERSION_NO_CHECKSUM = 0;
-
-  /** HFile minor version that introduced pbuf filetrailer */
-  public static final int PBUF_TRAILER_MINOR_VERSION = 2;
-
-  /**
-   * The size of a (key length, value length) tuple that prefixes each entry in
-   * a data block.
-   */
-  public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
-
-  protected boolean includesMemstoreTS = false;
-  protected boolean decodeMemstoreTS = false;
-
-
-  public boolean isDecodeMemstoreTS() {
-    return this.decodeMemstoreTS;
-  }
-
-  public boolean shouldIncludeMemstoreTS() {
-    return includesMemstoreTS;
-  }
-
-  /**
-   * Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType}
-   * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary.
-   */
-   private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
-       boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
-       DataBlockEncoding expectedDataBlockEncoding) throws IOException {
-     // Check cache for block. If found return.
-     if (cacheConf.isBlockCacheEnabled()) {
-       BlockCache cache = cacheConf.getBlockCache();
-       HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock,
-         updateCacheMetrics);
-       if (cachedBlock != null) {
-         if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
-           cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader);
-         }
-         validateBlockType(cachedBlock, expectedBlockType);
-
-         if (expectedDataBlockEncoding == null) {
-           return cachedBlock;
-         }
-         DataBlockEncoding actualDataBlockEncoding =
-                 cachedBlock.getDataBlockEncoding();
-         // Block types other than data blocks always have
-         // DataBlockEncoding.NONE. To avoid false negative cache misses, only
-         // perform this check if cached block is a data block.
-         if (cachedBlock.getBlockType().isData() &&
-                 !actualDataBlockEncoding.equals(expectedDataBlockEncoding)) {
-           // This mismatch may happen if a Scanner, which is used for say a
-           // compaction, tries to read an encoded block from the block cache.
-           // The reverse might happen when an EncodedScanner tries to read
-           // un-encoded blocks which were cached earlier.
-           //
-           // Because returning a data block with an implicit BlockType mismatch
-           // would cause the requesting scanner to throw, a disk read should be
-           // forced here. This will potentially cause a significant number of
-           // cache misses, so we should keep track of this as it might justify
-           // the work on a CompoundScanner.
-           if (!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) &&
-                   !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)) {
-             // If the block is encoded but the encoding does not match the
-             // expected encoding it is likely the encoding was changed but the
-             // block was not yet evicted. Evictions on file close happen async
-             // so blocks with the old encoding still linger in cache for some
-             // period of time. This event should be rare as it only happens on
-             // schema definition change.
-             LOG.info("Evicting cached block with key " + cacheKey +
-                     " because of a data block encoding mismatch" +
-                     "; expected: " + expectedDataBlockEncoding +
-                     ", actual: " + actualDataBlockEncoding);
-             cache.evictBlock(cacheKey);
-           }
-           return null;
-         }
-         return cachedBlock;
-       }
-     }
-     return null;
-   }
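// A minimal illustration of the data block encoding check described above: a
// cached data block is only served when its encoding matches the expected one,
// and a stale block is evicted only when both encodings are real encodings
// (neither is NONE). The class and method names below are hypothetical.
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

final class EncodingMismatchSketch {
  /** True when a cached data block with this encoding can satisfy the request. */
  static boolean usableFromCache(DataBlockEncoding expected, DataBlockEncoding actual) {
    return expected == null || actual.equals(expected);
  }

  /** True when a mismatched cached block should also be evicted. */
  static boolean shouldEvict(DataBlockEncoding expected, DataBlockEncoding actual) {
    return !expected.equals(DataBlockEncoding.NONE) && !actual.equals(DataBlockEncoding.NONE);
  }
}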
-
-  /**
-   * @param metaBlockName name of the meta block to read
-   * @param cacheBlock Add block to cache, if found
-   * @return block wrapped in a ByteBuffer, with header skipped
-   * @throws IOException
-   */
-  @Override
-  public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
-      throws IOException {
-    if (trailer.getMetaIndexCount() == 0) {
-      return null; // there are no meta blocks
-    }
-    if (metaBlockIndexReader == null) {
-      throw new IOException("Meta index not loaded");
-    }
-
-    byte[] mbname = Bytes.toBytes(metaBlockName);
-    int block = metaBlockIndexReader.rootBlockContainingKey(mbname,
-        0, mbname.length);
-    if (block == -1)
-      return null;
-    long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
-
-    // Per meta key from any given file, synchronize reads for said block. This
-    // is OK to do for meta blocks because the meta block index is always
-    // single-level.
-    synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
-      // Check cache for block. If found return.
-      long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
-      BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset);
-
-      cacheBlock &= cacheConf.shouldCacheDataOnRead();
-      if (cacheConf.isBlockCacheEnabled()) {
-        HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true,
-          BlockType.META, null);
-        if (cachedBlock != null) {
-          assert cachedBlock.isUnpacked() : "Packed block leak.";
-          // Return a distinct 'shallow copy' of the block,
-          // so pos does not get messed by the scanner
-          return cachedBlock.getBufferWithoutHeader();
-        }
-        // Cache Miss, please load.
-      }
-
-      HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
-          blockSize, -1, true).unpack(hfileContext, fsBlockReader);
-
-      // Cache the block
-      if (cacheBlock) {
-        cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
-            cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
-      }
-
-      return metaBlock.getBufferWithoutHeader();
-    }
-  }
-
-  @Override
-  public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
-      final boolean cacheBlock, boolean pread, final boolean isCompaction,
-      boolean updateCacheMetrics, BlockType expectedBlockType,
-      DataBlockEncoding expectedDataBlockEncoding)
-      throws IOException {
-    if (dataBlockIndexReader == null) {
-      throw new IOException("Block index not loaded");
-    }
-    if (dataBlockOffset < 0 || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
-      throw new IOException("Requested block is out of range: " + dataBlockOffset +
-        ", lastDataBlockOffset: " + trailer.getLastDataBlockOffset());
-    }
-    // For any given block from any given file, synchronize reads for said
-    // block.
-    // Without a cache, this synchronizing is needless overhead, but really
-    // the other choice is to duplicate work (which the cache would prevent you
-    // from doing).
-
-    BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset);
-
-    boolean useLock = false;
-    IdLock.Entry lockEntry = null;
-    TraceScope traceScope = Trace.startSpan("HFileReaderImpl.readBlock");
-    try {
-      while (true) {
-        if (useLock) {
-          lockEntry = offsetLock.getLockEntry(dataBlockOffset);
-        }
-
-        // Check cache for block. If found return.
-        if (cacheConf.isBlockCacheEnabled()) {
-          // Try and get the block from the block cache. If the useLock variable is true then this
-          // is the second time through the loop and it should not be counted as a block cache miss.
-          HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
-            updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
-          if (cachedBlock != null) {
-            if (Trace.isTracing()) {
-              traceScope.getSpan().addTimelineAnnotation("blockCacheHit");
-            }
-            assert cachedBlock.isUnpacked() : "Packed block leak.";
-            if (cachedBlock.getBlockType().isData()) {
-              if (updateCacheMetrics) {
-                HFile.dataBlockReadCnt.incrementAndGet();
-              }
-              // Validate encoding type for data blocks. We include encoding
-              // type in the cache key, and we expect it to match on a cache hit.
-              if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
-                throw new IOException("Cached block under key " + cacheKey + " "
-                  + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
-                  + dataBlockEncoder.getDataBlockEncoding() + ")");
-              }
-            }
-            // Cache-hit. Return!
-            return cachedBlock;
-          }
-          // Carry on, please load.
-        }
-        if (!useLock) {
-          // check cache again with lock
-          useLock = true;
-          continue;
-        }
-        if (Trace.isTracing()) {
-          traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
-        }
-        // Load block from filesystem.
-        HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
-            pread);
-        validateBlockType(hfileBlock, expectedBlockType);
-        HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
-        BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
-
-        // Cache the block if necessary
-        if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
-          cacheConf.getBlockCache().cacheBlock(cacheKey,
-            cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
-            cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
-        }
-
-        if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
-          HFile.dataBlockReadCnt.incrementAndGet();
-        }
-
-        return unpacked;
-      }
-    } finally {
-      traceScope.close();
-      if (lockEntry != null) {
-        offsetLock.releaseLockEntry(lockEntry);
-      }
-    }
-  }
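// A minimal illustration of the locking pattern readBlock() above uses: probe
// the cache with no lock first, then take a per-key lock (IdLock keyed by block
// offset in the real code) and re-check the cache before falling back to the
// filesystem read, so concurrent readers of the same block do not duplicate the
// disk I/O. The generic class below is hypothetical and greatly simplified.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class DoubleCheckedLoadSketch<K, V> {
  interface Loader<K, V> { V load(K key) throws Exception; }

  private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<K, V>();
  private final ConcurrentMap<K, Object> locks = new ConcurrentHashMap<K, Object>();

  V get(K key, Loader<K, V> loader) throws Exception {
    V cached = cache.get(key);            // first check, no lock taken
    if (cached != null) {
      return cached;
    }
    Object lock = locks.computeIfAbsent(key, k -> new Object());
    synchronized (lock) {                 // per-key lock, like IdLock per offset
      cached = cache.get(key);            // second check, under the lock
      if (cached != null) {
        return cached;
      }
      V loaded = loader.load(key);        // the expensive filesystem read
      cache.put(key, loaded);
      return loaded;
    }
  }
}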
-
-  @Override
-  public boolean hasMVCCInfo() {
-    return includesMemstoreTS && decodeMemstoreTS;
-  }
-
-  /**
-   * Compares the actual type of a block retrieved from cache or disk with its
-   * expected type and throws an exception in case of a mismatch. Expected
-   * block type of {@link BlockType#DATA} is considered to match the actual
-   * block type {@link BlockType#ENCODED_DATA} as well.
-   * @param block a block retrieved from cache or disk
-   * @param expectedBlockType the expected block type, or null to skip the
-   *          check
-   */
-  private void validateBlockType(HFileBlock block,
-      BlockType expectedBlockType) throws IOException {
-    if (expectedBlockType == null) {
-      return;
-    }
-    BlockType actualBlockType = block.getBlockType();
-    if (expectedBlockType.isData() && actualBlockType.isData()) {
-      // We consider DATA to match ENCODED_DATA for the purpose of this
-      // verification.
-      return;
-    }
-    if (actualBlockType != expectedBlockType) {
-      throw new IOException("Expected block type " + expectedBlockType + ", " +
-          "but got " + actualBlockType + ": " + block);
-    }
-  }
-
-  /**
-   * @return Last key in the file. May be null if file has no entries. Note that
-   *         this is not the last row key, but rather the byte form of the last
-   *         KeyValue.
-   */
-  @Override
-  public byte[] getLastKey() {
-    return dataBlockIndexReader.isEmpty() ? null : lastKey;
-  }
-
-  /**
-   * @return Midkey for this file. We work with block boundaries only, so the
-   *         returned midkey is only an approximation.
-   * @throws IOException
-   */
-  @Override
-  public byte[] midkey() throws IOException {
-    return dataBlockIndexReader.midkey();
-  }
-
-  @Override
-  public void close() throws IOException {
-    close(cacheConf.shouldEvictOnClose());
-  }
-
-  public void close(boolean evictOnClose) throws IOException {
-    PrefetchExecutor.cancel(path);
-    if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
-      int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("On close, file=" + name + " evicted=" + numEvicted
-          + " block(s)");
-      }
-    }
-    fsBlockReader.closeStreams();
-  }
-
-  public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) {
-    return dataBlockEncoder.getEffectiveEncodingInCache(isCompaction);
-  }
-
-  /** For testing */
-  public HFileBlock.FSReader getUncachedBlockReader() {
-    return fsBlockReader;
-  }
-
-  /**
-   * Scanner that operates on encoded data blocks.
-   */
-  protected static class EncodedScanner extends HFileScannerImpl {
-    private final HFileBlockDecodingContext decodingCtx;
-    private final DataBlockEncoder.EncodedSeeker seeker;
-    private final DataBlockEncoder dataBlockEncoder;
-    protected final HFileContext meta;
-
-    public EncodedScanner(HFile.Reader reader, boolean cacheBlocks,
-        boolean pread, boolean isCompaction, HFileContext meta) {
-      super(reader, cacheBlocks, pread, isCompaction);
-      DataBlockEncoding encoding = reader.getDataBlockEncoding();
-      dataBlockEncoder = encoding.getEncoder();
-      decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(meta);
-      seeker = dataBlockEncoder.createSeeker(
-        reader.getComparator(), decodingCtx);
-      this.meta = meta;
-    }
-
-    @Override
-    public boolean isSeeked(){
-      return this.block != null;
-    }
-
-    /**
-     * Updates the current block to be the given {@link HFileBlock}. Seeks to
-     * the first key/value pair.
-     *
-     * @param newBlock the block to make current
-     * @throws CorruptHFileException
-     */
-    private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
-      block = newBlock;
-
-      // sanity checks
-      if (block.getBlockType() != BlockType.ENCODED_DATA) {
-        throw new IllegalStateException(
-            "EncodedScanner works only on encoded data blocks");
-      }
-      short dataBlockEncoderId = block.getDataBlockEncodingId();
-      if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
-        String encoderCls = dataBlockEncoder.getClass().getName();
-        throw new CorruptHFileException("Encoder " + encoderCls
-          + " doesn't support data block encoding "
-          + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
-      }
-
-      seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
-      blockFetches++;
-
-      // Reset the next indexed key
-      this.nextIndexedKey = null;
-    }
-
-    private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
-      ByteBuffer origBlock = newBlock.getBufferReadOnly();
-      ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
-          origBlock.arrayOffset() + newBlock.headerSize() +
-          DataBlockEncoding.ID_SIZE,
-          newBlock.getUncompressedSizeWithoutHeader() -
-          DataBlockEncoding.ID_SIZE).slice();
-      return encodedBlock;
-    }
-
-    @Override
-    public boolean seekTo() throws IOException {
-      if (reader == null) {
-        return false;
-      }
-
-      if (reader.getTrailer().getEntryCount() == 0) {
-        // No data blocks.
-        return false;
-      }
-
-      long firstDataBlockOffset =
-          reader.getTrailer().getFirstDataBlockOffset();
-      if (block != null && block.getOffset() == firstDataBlockOffset) {
-        seeker.rewind();
-        return true;
-      }
-
-      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
-          isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
-      if (block.getOffset() < 0) {
-        throw new IOException("Invalid block offset: " + block.getOffset());
-      }
-      updateCurrentBlock(block);
-      return true;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      boolean isValid = seeker.next();
-      if (!isValid) {
-        block = readNextDataBlock();
-        isValid = block != null;
-        if (isValid) {
-          updateCurrentBlock(block);
-        }
-      }
-      return isValid;
-    }
-
-    @Override
-    public ByteBuffer getKey() {
-      assertValidSeek();
-      return seeker.getKeyDeepCopy();
-    }
-
-    public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
-      return seeker.compareKey(comparator, key, offset, length);
-    }
-
-    @Override
-    public ByteBuffer getValue() {
-      assertValidSeek();
-      return seeker.getValueShallowCopy();
-    }
-
-    @Override
-    public Cell getKeyValue() {
-      if (block == null) {
-        return null;
-      }
-      return seeker.getKeyValue();
-    }
-
-    @Override
-    public String getKeyString() {
-      ByteBuffer keyBuffer = getKey();
-      return Bytes.toStringBinary(keyBuffer.array(),
-          keyBuffer.arrayOffset(), keyBuffer.limit());
-    }
-
-    @Override
-    public String getValueString() {
-      ByteBuffer valueBuffer = getValue();
-      return Bytes.toStringBinary(valueBuffer.array(),
-          valueBuffer.arrayOffset(), valueBuffer.limit());
-    }
-
-    private void assertValidSeek() {
-      if (block == null) {
-        throw new NotSeekedException();
-      }
-    }
-
-    protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
-      return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
-    }
-
-    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
-        boolean rewind, Cell key, boolean seekBefore) throws IOException {
-      if (block == null || block.getOffset() != seekToBlock.getOffset()) {
-        updateCurrentBlock(seekToBlock);
-      } else if (rewind) {
-        seeker.rewind();
-      }
-      this.nextIndexedKey = nextIndexedKey;
-      return seeker.seekToKeyInBlock(key, seekBefore);
-    }
-
-    public int compareKey(KVComparator comparator, Cell key) {
-      return seeker.compareKey(comparator, key);
-    }
-  }
-
-  /**
-   * Returns a buffer with the Bloom filter metadata. The caller takes
-   * ownership of the buffer.
-   */
-  @Override
-  public DataInput getGeneralBloomFilterMetadata() throws IOException {
-    return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META);
-  }
-
-  @Override
-  public DataInput getDeleteBloomFilterMetadata() throws IOException {
-    return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META);
-  }
-
-  private DataInput getBloomFilterMetadata(BlockType blockType)
-  throws IOException {
-    if (blockType != BlockType.GENERAL_BLOOM_META &&
-        blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
-      throw new RuntimeException("Block Type: " + blockType.toString() +
-          " is not supported") ;
-    }
-
-    for (HFileBlock b : loadOnOpenBlocks)
-      if (b.getBlockType() == blockType)
-        return b.getByteStream();
-    return null;
-  }
-
-  public boolean isFileInfoLoaded() {
-    return true; // We load file info in constructor in version 2.
-  }
-
-  /**
-   * Validates that the minor version is within acceptable limits.
-   * Otherwise throws a RuntimeException.
-   */
-  private void validateMinorVersion(Path path, int minorVersion) {
-    if (minorVersion < MIN_MINOR_VERSION ||
-        minorVersion > MAX_MINOR_VERSION) {
-      String msg = "Minor version for path " + path +
-                   " is expected to be between " +
-                   MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
-                   " but is found to be " + minorVersion;
-      LOG.error(msg);
-      throw new RuntimeException(msg);
-    }
-  }
-
-  @Override
-  public HFileContext getFileContext() {
-    return hfileContext;
-  }
-
-  /**
-   * Returns false if block prefetching was requested for this file and has
-   * not completed, true otherwise
-   */
-  @VisibleForTesting
-  public boolean prefetchComplete() {
-    return PrefetchExecutor.isCompleted(path);
-  }
-
-  protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
-      HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
-    HFileContextBuilder builder = new HFileContextBuilder()
-      .withIncludesMvcc(this.includesMemstoreTS)
-      .withHBaseCheckSum(true)
-      .withCompression(this.compressAlgo);
-
-    // Check for any key material available
-    byte[] keyBytes = trailer.getEncryptionKey();
-    if (keyBytes != null) {
-      Encryption.Context cryptoContext = Encryption.newContext(conf);
-      Key key;
-      String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
-        User.getCurrent().getShortName());
-      try {
-        // First try the master key
-        key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
-      } catch (KeyException e) {
-        // If the current master key fails to unwrap, try the alternate, if
-        // one is configured
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
-        }
-        String alternateKeyName =
-          conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
-        if (alternateKeyName != null) {
-          try {
-            key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
-          } catch (KeyException ex) {
-            throw new IOException(ex);
-          }
-        } else {
-          throw new IOException(e);
-        }
-      }
-      // Use the algorithm the key wants
-      Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
-      if (cipher == null) {
-        throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
-      }
-      cryptoContext.setCipher(cipher);
-      cryptoContext.setKey(key);
-      builder.withEncryptionContext(cryptoContext);
-    }
-
-    HFileContext context = builder.build();
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Reader" + (path != null ? " for " + path : "" ) +
-        " initialized with cacheConf: " + cacheConf +
-        " comparator: " + comparator.getClass().getSimpleName() +
-        " fileContext: " + context);
-    }
-
-    return context;
-  }
-
-  /**
-   * Create a Scanner on this file. No seeks or reads are done on creation. Call
-   * {@link HFileScanner#seekTo(Cell)} to position and start the read. There is
-   * nothing to clean up in a Scanner. Letting go of your references to the
-   * scanner is sufficient. NOTE: Do not use this overload of getScanner for
-   * compactions. See {@link #getScanner(boolean, boolean, boolean)}
-   *
-   * @param cacheBlocks True if we should cache blocks read in by this scanner.
-   * @param pread Use positional read rather than seek+read if true (pread is
-   *          better for random reads, seek+read is better for scanning).
-   * @return Scanner on this file.
-   */
-  @Override
-  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
-    return getScanner(cacheBlocks, pread, false);
-  }
-
-  /**
-   * Create a Scanner on this file. No seeks or reads are done on creation. Call
-   * {@link HFileScanner#seekTo(Cell)} to position and start the read. There is
-   * nothing to clean up in a Scanner. Letting go of your references to the
-   * scanner is sufficient.
-   * @param cacheBlocks
-   *          True if we should cache blocks read in by this scanner.
-   * @param pread
-   *          Use positional read rather than seek+read if true (pread is better
-   *          for random reads, seek+read is better for scanning).
-   * @param isCompaction
-   *          is scanner being used for a compaction?
-   * @return Scanner on this file.
-   */
-  @Override
-  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
-      final boolean isCompaction) {
-    if (dataBlockEncoder.useEncodedScanner()) {
-      return new EncodedScanner(this, cacheBlocks, pread, isCompaction, this.hfileContext);
-    }
-    return new HFileScannerImpl(this, cacheBlocks, pread, isCompaction);
-  }
-
-  public int getMajorVersion() {
-    return 3;
-  }
-}
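
For context on the scanner API that the getScanner javadoc above describes, here is a
minimal usage sketch. It assumes an already-open HFile.Reader; the class and method
names (ScannerUsageSketch, scanAll) are hypothetical.

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

final class ScannerUsageSketch {
  /** Walks every cell in the file; nothing on the scanner needs to be closed. */
  static void scanAll(HFile.Reader reader) throws IOException {
    HFileScanner scanner = reader.getScanner(true /* cacheBlocks */, true /* pread */);
    if (!scanner.seekTo()) {
      return;                            // empty file, nothing to read
    }
    do {
      Cell cell = scanner.getKeyValue(); // current cell; undefined before seekTo()
      // ... use the cell ...
    } while (scanner.next());            // next() loads following data blocks as needed
  }
}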


[4/4] hbase git commit: Revert "HBASE-13373 Squash HFileReaderV3 together with HFileReaderV2 and AbstractHFileReader; ditto for Scanners and BlockReader, etc.:" Revert because breaking migration tests.

Posted by st...@apache.org.
Revert "HBASE-13373 Squash HFileReaderV3 together with HFileReaderV2 and AbstractHFileReader; ditto for Scanners and BlockReader, etc.:"
Revert because breaking migration tests.

This reverts commit 95893ffebd2928356a73ae536f081aa080230c72.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e65f4300
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e65f4300
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e65f4300

Branch: refs/heads/branch-1
Commit: e65f430003aa86d8a138b631bd9f3af86c826461
Parents: 5c19f9e
Author: stack <st...@apache.org>
Authored: Fri Apr 3 18:56:22 2015 -0700
Committer: stack <st...@apache.org>
Committed: Fri Apr 3 18:57:53 2015 -0700

----------------------------------------------------------------------
 .../IntegrationTestIngestWithEncryption.java    |    8 +-
 .../hbase/io/hfile/AbstractHFileReader.java     |  352 ++++
 .../hbase/io/hfile/AbstractHFileWriter.java     |  266 +++
 .../hadoop/hbase/io/hfile/FixedFileTrailer.java |    6 +-
 .../org/apache/hadoop/hbase/io/hfile/HFile.java |   42 +-
 .../hadoop/hbase/io/hfile/HFileBlock.java       |  110 +-
 .../hadoop/hbase/io/hfile/HFileBlockIndex.java  |   12 +-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java  | 1688 ------------------
 .../hadoop/hbase/io/hfile/HFileReaderV2.java    | 1323 ++++++++++++++
 .../hadoop/hbase/io/hfile/HFileReaderV3.java    |  358 ++++
 .../hbase/io/hfile/HFileWriterFactory.java      |   40 -
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java  |  641 -------
 .../hadoop/hbase/io/hfile/HFileWriterV2.java    |  424 +++++
 .../hadoop/hbase/io/hfile/HFileWriterV3.java    |  136 ++
 .../hbase/mapreduce/HFileOutputFormat2.java     |   13 +-
 .../hbase/regionserver/HRegionServer.java       |    2 -
 .../hadoop/hbase/regionserver/Region.java       |    2 +-
 .../hadoop/hbase/regionserver/StoreFile.java    |    3 +-
 .../regionserver/compactions/Compactor.java     |    4 +-
 .../hadoop/hbase/util/CompressionTest.java      |    4 +-
 .../hbase/HFilePerformanceEvaluation.java       |    4 +-
 .../hadoop/hbase/io/hfile/TestCacheOnWrite.java |   18 +-
 .../hbase/io/hfile/TestFixedFileTrailer.java    |   18 +-
 .../io/hfile/TestForceCacheImportantBlocks.java |    2 +
 .../apache/hadoop/hbase/io/hfile/TestHFile.java |    4 +-
 .../hbase/io/hfile/TestHFileBlockIndex.java     |    2 +-
 .../TestHFileInlineToRootChunkConversion.java   |    7 +-
 .../hadoop/hbase/io/hfile/TestHFileSeek.java    |    2 +-
 .../hbase/io/hfile/TestHFileWriterV2.java       |    9 +-
 .../hbase/io/hfile/TestHFileWriterV3.java       |    9 +-
 .../hfile/TestLazyDataBlockDecompression.java   |    5 +-
 .../hadoop/hbase/io/hfile/TestPrefetch.java     |    6 +-
 .../hadoop/hbase/io/hfile/TestReseekTo.java     |    2 +-
 .../hadoop/hbase/io/hfile/TestSeekTo.java       |   15 +-
 .../regionserver/DataBlockEncodingTool.java     |    7 +-
 .../regionserver/TestCacheOnWriteInSchema.java  |    3 +-
 36 files changed, 3023 insertions(+), 2524 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
index 6464aad..dbaf5b8 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
@@ -24,9 +24,9 @@ import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileReaderV3;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV3;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
-import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
@@ -46,8 +46,8 @@ public class IntegrationTestIngestWithEncryption extends IntegrationTestIngest {
   static {
     // These log level changes are only useful when running on a localhost
     // cluster.
-    Logger.getLogger(HFileReaderImpl.class).setLevel(Level.TRACE);
-    Logger.getLogger(HFileWriterImpl.class).setLevel(Level.TRACE);
+    Logger.getLogger(HFileReaderV3.class).setLevel(Level.TRACE);
+    Logger.getLogger(HFileWriterV3.class).setLevel(Level.TRACE);
     Logger.getLogger(SecureProtobufLogReader.class).setLevel(Level.TRACE);
     Logger.getLogger(SecureProtobufLogWriter.class).setLevel(Level.TRACE);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
new file mode 100644
index 0000000..8c1e7b9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
@@ -0,0 +1,352 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+
+/**
+ * Common functionality needed by all versions of {@link HFile} readers.
+ */
+@InterfaceAudience.Private
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
+public abstract class AbstractHFileReader
+    implements HFile.Reader, Configurable {
+  /** Stream to read from. Does checksum verifications in file system */
+  protected FSDataInputStream istream; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD
+
+  /** The file system stream of the underlying {@link HFile} that
+   * does not do checksum verification in the file system */
+  protected FSDataInputStream istreamNoFsChecksum;  // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD
+
+  /** Data block index reader keeping the root data index in memory */
+  protected HFileBlockIndex.BlockIndexReader dataBlockIndexReader;
+
+  /** Meta block index reader -- always single level */
+  protected HFileBlockIndex.BlockIndexReader metaBlockIndexReader;
+
+  protected final FixedFileTrailer trailer;
+
+  /** Filled when we read in the trailer. */
+  protected final Compression.Algorithm compressAlgo;
+
+  /**
+   * What kind of data block encoding should be used while reading, writing,
+   * and handling cache.
+   */
+  protected HFileDataBlockEncoder dataBlockEncoder =
+      NoOpDataBlockEncoder.INSTANCE;
+
+  /** Last key in the file. Filled in when we read in the file info */
+  protected byte [] lastKey = null;
+
+  /** Average key length read from file info */
+  protected int avgKeyLen = -1;
+
+  /** Average value length read from file info */
+  protected int avgValueLen = -1;
+
+  /** Key comparator */
+  protected KVComparator comparator = new KVComparator();
+
+  /** Size of this file. */
+  protected final long fileSize;
+
+  /** Block cache configuration. */
+  protected final CacheConfig cacheConf;
+
+  /** Path of file */
+  protected final Path path;
+
+  /** File name to be used for block names */
+  protected final String name;
+
+  protected FileInfo fileInfo;
+
+  /** The filesystem used for accessing data */
+  protected HFileSystem hfs;
+
+  protected Configuration conf;
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
+  protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
+      final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs,
+      final Configuration conf) {
+    this.trailer = trailer;
+    this.compressAlgo = trailer.getCompressionCodec();
+    this.cacheConf = cacheConf;
+    this.fileSize = fileSize;
+    this.path = path;
+    this.name = path.getName();
+    this.hfs = hfs; // URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD
+    this.conf = conf;
+  }
+
+  @SuppressWarnings("serial")
+  public static class BlockIndexNotLoadedException
+      extends IllegalStateException {
+    public BlockIndexNotLoadedException() {
+      // Add a message in case anyone relies on it as opposed to class name.
+      super("Block index not loaded");
+    }
+  }
+
+  protected String toStringFirstKey() {
+    return KeyValue.keyToString(getFirstKey());
+  }
+
+  protected String toStringLastKey() {
+    return KeyValue.keyToString(getLastKey());
+  }
+
+  public abstract boolean isFileInfoLoaded();
+
+  @Override
+  public String toString() {
+    return "reader=" + path.toString() +
+        (!isFileInfoLoaded()? "":
+          ", compression=" + compressAlgo.getName() +
+          ", cacheConf=" + cacheConf +
+          ", firstKey=" + toStringFirstKey() +
+          ", lastKey=" + toStringLastKey()) +
+          ", avgKeyLen=" + avgKeyLen +
+          ", avgValueLen=" + avgValueLen +
+          ", entries=" + trailer.getEntryCount() +
+          ", length=" + fileSize;
+  }
+
+  @Override
+  public long length() {
+    return fileSize;
+  }
+
+  /**
+   * Create a Scanner on this file. No seeks or reads are done on creation. Call
+   * {@link HFileScanner#seekTo(byte[])} to position and start the read. There is
+   * nothing to clean up in a Scanner. Letting go of your references to the
+   * scanner is sufficient. NOTE: Do not use this overload of getScanner for
+   * compactions.
+   *
+   * @param cacheBlocks True if we should cache blocks read in by this scanner.
+   * @param pread Use positional read rather than seek+read if true (pread is
+   *          better for random reads, seek+read is better for scanning).
+   * @return Scanner on this file.
+   */
+  @Override
+  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
+    return getScanner(cacheBlocks, pread, false);
+  }
+
+  /**
+   * @return the first key in the file. May be null if file has no entries. Note
+   *         that this is not the first row key, but rather the byte form of the
+   *         first KeyValue.
+   */
+  @Override
+  public byte [] getFirstKey() {
+    if (dataBlockIndexReader == null) {
+      throw new BlockIndexNotLoadedException();
+    }
+    return dataBlockIndexReader.isEmpty() ? null
+        : dataBlockIndexReader.getRootBlockKey(0);
+  }
+
+  /**
+   * TODO left from {@link HFile} version 1: move this to StoreFile after Ryan's
+   * patch goes in to eliminate {@link KeyValue} here.
+   *
+   * @return the first row key, or null if the file is empty.
+   */
+  @Override
+  public byte[] getFirstRowKey() {
+    byte[] firstKey = getFirstKey();
+    if (firstKey == null)
+      return null;
+    return KeyValue.createKeyValueFromKey(firstKey).getRow();
+  }
+
+  /**
+   * TODO left from {@link HFile} version 1: move this to StoreFile after
+   * Ryan's patch goes in to eliminate {@link KeyValue} here.
+   *
+   * @return the last row key, or null if the file is empty.
+   */
+  @Override
+  public byte[] getLastRowKey() {
+    byte[] lastKey = getLastKey();
+    if (lastKey == null)
+      return null;
+    return KeyValue.createKeyValueFromKey(lastKey).getRow();
+  }
+
+  /** @return number of KV entries in this HFile */
+  @Override
+  public long getEntries() {
+    return trailer.getEntryCount();
+  }
+
+  /** @return comparator */
+  @Override
+  public KVComparator getComparator() {
+    return comparator;
+  }
+
+  /** @return compression algorithm */
+  @Override
+  public Compression.Algorithm getCompressionAlgorithm() {
+    return compressAlgo;
+  }
+
+  /**
+   * @return the total heap size of data and meta block indexes in bytes. Does
+   *         not take into account non-root blocks of a multilevel data index.
+   */
+  public long indexSize() {
+    return (dataBlockIndexReader != null ? dataBlockIndexReader.heapSize() : 0)
+        + ((metaBlockIndexReader != null) ? metaBlockIndexReader.heapSize()
+            : 0);
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public HFileBlockIndex.BlockIndexReader getDataBlockIndexReader() {
+    return dataBlockIndexReader;
+  }
+
+  @Override
+  public FixedFileTrailer getTrailer() {
+    return trailer;
+  }
+
+  @Override
+  public FileInfo loadFileInfo() throws IOException {
+    return fileInfo;
+  }
+
+  /**
+   * An exception thrown when an operation requiring a scanner to be seeked
+   * is invoked on a scanner that is not seeked.
+   */
+  @SuppressWarnings("serial")
+  public static class NotSeekedException extends IllegalStateException {
+    public NotSeekedException() {
+      super("Not seeked to a key/value");
+    }
+  }
+
+  protected static abstract class Scanner implements HFileScanner {
+    protected ByteBuffer blockBuffer;
+
+    protected boolean cacheBlocks;
+    protected final boolean pread;
+    protected final boolean isCompaction;
+
+    protected int currKeyLen;
+    protected int currValueLen;
+    protected int currMemstoreTSLen;
+    protected long currMemstoreTS;
+
+    protected int blockFetches;
+
+    protected final HFile.Reader reader;
+
+    public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction) {
+      this.reader = reader;
+      this.cacheBlocks = cacheBlocks;
+      this.pread = pread;
+      this.isCompaction = isCompaction;
+    }
+
+    @Override
+    public boolean isSeeked(){
+      return blockBuffer != null;
+    }
+
+    @Override
+    public String toString() {
+      return "HFileScanner for reader " + String.valueOf(getReader());
+    }
+
+    protected void assertSeeked() {
+      if (!isSeeked())
+        throw new NotSeekedException();
+    }
+
+    @Override
+    public int seekTo(byte[] key) throws IOException {
+      return seekTo(key, 0, key.length);
+    }
+    
+    @Override
+    public boolean seekBefore(byte[] key) throws IOException {
+      return seekBefore(key, 0, key.length);
+    }
+    
+    @Override
+    public int reseekTo(byte[] key) throws IOException {
+      return reseekTo(key, 0, key.length);
+    }
+
+    @Override
+    public HFile.Reader getReader() {
+      return reader;
+    }
+  }
+
+  /** For testing */
+  abstract HFileBlock.FSReader getUncachedBlockReader();
+
+  public Path getPath() {
+    return path;
+  }
+
+  @Override
+  public DataBlockEncoding getDataBlockEncoding() {
+    return dataBlockEncoder.getDataBlockEncoding();
+  }
+
+  public abstract int getMajorVersion();
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+}
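For context, here is a minimal sketch of how the reader API restored above is typically driven: open a reader, load the file info, seek the scanner to the first entry, and walk key/values. It is illustrative only and not part of this commit; the convenience factory HFile.createReader(fs, path, cacheConf, conf) and the command-line path argument are assumptions made for the sketch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

public class HFileReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path hfilePath = new Path(args[0]);              // path to an existing hfile (assumed)
    CacheConfig cacheConf = new CacheConfig(conf);

    HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConf, conf);
    try {
      reader.loadFileInfo();                         // fills last key, avg key/value lengths, etc.
      // cacheBlocks=true, pread=false: seek+read, which the javadoc above prefers for scanning.
      HFileScanner scanner = reader.getScanner(true, false);
      if (scanner.seekTo()) {                        // position at the first key/value
        do {
          Cell cell = scanner.getKeyValue();
          // ... process cell ...
        } while (scanner.next());
      }
    } finally {
      reader.close();
    }
  }
}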

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
new file mode 100644
index 0000000..52491e6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Common functionality needed by all versions of {@link HFile} writers.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractHFileWriter implements HFile.Writer {
+
+  /** The Cell previously appended. Becomes the last cell in the file.*/
+  protected Cell lastCell = null;
+
+  /** FileSystem stream to write into. */
+  protected FSDataOutputStream outputStream;
+
+  /** True if we opened the <code>outputStream</code> (and so will close it). */
+  protected final boolean closeOutputStream;
+
+  /** A "file info" block: a key-value map of file-wide metadata. */
+  protected FileInfo fileInfo = new HFile.FileInfo();
+
+  /** Total # of key/value entries, i.e. how many times add() was called. */
+  protected long entryCount = 0;
+
+  /** Used for calculating the average key length. */
+  protected long totalKeyLength = 0;
+
+  /** Used for calculating the average value length. */
+  protected long totalValueLength = 0;
+
+  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
+  protected long totalUncompressedBytes = 0;
+
+  /** Key comparator. Used to ensure we write in order. */
+  protected final KVComparator comparator;
+
+  /** Meta block names. */
+  protected List<byte[]> metaNames = new ArrayList<byte[]>();
+
+  /** {@link Writable}s representing meta block data. */
+  protected List<Writable> metaData = new ArrayList<Writable>();
+
+  /**
+   * First cell in a block.
+   * This reference should be short-lived since we write hfiles in a burst.
+   */
+  protected Cell firstCellInBlock = null;
+
+  /** May be null if we were passed a stream. */
+  protected final Path path;
+
+
+  /** Cache configuration for caching data on write. */
+  protected final CacheConfig cacheConf;
+
+  /**
+   * Name for this object used when logging or in toString. Either the
+   * result of a toString on the stream or else the name of the passed file Path.
+   */
+  protected final String name;
+
+  /**
+   * The data block encoding which will be used.
+   * {@link NoOpDataBlockEncoder#INSTANCE} if there is no encoding.
+   */
+  protected final HFileDataBlockEncoder blockEncoder;
+  
+  protected final HFileContext hFileContext;
+
+  public AbstractHFileWriter(CacheConfig cacheConf,
+      FSDataOutputStream outputStream, Path path, 
+      KVComparator comparator, HFileContext fileContext) {
+    this.outputStream = outputStream;
+    this.path = path;
+    this.name = path != null ? path.getName() : outputStream.toString();
+    this.hFileContext = fileContext;
+    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
+    if (encoding != DataBlockEncoding.NONE) {
+      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
+    } else {
+      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
+    }
+    this.comparator = comparator != null ? comparator
+        : KeyValue.COMPARATOR;
+
+    closeOutputStream = path != null;
+    this.cacheConf = cacheConf;
+  }
+
+  /**
+   * Add last bits of metadata to file info before it is written out.
+   */
+  protected void finishFileInfo() throws IOException {
+    if (lastCell != null) {
+      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
+      // byte buffer. Won't take a tuple.
+      byte [] lastKey = CellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
+      fileInfo.append(FileInfo.LASTKEY, lastKey, false);
+    }
+
+    // Average key length.
+    int avgKeyLen =
+        entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
+    fileInfo.append(FileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
+
+    // Average value length.
+    int avgValueLen =
+        entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
+    fileInfo.append(FileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
+
+    fileInfo.append(FileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
+      false);
+  }
+
+  /**
+   * Add to the file info. All added key/value pairs can be obtained using
+   * {@link HFile.Reader#loadFileInfo()}.
+   *
+   * @param k Key
+   * @param v Value
+   * @throws IOException in case the key or the value are invalid
+   */
+  @Override
+  public void appendFileInfo(final byte[] k, final byte[] v)
+      throws IOException {
+    fileInfo.append(k, v, true);
+  }
+
+  /**
+   * Sets the file info offset in the trailer, finishes up populating fields in
+   * the file info, and writes the file info into the given data output. The
+   * reason the data output is not always {@link #outputStream} is that we store
+   * file info as a block in version 2.
+   *
+   * @param trailer fixed file trailer
+   * @param out the data output to write the file info to
+   * @throws IOException
+   */
+  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
+  throws IOException {
+    trailer.setFileInfoOffset(outputStream.getPos());
+    finishFileInfo();
+    fileInfo.write(out);
+  }
+
+  /**
+   * Checks that the given Cell's key does not violate the key order.
+   *
+   * @param cell Cell whose key to check.
+   * @return true if the key is duplicate
+   * @throws IOException if the key or the key order is wrong
+   */
+  protected boolean checkKey(final Cell cell) throws IOException {
+    boolean isDuplicateKey = false;
+
+    if (cell == null) {
+      throw new IOException("Key cannot be null or empty");
+    }
+    if (lastCell != null) {
+      int keyComp = comparator.compareOnlyKeyPortion(lastCell, cell);
+
+      if (keyComp > 0) {
+        throw new IOException("Added a key not lexically larger than"
+            + " previous. Current cell = " + cell + ", lastCell = " + lastCell);
+      } else if (keyComp == 0) {
+        isDuplicateKey = true;
+      }
+    }
+    return isDuplicateKey;
+  }
+
+  /** Checks the given value for validity. */
+  protected void checkValue(final byte[] value, final int offset,
+      final int length) throws IOException {
+    if (value == null) {
+      throw new IOException("Value cannot be null");
+    }
+  }
+
+  /**
+   * @return Path or null if we were passed a stream rather than a Path.
+   */
+  @Override
+  public Path getPath() {
+    return path;
+  }
+
+  @Override
+  public String toString() {
+    return "writer=" + (path != null ? path.toString() : null) + ", name="
+        + name + ", compression=" + hFileContext.getCompression().getName();
+  }
+
+  /**
+   * Sets remaining trailer fields, writes the trailer to disk, and optionally
+   * closes the output stream.
+   */
+  protected void finishClose(FixedFileTrailer trailer) throws IOException {
+    trailer.setMetaIndexCount(metaNames.size());
+    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
+    trailer.setEntryCount(entryCount);
+    trailer.setCompressionCodec(hFileContext.getCompression());
+
+    trailer.serialize(outputStream);
+
+    if (closeOutputStream) {
+      outputStream.close();
+      outputStream = null;
+    }
+  }
+
+  public static Compression.Algorithm compressionByName(String algoName) {
+    if (algoName == null)
+      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
+    return Compression.getCompressionAlgorithmByName(algoName);
+  }
+
+  /** A helper method to create HFile output streams in constructors */
+  protected static FSDataOutputStream createOutputStream(Configuration conf,
+      FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
+    FsPermission perms = FSUtils.getFilePermissions(fs, conf,
+        HConstants.DATA_FILE_UMASK_KEY);
+    return FSUtils.create(fs, path, perms, favoredNodes);
+  }
+}
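And the corresponding write side: a minimal sketch of producing an hfile through the factory path that AbstractHFileWriter backs. The block size, column family/qualifier names and output path below are invented for illustration; keys are appended in sorted order so the checkKey() ordering check above is satisfied.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class HFileWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path out = new Path(args[0]);                    // hypothetical output path
    HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();

    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, out)
        .withFileContext(context)
        .create();
    try {
      // Append in sorted key order; out-of-order keys would make checkKey() above throw.
      for (int i = 0; i < 10; i++) {
        byte[] row = Bytes.toBytes(String.format("row-%05d", i));
        writer.append(new KeyValue(row, Bytes.toBytes("f"), Bytes.toBytes("q"),
            Bytes.toBytes("value-" + i)));
      }
      writer.appendFileInfo(Bytes.toBytes("sketch.note"), Bytes.toBytes("example"));
    } finally {
      writer.close();                                // finishClose() above serializes the trailer
    }
  }
}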

http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
index 3dcfc9b..56510f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
@@ -238,7 +238,7 @@ public class FixedFileTrailer {
     BlockType.TRAILER.readAndCheck(inputStream);
 
     if (majorVersion > 2
-        || (majorVersion == 2 && minorVersion >= HFileReaderImpl.PBUF_TRAILER_MINOR_VERSION)) {
+        || (majorVersion == 2 && minorVersion >= HFileReaderV2.PBUF_TRAILER_MINOR_VERSION)) {
       deserializeFromPB(inputStream);
     } else {
       deserializeFromWritable(inputStream);
@@ -611,9 +611,7 @@ public class FixedFileTrailer {
   }
 
   public byte[] getEncryptionKey() {
-    // This is a v3 feature but if reading a v2 file the encryptionKey will just be null which
-    // if fine for this feature.
-    expectAtLeastMajorVersion(2);
+    expectAtLeastMajorVersion(3);
     return encryptionKey;
   }
 

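One practical consequence of the FixedFileTrailer hunk above: getEncryptionKey() goes back to asserting a v3 trailer, so callers that may see v2 files need to gate on the major version themselves. A hypothetical helper (not in the tree) might look like this:

import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;

final class TrailerEncryptionKeySketch {
  /**
   * Returns the encryption key for v3+ trailers, null otherwise. After the
   * revert, calling getEncryptionKey() on a v2 trailer throws rather than
   * returning null.
   */
  static byte[] encryptionKeyOrNull(FixedFileTrailer trailer) {
    return trailer.getMajorVersion() >= 3 ? trailer.getEncryptionKey() : null;
  }
}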
http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index f7dff43..7e658e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.Writable;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -197,8 +196,6 @@ public class HFile {
 
   /** API required to write an {@link HFile} */
   public interface Writer extends Closeable {
-    /** Max memstore (mvcc) timestamp in FileInfo */
-    public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
 
     /** Add an element to the file info map. */
     void appendFileInfo(byte[] key, byte[] value) throws IOException;
@@ -296,7 +293,7 @@ public class HFile {
             "filesystem/path or path");
       }
       if (path != null) {
-        ostream = HFileWriterImpl.createOutputStream(conf, fs, path, favoredNodes);
+        ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
       }
       return createWriter(fs, path, ostream,
                    comparator, fileContext);
@@ -335,12 +332,9 @@ public class HFile {
     int version = getFormatVersion(conf);
     switch (version) {
     case 2:
-      throw new IllegalArgumentException("This should never happen. " +
-        "Did you change hfile.format.version to read v2? This version of the software writes v3" +
-        " hfiles only (but it can read v2 files without having to update hfile.format.version " +
-        "in hbase-site.xml)");
+      return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
     case 3:
-      return new HFileWriterFactory(conf, cacheConf);
+      return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
     default:
       throw new IllegalArgumentException("Cannot create writer for HFile " +
           "format version " + version);
@@ -445,18 +439,6 @@ public class HFile {
      * Return the file context of the HFile this reader belongs to
      */
     HFileContext getFileContext();
-
-    boolean shouldIncludeMemstoreTS();
-
-    boolean isDecodeMemstoreTS();
-
-    DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction);
-
-    @VisibleForTesting
-    HFileBlock.FSReader getUncachedBlockReader();
-
-    @VisibleForTesting
-    boolean prefetchComplete();
   }
 
   /**
@@ -480,10 +462,9 @@ public class HFile {
       trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
       switch (trailer.getMajorVersion()) {
       case 2:
-        LOG.debug("Opening HFile v2 with v3 reader");
-        // Fall through.
+        return new HFileReaderV2(path, trailer, fsdis, size, cacheConf, hfs, conf);
       case 3 :
-        return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs, conf);
+        return new HFileReaderV3(path, trailer, fsdis, size, cacheConf, hfs, conf);
       default:
         throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
       }
@@ -507,7 +488,6 @@ public class HFile {
    * @return A version specific Hfile Reader
    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
    */
-  @SuppressWarnings("resource")
   public static Reader createReader(FileSystem fs, Path path,
       FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf, Configuration conf)
       throws IOException {
@@ -873,18 +853,6 @@ public class HFile {
     }
   }
 
-
-  public static void checkHFileVersion(final Configuration c) {
-    int version = c.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
-    if (version < MAX_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
-      throw new IllegalArgumentException("The setting for " + FORMAT_VERSION_KEY +
-        " (in your hbase-*.xml files) is " + version + " which does not match " +
-        MAX_FORMAT_VERSION +
-        "; are you running with a configuration from an older or newer hbase install (an " +
-        "incompatible hbase-default.xml or hbase-site.xml on your CLASSPATH)?");
-    }
-  }
-
   public static void main(String[] args) throws Exception {
     // delegate to preserve old behavior
     HFilePrettyPrinter.main(args);

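For completeness, a small sketch of what the restored version switch in HFile.getWriterFactory means for callers: with the revert in place, hfile.format.version=2 once again selects the v2 writer instead of throwing. The helper below is hypothetical; FORMAT_VERSION_KEY is the existing "hfile.format.version" constant.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;

final class WriterVersionSketch {
  // version=2 -> WriterFactoryV2, version=3 -> WriterFactoryV3, per the switch above.
  static HFile.WriterFactory factoryFor(int version) {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt(HFile.FORMAT_VERSION_KEY, version);
    return HFile.getWriterFactory(conf, new CacheConfig(conf));
  }
}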
http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 022e579..b8303b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1257,40 +1257,13 @@ public class HFileBlock implements Cacheable {
 
     /** Get the default decoder for blocks from this file. */
     HFileBlockDecodingContext getDefaultBlockDecodingContext();
-
-    void setIncludesMemstoreTS(boolean includesMemstoreTS);
-    void setDataBlockEncoder(HFileDataBlockEncoder encoder);
   }
 
   /**
-   * We always prefetch the header of the next block, so that we know its
-   * on-disk size in advance and can read it in one operation.
+   * A common implementation of some methods of {@link FSReader} and some
+   * tools for implementing HFile format version-specific block readers.
    */
-  private static class PrefetchedHeader {
-    long offset = -1;
-    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
-    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
-  }
-
-  /** Reads version 2 blocks from the filesystem. */
-  static class FSReaderImpl implements FSReader {
-    /** The file system stream of the underlying {@link HFile} that
-     * does or doesn't do checksum validations in the filesystem */
-    protected FSDataInputStreamWrapper streamWrapper;
-
-    private HFileBlockDecodingContext encodedBlockDecodingCtx;
-
-    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
-    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
-
-    private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
-        new ThreadLocal<PrefetchedHeader>() {
-      @Override
-      public PrefetchedHeader initialValue() {
-        return new PrefetchedHeader();
-      }
-    };
-
+  private abstract static class AbstractFSReader implements FSReader {
     /** Compression algorithm used by the {@link HFile} */
 
     /** The size of the file we are reading from, or -1 if unknown. */
@@ -1312,31 +1285,18 @@ public class HFileBlock implements Cacheable {
 
     protected HFileContext fileContext;
 
-    public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
-        HFileContext fileContext) throws IOException {
+    public AbstractFSReader(long fileSize, HFileSystem hfs, Path path, HFileContext fileContext)
+        throws IOException {
       this.fileSize = fileSize;
       this.hfs = hfs;
       this.path = path;
       this.fileContext = fileContext;
       this.hdrSize = headerSize(fileContext.isUseHBaseChecksum());
-
-      this.streamWrapper = stream;
-      // Older versions of HBase didn't support checksum.
-      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
-      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
-      encodedBlockDecodingCtx = defaultDecodingCtx;
     }
 
-    /**
-     * A constructor that reads files with the latest minor version.
-     * This is used by unit tests only.
-     */
-    FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
-    throws IOException {
-      this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
-    }
-
-    public BlockIterator blockRange(final long startOffset, final long endOffset) {
+    @Override
+    public BlockIterator blockRange(final long startOffset,
+        final long endOffset) {
       final FSReader owner = this; // handle for inner class
       return new BlockIterator() {
         private long offset = startOffset;
@@ -1433,6 +1393,56 @@ public class HFileBlock implements Cacheable {
       return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
     }
 
+  }
+
+  /**
+   * We always prefetch the header of the next block, so that we know its
+   * on-disk size in advance and can read it in one operation.
+   */
+  private static class PrefetchedHeader {
+    long offset = -1;
+    byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
+    final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE);
+  }
+
+  /** Reads version 2 blocks from the filesystem. */
+  static class FSReaderImpl extends AbstractFSReader {
+    /** The file system stream of the underlying {@link HFile} that
+     * does or doesn't do checksum validations in the filesystem */
+    protected FSDataInputStreamWrapper streamWrapper;
+
+    private HFileBlockDecodingContext encodedBlockDecodingCtx;
+
+    /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
+    private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
+
+    private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
+        new ThreadLocal<PrefetchedHeader>() {
+          @Override
+          public PrefetchedHeader initialValue() {
+            return new PrefetchedHeader();
+          }
+        };
+
+    public FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
+        HFileContext fileContext) throws IOException {
+      super(fileSize, hfs, path, fileContext);
+      this.streamWrapper = stream;
+      // Older versions of HBase didn't support checksum.
+      this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
+      defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
+      encodedBlockDecodingCtx = defaultDecodingCtx;
+    }
+
+    /**
+     * A constructor that reads files with the latest minor version.
+     * This is used by unit tests only.
+     */
+    FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext)
+    throws IOException {
+      this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
+    }
+
     /**
      * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
      * little memory allocation as possible, using the provided on-disk size.
@@ -1673,11 +1683,11 @@ public class HFileBlock implements Cacheable {
       return b;
     }
 
-    public void setIncludesMemstoreTS(boolean includesMemstoreTS) {
+    void setIncludesMemstoreTS(boolean includesMemstoreTS) {
       this.fileContext.setIncludesMvcc(includesMemstoreTS);
     }
 
-    public void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
+    void setDataBlockEncoder(HFileDataBlockEncoder encoder) {
       encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext);
     }
 

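As a reading aid for the block-reader split reinstated above, here is a sketch of consuming the blockRange() iterator that AbstractFSReader provides. FSReader and BlockIterator are nested types of HFileBlock with package visibility, so this only compiles inside org.apache.hadoop.hbase.io.hfile; the helper is invented, and nextBlock() is assumed to return null once the end offset is reached.

package org.apache.hadoop.hbase.io.hfile;

import java.io.IOException;

final class BlockRangeSketch {
  /** Counts the blocks between two file offsets via AbstractFSReader.blockRange(). */
  static int countBlocks(HFileBlock.FSReader fsReader, long startOffset, long endOffset)
      throws IOException {
    HFileBlock.BlockIterator it = fsReader.blockRange(startOffset, endOffset);
    int count = 0;
    for (HFileBlock block = it.nextBlock(); block != null; block = it.nextBlock()) {
      count++;   // e.g. inspect block.getBlockType() or block.getOnDiskSizeWithHeader()
    }
    return count;
  }
}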
http://git-wip-us.apache.org/repos/asf/hbase/blob/e65f4300/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 9132d2f..d656cae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -54,9 +54,9 @@ import org.apache.hadoop.util.StringUtils;
  * ({@link BlockIndexReader}) single-level and multi-level block indexes.
  *
  * Examples of how to use the block index writer can be found in
- * {@link org.apache.hadoop.hbase.util.CompoundBloomFilterWriter} and
- *  {@link HFileWriterImpl}. Examples of how to use the reader can be
- *  found in {@link HFileWriterImpl} and TestHFileBlockIndex.
+ * {@link org.apache.hadoop.hbase.util.CompoundBloomFilterWriter} 
+ * and {@link HFileWriterV2}. Examples of how to use the reader can be 
+ * found in {@link HFileReaderV2} and TestHFileBlockIndex.
  */
 @InterfaceAudience.Private
 public class HFileBlockIndex {
@@ -193,7 +193,7 @@ public class HFileBlockIndex {
      * Return the BlockWithScanInfo which contains the DataBlock with other scan
      * info such as nextIndexedKey. This function will only be called when the
      * HFile version is larger than 1.
-     *
+     * 
      * @param key
      *          the key we are looking for
      * @param currentBlock
@@ -494,7 +494,7 @@ public class HFileBlockIndex {
      * Performs a binary search over a non-root level index block. Utilizes the
      * secondary index, which records the offsets of (offset, onDiskSize,
      * firstKey) tuples of all entries.
-     *
+     * 
      * @param key
      *          the key we are searching for offsets to individual entries in
      *          the blockIndex buffer
@@ -641,7 +641,7 @@ public class HFileBlockIndex {
         }
       }
     }
-
+    
     /**
      * Read in the root-level index from the given input stream. Must match
      * what was written into the root level by