Posted to commits@hbase.apache.org by te...@apache.org on 2011/12/24 22:20:41 UTC

svn commit: r1223020 [3/5] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/mapr...

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Sat Dec 24 21:20:39 2011
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
+import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
+
 import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -31,11 +34,12 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 
-import org.apache.hadoop.hbase.io.DoubleOutputStream;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.CompoundBloomFilter;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.IOUtils;
@@ -45,9 +49,6 @@ import org.apache.hadoop.io.compress.Dec
 
 import com.google.common.base.Preconditions;
 
-import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
-import static org.apache.hadoop.hbase.io.hfile.Compression.Algorithm.NONE;
-
 /**
  * Reading {@link HFile} version 1 and 2 blocks, and writing version 2 blocks.
  * <ul>
@@ -75,10 +76,26 @@ import static org.apache.hadoop.hbase.io
  */
 public class HFileBlock extends SchemaConfigured implements Cacheable {
 
+  public static final boolean FILL_HEADER = true;
+  public static final boolean DONT_FILL_HEADER = false;
+
   /** The size of a version 2 {@link HFile} block header */
   public static final int HEADER_SIZE = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
       + Bytes.SIZEOF_LONG;
 
+  /**
+   * We store a two-byte encoder ID at the beginning of every encoded data
+   * block payload (immediately after the block header).
+   */
+  public static final int DATA_BLOCK_ENCODER_ID_SIZE = Bytes.SIZEOF_SHORT;
+
+  /**
+   * The size of the block header when blockType is {@link BlockType#ENCODED_DATA}.
+   * This extends the normal header by adding the encoder ID.
+   */
+  public static final int ENCODED_HEADER_SIZE = HEADER_SIZE
+      + DATA_BLOCK_ENCODER_ID_SIZE;
+
   /** Just an array of bytes of the right size. */
   public static final byte[] DUMMY_HEADER = new byte[HEADER_SIZE];
 
@@ -107,10 +124,11 @@ public class HFileBlock extends SchemaCo
       };
 
   private BlockType blockType;
-  private final int onDiskSizeWithoutHeader;
+  private int onDiskSizeWithoutHeader;
   private final int uncompressedSizeWithoutHeader;
   private final long prevBlockOffset;
   private ByteBuffer buf;
+  private boolean includesMemstoreTS;
 
   /**
    * The offset of this block in the file. Populated by the reader for
@@ -146,7 +164,7 @@ public class HFileBlock extends SchemaCo
    */
   public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
       int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf,
-      boolean fillHeader, long offset) {
+      boolean fillHeader, long offset, boolean includesMemstoreTS) {
     this.blockType = blockType;
     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
     this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
@@ -155,6 +173,7 @@ public class HFileBlock extends SchemaCo
     if (fillHeader)
       overwriteHeader();
     this.offset = offset;
+    this.includesMemstoreTS = includesMemstoreTS;
   }
 
   /**
@@ -177,6 +196,15 @@ public class HFileBlock extends SchemaCo
     return blockType;
   }
 
+  /** @return the data block encoding ID that was used to encode this block */
+  public short getDataBlockEncodingId() {
+    if (blockType != BlockType.ENCODED_DATA) {
+      throw new IllegalArgumentException("Querying encoder ID of a block " +
+          "of type other than " + BlockType.ENCODED_DATA + ": " + blockType);
+    }
+    return buf.getShort(HEADER_SIZE);
+  }
+
   /**
    * @return the on-disk size of the block with header size included
    */
@@ -509,29 +537,30 @@ public class HFileBlock extends SchemaCo
     /** Compression algorithm for all blocks this instance writes. */
     private final Compression.Algorithm compressAlgo;
 
-    /**
-     * The stream we use to accumulate data in the on-disk format for each
-     * block (i.e. compressed data, or uncompressed if using no compression).
-     * We reset this stream at the end of each block and reuse it. The header
-     * is written as the first {@link #HEADER_SIZE} bytes into this stream.
-     */
-    private ByteArrayOutputStream baosOnDisk;
+    /** Data block encoder used for data blocks */
+    private final HFileDataBlockEncoder dataBlockEncoder;
 
     /**
-     * The stream we use to accumulate uncompressed block data for
-     * cache-on-write. Null when cache-on-write is turned off.
+     * The stream we use to accumulate data in uncompressed format for each
+     * block. We reset this stream at the end of each block and reuse it. The
+     * header is written as the first {@link #HEADER_SIZE} bytes into this
+     * stream.
      */
     private ByteArrayOutputStream baosInMemory;
 
     /** Compressor, which is also reused between consecutive blocks. */
     private Compressor compressor;
 
-    /** Current block type. Set in {@link #startWriting(BlockType)}. */
+    /**
+     * Current block type. Set in {@link #startWriting(BlockType)}. Could be
+     * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
+     * to {@link BlockType#ENCODED_DATA}.
+     */
     private BlockType blockType;
 
     /**
      * A stream that we write uncompressed bytes to, which compresses them and
-     * writes them to {@link #baosOnDisk}.
+     * writes them to {@link #baosInMemory}.
      */
     private DataOutputStream userDataStream;
 
@@ -542,14 +571,8 @@ public class HFileBlock extends SchemaCo
     private byte[] onDiskBytesWithHeader;
 
     /**
-     * The total number of uncompressed bytes written into the current block,
-     * with header size not included. Valid in the READY state.
-     */
-    private int uncompressedSizeWithoutHeader;
-
-    /**
-     * Only used when we are using cache-on-write. Valid in the READY state.
-     * Contains the header and the uncompressed bytes, so the length is
+     * Valid in the READY state. Contains the header and the uncompressed (but
+     * potentially encoded, if this is a data block) bytes, so the length is
      * {@link #uncompressedSizeWithoutHeader} + {@link HFileBlock#HEADER_SIZE}.
      */
     private byte[] uncompressedBytesWithHeader;
@@ -566,30 +589,36 @@ public class HFileBlock extends SchemaCo
      */
     private long[] prevOffsetByType;
 
-    /**
-     * Whether we are accumulating uncompressed bytes for the purpose of
-     * caching on write.
-     */
-    private boolean cacheOnWrite;
-
     /** The offset of the previous block of the same type */
     private long prevOffset;
 
+    /** Whether we are including memstore timestamp after every key/value */
+    private boolean includesMemstoreTS;
+
     /**
-     * @param compressionAlgorithm
-     *          compression algorithm to use
+     * Unencoded data block for caching on write. Populated before encoding.
      */
-    public Writer(Compression.Algorithm compressionAlgorithm) {
-      compressAlgo = compressionAlgorithm == null ? NONE
-          : compressionAlgorithm;
+    private HFileBlock unencodedDataBlockForCaching;
 
-      baosOnDisk = new ByteArrayOutputStream();
+    /**
+     * @param compressionAlgorithm compression algorithm to use
+     * @param dataBlockEncoder data block encoder to use
+     * @param includesMemstoreTS whether a memstore timestamp follows every key/value
+     */
+    public Writer(Compression.Algorithm compressionAlgorithm,
+          HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS) {
+      compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
+      this.dataBlockEncoder = dataBlockEncoder != null
+          ? dataBlockEncoder : new NoOpDataBlockEncoder();
+
+      baosInMemory = new ByteArrayOutputStream();
       if (compressAlgo != NONE)
         compressor = compressionAlgorithm.getCompressor();
 
       prevOffsetByType = new long[BlockType.values().length];
       for (int i = 0; i < prevOffsetByType.length; ++i)
         prevOffsetByType[i] = -1;
+
+      this.includesMemstoreTS = includesMemstoreTS;
     }
 
     /**
@@ -598,44 +627,26 @@ public class HFileBlock extends SchemaCo
      * @return the stream the user can write their data into
      * @throws IOException
      */
-    public DataOutputStream startWriting(BlockType newBlockType,
-        boolean cacheOnWrite) throws IOException {
+    public DataOutputStream startWriting(BlockType newBlockType)
+        throws IOException {
       if (state == State.BLOCK_READY && startOffset != -1) {
         // We had a previous block that was written to a stream at a specific
         // offset. Save that offset as the last offset of a block of that type.
-        prevOffsetByType[blockType.ordinal()] = startOffset;
+        prevOffsetByType[blockType.getId()] = startOffset;
       }
 
-      this.cacheOnWrite = cacheOnWrite;
-
       startOffset = -1;
       blockType = newBlockType;
 
-      baosOnDisk.reset();
-      baosOnDisk.write(DUMMY_HEADER);
+      baosInMemory.reset();
+      baosInMemory.write(DUMMY_HEADER);
 
       state = State.WRITING;
-      if (compressAlgo == NONE) {
-        // We do not need a compression stream or a second uncompressed stream
-        // for cache-on-write.
-        userDataStream = new DataOutputStream(baosOnDisk);
-      } else {
-        OutputStream compressingOutputStream =
-          compressAlgo.createCompressionStream(baosOnDisk, compressor, 0);
 
-        if (cacheOnWrite) {
-          // We save uncompressed data in a cache-on-write mode.
-          if (baosInMemory == null)
-            baosInMemory = new ByteArrayOutputStream();
-          baosInMemory.reset();
-          baosInMemory.write(DUMMY_HEADER);
-          userDataStream = new DataOutputStream(new DoubleOutputStream(
-              compressingOutputStream, baosInMemory));
-        } else {
-          userDataStream = new DataOutputStream(compressingOutputStream);
-        }
-      }
+      unencodedDataBlockForCaching = null;
 
+      // We will compress it later in finishBlock()
+      userDataStream = new DataOutputStream(baosInMemory);
       return userDataStream;
     }
 
@@ -662,45 +673,125 @@ public class HFileBlock extends SchemaCo
       if (state == State.BLOCK_READY)
         return;
 
+      // This will set state to BLOCK_READY.
       finishBlock();
-      state = State.BLOCK_READY;
     }
 
     /**
      * An internal method that flushes the compressing stream (if using
      * compression), serializes the header, and takes care of the separate
-     * uncompressed stream for caching on write, if applicable. Block writer
-     * state transitions must be managed by the caller.
+     * uncompressed stream for caching on write, if applicable. Sets block
+     * write state to "block ready".
      */
     private void finishBlock() throws IOException {
       userDataStream.flush();
-      uncompressedSizeWithoutHeader = userDataStream.size();
 
-      onDiskBytesWithHeader = baosOnDisk.toByteArray();
-      prevOffset = prevOffsetByType[blockType.ordinal()];
-      putHeader(onDiskBytesWithHeader, 0);
+      // This does an array copy, so it is safe to cache this byte array.
+      uncompressedBytesWithHeader = baosInMemory.toByteArray();
+      prevOffset = prevOffsetByType[blockType.getId()];
+
+      // We need to set state before we can package the block up for
+      // cache-on-write. In a way, the block is ready, but not yet encoded or
+      // compressed.
+      state = State.BLOCK_READY;
+      encodeDataBlockForDisk();
 
-      if (cacheOnWrite && compressAlgo != NONE) {
-        uncompressedBytesWithHeader = baosInMemory.toByteArray();
+      doCompression();
+      putHeader(uncompressedBytesWithHeader, 0, onDiskBytesWithHeader.length,
+          uncompressedBytesWithHeader.length);
+
+      if (unencodedDataBlockForCaching != null) {
+        // We now know the final on-disk size; save it for caching.
+        unencodedDataBlockForCaching.onDiskSizeWithoutHeader =
+            getOnDiskSizeWithoutHeader();
+        unencodedDataBlockForCaching.overwriteHeader();
+      }
+    }
+
+    /**
+     * Do compression if it is enabled, or re-use the uncompressed buffer if
+     * it is not. Fills in the compressed block's header if doing compression.
+     */
+    private void doCompression() throws IOException {
+      // do the compression
+      if (compressAlgo != NONE) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        baos.write(DUMMY_HEADER);
+
+        // compress the data
+        OutputStream compressingOutputStream =
+            compressAlgo.createCompressionStream(baos, compressor, 0);
+        compressingOutputStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
+            uncompressedBytesWithHeader.length - HEADER_SIZE);
+
+        // finish compression stream
+        compressingOutputStream.flush();
+
+        onDiskBytesWithHeader = baos.toByteArray();
+        putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
+            uncompressedBytesWithHeader.length);
+      } else {
+        onDiskBytesWithHeader = uncompressedBytesWithHeader;
+      }
+    }
+
+    /**
+     * Encodes this block if it is a data block and encoding is turned on in
+     * {@link #dataBlockEncoder}.
+     */
+    private void encodeDataBlockForDisk() throws IOException {
+      if (blockType != BlockType.DATA) {
+        return; // skip any non-data block
+      }
 
-        if (uncompressedSizeWithoutHeader !=
+      // do data block encoding, if data block encoder is set
+      ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader,
+          HEADER_SIZE, uncompressedBytesWithHeader.length -
+          HEADER_SIZE).slice();
+      Pair<ByteBuffer, BlockType> encodingResult =
+          dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
+              includesMemstoreTS);
+
+      BlockType encodedBlockType = encodingResult.getSecond();
+      if (encodedBlockType == BlockType.ENCODED_DATA) {
+        // Save the unencoded block in case we need to cache it on write.
+        // We don't know the final on-disk size at this point, because
+        // compression has not been done yet, so we set it to the uncompressed
+        // size and override it later.
+        int uncompressedSizeWithoutHeader = getUncompressedSizeWithoutHeader();
+        unencodedDataBlockForCaching = new HFileBlock(blockType,
+            uncompressedSizeWithoutHeader, // will override this later
+            uncompressedSizeWithoutHeader, prevOffset,
+            getUncompressedBufferWithHeader(), FILL_HEADER, startOffset,
+            includesMemstoreTS);
+        uncompressedBytesWithHeader = encodingResult.getFirst().array();
+        blockType = encodedBlockType;
+      } else {
+        // There is no encoding configured. Do some extra sanity-checking.
+        if (encodedBlockType != BlockType.DATA) {
+          throw new IOException("Unexpected block type coming out of data " +
+              "block encoder: " + encodedBlockType);
+        }
+        if (userDataStream.size() !=
             uncompressedBytesWithHeader.length - HEADER_SIZE) {
           throw new IOException("Uncompressed size mismatch: "
-              + uncompressedSizeWithoutHeader + " vs. "
+              + userDataStream.size() + " vs. "
               + (uncompressedBytesWithHeader.length - HEADER_SIZE));
         }
-
-        // Write the header into the beginning of the uncompressed byte array.
-        putHeader(uncompressedBytesWithHeader, 0);
       }
     }
 
-    /** Put the header into the given byte array at the given offset. */
-    private void putHeader(byte[] dest, int offset) {
+    /**
+     * Put the header into the given byte array at the given offset.
+     * @param onDiskSize size of the block on disk
+     * @param uncompressedSize size of the block after decompression (but
+     *          before optional data block decoding)
+     */
+    private void putHeader(byte[] dest, int offset, int onDiskSize,
+        int uncompressedSize) {
       offset = blockType.put(dest, offset);
-      offset = Bytes.putInt(dest, offset, onDiskBytesWithHeader.length
-          - HEADER_SIZE);
-      offset = Bytes.putInt(dest, offset, uncompressedSizeWithoutHeader);
+      offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
+      offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
       Bytes.putLong(dest, offset, prevOffset);
     }
 
@@ -793,7 +884,7 @@ public class HFileBlock extends SchemaCo
      */
     public int getUncompressedSizeWithoutHeader() {
       expectState(State.BLOCK_READY);
-      return uncompressedSizeWithoutHeader;
+      return uncompressedBytesWithHeader.length - HEADER_SIZE;
     }
 
     /**
@@ -801,7 +892,7 @@ public class HFileBlock extends SchemaCo
      */
     public int getUncompressedSizeWithHeader() {
       expectState(State.BLOCK_READY);
-      return uncompressedSizeWithoutHeader + HEADER_SIZE;
+      return uncompressedBytesWithHeader.length;
     }
 
     /** @return true if a block is being written  */
@@ -832,15 +923,6 @@ public class HFileBlock extends SchemaCo
     private byte[] getUncompressedDataWithHeader() {
       expectState(State.BLOCK_READY);
 
-      if (compressAlgo == NONE)
-        return onDiskBytesWithHeader;
-
-      if (!cacheOnWrite)
-        throw new IllegalStateException("Cache-on-write is turned off");
-
-      if (uncompressedBytesWithHeader == null)
-        throw new NullPointerException();
-
       return uncompressedBytesWithHeader;
     }
 
@@ -874,14 +956,18 @@ public class HFileBlock extends SchemaCo
      */
     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
         throws IOException {
-      bw.writeToBlock(startWriting(bw.getBlockType(), false));
+      bw.writeToBlock(startWriting(bw.getBlockType()));
       writeHeaderAndData(out);
     }
 
     public HFileBlock getBlockForCaching() {
-      return new HFileBlock(blockType, onDiskBytesWithHeader.length
-          - HEADER_SIZE, uncompressedSizeWithoutHeader, prevOffset,
-          getUncompressedBufferWithHeader(), false, startOffset);
+      if (unencodedDataBlockForCaching != null) {
+        return unencodedDataBlockForCaching;
+      }
+      return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
+          getUncompressedSizeWithoutHeader(), prevOffset,
+          getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
+          includesMemstoreTS);
     }
 
   }
@@ -963,14 +1049,18 @@ public class HFileBlock extends SchemaCo
     /** The size of the file we are reading from, or -1 if unknown. */
     protected long fileSize;
 
+    /** Data block encoder used when reading from the file */
+    protected HFileDataBlockEncoder dataBlockEncoder;
+
     /** The default buffer size for our buffered streams */
     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
 
     public AbstractFSReader(FSDataInputStream istream, Algorithm compressAlgo,
-        long fileSize) {
+        long fileSize, HFileDataBlockEncoder dataBlockEncoder) {
       this.istream = istream;
       this.compressAlgo = compressAlgo;
       this.fileSize = fileSize;
+      this.dataBlockEncoder = dataBlockEncoder;
     }
 
     @Override
@@ -1133,7 +1223,12 @@ public class HFileBlock extends SchemaCo
 
     public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo,
         long fileSize) {
-      super(istream, compressAlgo, fileSize);
+      this(istream, compressAlgo, fileSize, new NoOpDataBlockEncoder());
+    }
+
+    public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo,
+        long fileSize, HFileDataBlockEncoder blockEncoder) {
+      super(istream, compressAlgo, fileSize, blockEncoder);
     }
 
     /**
@@ -1156,7 +1251,8 @@ public class HFileBlock extends SchemaCo
      */
     @Override
     public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic,
-        int uncompressedSizeWithMagic, boolean pread) throws IOException {
+        int uncompressedSizeWithMagic, boolean pread)
+            throws IOException {
       if (uncompressedSizeWithMagic <= 0) {
         throw new IOException("Invalid uncompressedSize="
             + uncompressedSizeWithMagic + " for a version 1 block");
@@ -1214,7 +1310,8 @@ public class HFileBlock extends SchemaCo
       // to the size of the data portion of the block without the magic record,
       // since the magic record gets moved to the header.
       HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader,
-          uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, true, offset);
+          uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, FILL_HEADER,
+          offset, MemStore.NO_PERSISTENT_TS);
       return b;
     }
   }
@@ -1232,6 +1329,9 @@ public class HFileBlock extends SchemaCo
   /** Reads version 2 blocks from the filesystem. */
   public static class FSReaderV2 extends AbstractFSReader {
 
+    /** Whether we include memstore timestamp in data blocks */
+    protected boolean includesMemstoreTS;
+
     private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
         new ThreadLocal<PrefetchedHeader>() {
           @Override
@@ -1242,7 +1342,12 @@ public class HFileBlock extends SchemaCo
 
     public FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo,
         long fileSize) {
-      super(istream, compressAlgo, fileSize);
+      this(istream, compressAlgo, fileSize, new NoOpDataBlockEncoder());
+    }
+
+    public FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo,
+        long fileSize, HFileDataBlockEncoder dataBlockEncoder) {
+      super(istream, compressAlgo, fileSize, dataBlockEncoder);
     }
 
     /**
@@ -1438,6 +1543,13 @@ public class HFileBlock extends SchemaCo
           }
         }
       }
+
+      b.includesMemstoreTS = includesMemstoreTS;
+
+      if (b.getBlockType() == BlockType.ENCODED_DATA) {
+        b = dataBlockEncoder.afterReadFromDisk(b);
+      }
+
       b.offset = offset;
       return b;
     }
@@ -1451,6 +1563,10 @@ public class HFileBlock extends SchemaCo
           prefetchedHeader.header, 0, HEADER_SIZE);
     }
 
+    void setIncludesMemstoreTS(boolean enabled) {
+      includesMemstoreTS = enabled;
+    }
+
   }
 
   @Override
@@ -1518,5 +1634,9 @@ public class HFileBlock extends SchemaCo
     return true;
   }
 
+  public boolean doesIncludeMemstoreTS() {
+    return includesMemstoreTS;
+  }
+
 
 }
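
A minimal sketch, for orientation, of how a caller might drive the reworked Writer above. The Writer constructor, startWriting(BlockType), writeHeaderAndData and getBlockForCaching signatures are taken from this hunk; the class name BlockWriteSketch, the writeOneBlock helper and its arguments are only illustrative.

    package org.apache.hadoop.hbase.io.hfile;

    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;

    public class BlockWriteSketch {
      // Writes a single block with the reworked Writer and returns the copy
      // that would go into the block cache.
      static HFileBlock writeOneBlock(FSDataOutputStream fsOut, byte[] serializedKvs)
          throws IOException {
        HFileBlock.Writer writer = new HFileBlock.Writer(
            Compression.Algorithm.NONE,                 // on-disk compression
            new HFileDataBlockEncoderImpl(
                DataBlockEncodings.Algorithm.NONE,      // onDisk encoding
                DataBlockEncodings.Algorithm.NONE,      // inCache encoding
                HFileDataBlockEncoderImpl.NO_ENCODED_SEEK),
            true);                                      // includesMemstoreTS
        // startWriting() no longer takes a cacheOnWrite flag: uncompressed bytes
        // are always accumulated, and encoding/compression happen in finishBlock().
        DataOutputStream out = writer.startWriting(BlockType.DATA);
        out.write(serializedKvs);                       // stands in for real KeyValues
        writer.writeHeaderAndData(fsOut);
        // If the block was encoded for disk, this is the saved unencoded copy;
        // otherwise it is the block as written.
        return writer.getBlockForCaching();
      }
    }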

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java Sat Dec 24 21:20:39 2011
@@ -210,7 +210,8 @@ public class HFileBlockIndex {
         }
 
         // Found a data block, break the loop and check our level in the tree.
-        if (block.getBlockType().equals(BlockType.DATA)) {
+        if (block.getBlockType().equals(BlockType.DATA) ||
+            block.getBlockType().equals(BlockType.ENCODED_DATA)) {
           break;
         }
 
@@ -733,8 +734,8 @@ public class HFileBlockIndex {
       long rootLevelIndexPos = out.getPos();
 
       {
-        DataOutput blockStream = blockWriter.startWriting(BlockType.ROOT_INDEX,
-            false);
+        DataOutput blockStream =
+            blockWriter.startWriting(BlockType.ROOT_INDEX);
         rootChunk.writeRoot(blockStream);
         if (midKeyMetadata != null)
           blockStream.write(midKeyMetadata);
@@ -829,7 +830,7 @@ public class HFileBlockIndex {
         BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
       long beginOffset = out.getPos();
       DataOutputStream dos = blockWriter.startWriting(
-          BlockType.INTERMEDIATE_INDEX, cacheOnWrite());
+          BlockType.INTERMEDIATE_INDEX);
       curChunk.writeNonRoot(dos);
       byte[] curFirstKey = curChunk.getBlockKey(0);
       blockWriter.writeHeaderAndData(out);

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Controls what kind of data block encoding is used. If data block encoding is
+ * not set, the methods just return the block unmodified. All of the methods do
+ * something meaningful only if the block type is DATA or ENCODED_DATA; otherwise
+ * they just return the unmodified block.
+ * <p>
+ * Read path: [parsed from disk] -> {@link #afterReadFromDisk(HFileBlock)} ->
+ * [caching] ->
+ * {@link #afterReadFromDiskAndPuttingInCache(HFileBlock, boolean, boolean)} ->
+ * [used somewhere]
+ * <p>
+ * where [caching] looks like:
+ * <pre>
+ * ------------------------------------>
+ *   \----> {@link #beforeBlockCache(HFileBlock)}
+ * </pre>
+ * <p>
+ * Write path: [sorted KeyValues have been created] ->
+ * {@link #beforeWriteToDisk(ByteBuffer, boolean)} -> [(optional) compress] -> [write to
+ * disk]
+ * <p>
+ * Reading from cache path: [get from cache] ->
+ * {@link #afterBlockCache(HFileBlock, boolean, boolean)}
+ * <p>
+ * Storing data in file info: {@link #saveMetadata(StoreFile.Writer)}
+ * <p>
+ * Creating an algorithm-specific scanner: {@link #useEncodedScanner(boolean)}
+ */
+public interface HFileDataBlockEncoder {
+  /**
+   * Should be called after each HFileBlock of type DATA or ENCODED_DATA is
+   * read from disk, but before it is put into the cache.
+   * @param block Block read from an HFile stored on disk.
+   * @return a non-null block, encoded or decoded according to the settings.
+   */
+  public HFileBlock afterReadFromDisk(HFileBlock block);
+
+  /**
+   * Should be called after each HFileBlock of type DATA or ENCODED_DATA is
+   * read from disk and after it has been saved in the cache.
+   * @param block Block read from an HFile stored on disk.
+   * @param isCompaction Whether the block will be used for compaction.
+   * @param includesMemstoreTS whether a memstore timestamp follows each key/value
+   * @return a non-null block, encoded or decoded according to the settings.
+   */
+  public HFileBlock afterReadFromDiskAndPuttingInCache(HFileBlock block,
+      boolean isCompaction, boolean includesMemstoreTS);
+
+  /**
+   * Should be called before an encoded or unencoded data block is written to
+   * disk.
+   * @param in serialized KeyValues laid out next to each other
+   * @param includesMemstoreTS whether a memstore timestamp follows each key/value
+   * @return a non-null pair of an on-heap buffer containing the HFileBlock
+   *         contents (with the header left unfilled) and the resulting block type
+   */
+  public Pair<ByteBuffer, BlockType> beforeWriteToDisk(
+      ByteBuffer in, boolean includesMemstoreTS);
+
+  /**
+   * Should always be called before putting a block into cache.
+   * @param block block that needs to be put into cache.
+   * @return the block to put into cache instead (possibly the same)
+   */
+  public HFileBlock beforeBlockCache(HFileBlock block,
+      boolean includesMemstoreTS);
+
+  /**
+   * After getting block from cache.
+   * @param block block which was returned from cache, may be null.
+   * @param isCompaction Will block be used for compaction.
+   * @param includesMemstoreTS whether we have a memstore timestamp encoded
+   *    as a variable-length integer after each key-value pair
+   * @return HFileBlock to use. Can be null, even if argument is not null.
+   */
+  public HFileBlock afterBlockCache(HFileBlock block,
+      boolean isCompaction, boolean includesMemstoreTS);
+
+  /**
+   * Whether a special (encoded) version of the scanner should be used.
+   * @param isCompaction Whether the scanner will be used for compaction.
+   * @return Whether to use an encoded scanner.
+   */
+  public boolean useEncodedScanner(boolean isCompaction);
+
+  /**
+   * Save metadata in the StoreFile, which will be written to disk
+   * @param storeFileWriter writer for a given StoreFile
+   * @exception IOException on disk problems
+   */
+  public void saveMetadata(StoreFile.Writer storeFileWriter) throws IOException;
+}
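
The hunks above default to a NoOpDataBlockEncoder whose body is not included in this part of the commit. As a rough, hypothetical illustration of what satisfying this contract takes, a pass-through implementation might look like the following (the class name PassThroughDataBlockEncoder is invented for this sketch, not the committed class):

    package org.apache.hadoop.hbase.io.hfile;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.hbase.util.Pair;

    public class PassThroughDataBlockEncoder implements HFileDataBlockEncoder {
      @Override
      public HFileBlock afterReadFromDisk(HFileBlock block) {
        return block;                                   // nothing to decode
      }

      @Override
      public HFileBlock afterReadFromDiskAndPuttingInCache(HFileBlock block,
          boolean isCompaction, boolean includesMemstoreTS) {
        return block;
      }

      @Override
      public Pair<ByteBuffer, BlockType> beforeWriteToDisk(ByteBuffer in,
          boolean includesMemstoreTS) {
        return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA); // leave unencoded
      }

      @Override
      public HFileBlock beforeBlockCache(HFileBlock block,
          boolean includesMemstoreTS) {
        return block;
      }

      @Override
      public HFileBlock afterBlockCache(HFileBlock block, boolean isCompaction,
          boolean includesMemstoreTS) {
        return block;
      }

      @Override
      public boolean useEncodedScanner(boolean isCompaction) {
        return false;                                   // plain scanners only
      }

      @Override
      public void saveMetadata(StoreFile.Writer storeFileWriter) throws IOException {
        // no encoding metadata to record
      }
    }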

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings.Algorithm;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+
+/**
+ * Do different kinds of data block encoding according to column family
+ * options.
+ */
+public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
+  private final DataBlockEncodings.Algorithm onDisk;
+  private final DataBlockEncodings.Algorithm inCache;
+
+  public static final boolean NO_ENCODED_SEEK = false;
+
+  private final boolean encodedSeek;
+
+  /**
+   * Do data block encoding according to the specified options.
+   * @param onDisk What kind of data block encoding will be used before writing
+   *          HFileBlock to disk.
+   * @param inCache What kind of data block encoding will be used in block
+   *          cache.
+   * @param encodedSeek should we seek over encoded data blocks (true) or
+   *          decode blocks first and use normal seek operations (false)
+   */
+  public HFileDataBlockEncoderImpl(Algorithm onDisk, Algorithm inCache,
+      boolean encodedSeek) {
+    this.onDisk = onDisk != null ?
+        onDisk : DataBlockEncodings.Algorithm.NONE;
+    this.inCache = inCache != null ?
+        inCache : DataBlockEncodings.Algorithm.NONE;
+    this.encodedSeek = encodedSeek;
+  }
+
+  /**
+   * @return the data block encoding algorithm used on disk
+   */
+  public DataBlockEncodings.Algorithm getOnDisk() {
+    return onDisk;
+  }
+
+  /**
+   * @return the data block encoding algorithm used in the block cache
+   */
+  public DataBlockEncodings.Algorithm getInCache() {
+    return inCache;
+  }
+
+  /**
+   * @return whether we should do seek operations on encoded blocks
+   */
+  public boolean useEncodedSeek() {
+    return encodedSeek
+        && inCache != DataBlockEncodings.Algorithm.NONE;
+  }
+
+  // Preconditions: any HFileBlock format
+  // Postconditions: HFileBlock not encoded
+  //                 or encoded same format as inCache
+  @Override
+  public HFileBlock afterReadFromDisk(HFileBlock block) {
+    if (ignoreBlock(block)) {
+      return block; // non DATA block, skip it
+    }
+
+    // is already encoded in desired encoding
+    if (block.getBlockType() == BlockType.ENCODED_DATA &&
+        block.getDataBlockEncodingId() == inCache.getId()) {
+      return block;
+    }
+
+    // decode if we need it
+    HFileBlock decompressedBlock;
+    if (block.getBlockType() == BlockType.ENCODED_DATA) {
+      decompressedBlock = decodeDataBlock(block, false, (short) 0,
+          block.doesIncludeMemstoreTS());
+    } else {
+      decompressedBlock = block;
+    }
+
+    // check if we want to encode it here
+    if (encodedSeek && inCache != DataBlockEncodings.Algorithm.NONE &&
+          onDisk != DataBlockEncodings.Algorithm.NONE) {
+      return encodeDataBlock(decompressedBlock, inCache,
+          block.doesIncludeMemstoreTS());
+    }
+
+    return decompressedBlock;
+  }
+
+  /**
+   * Preconditions: HFileBlock not encoded or encoded in the {@link #inCache}
+   * format.
+   * <p>
+   * Postconditions:
+   * <ul>
+   * <li>if isCompaction is set and {@link #onDisk} is NONE, there is no
+   * encoding</li>
+   * <li>if {@link #encodedSeek} is set, the encoding is the same as inCache;
+   * otherwise there is no encoding</li>
+   * </ul>
+   */
+  @Override
+  public HFileBlock afterReadFromDiskAndPuttingInCache(HFileBlock block,
+        boolean isCompaction, boolean includesMemstoreTS) {
+    if (ignoreBlock(block)) {
+      return block; // non DATA block, skip it
+    }
+
+    // use decoded buffer in case of compaction
+    if (dontEncodeBeforeCompaction(isCompaction)) {
+      if (block.getBlockType() != BlockType.DATA) {
+        return decodeDataBlock(block, true, inCache.getId(),
+            includesMemstoreTS);
+      }
+      return block;
+    }
+
+    if (!encodedSeek) {
+      // we need to have it decoded in memory
+      if (block.getBlockType() != BlockType.DATA) {
+        return decodeDataBlock(block, true, inCache.getId(),
+            includesMemstoreTS);
+      }
+      return block;
+    }
+
+    // got already data in desired format
+    if (block.getBlockType() == BlockType.ENCODED_DATA &&
+        block.getDataBlockEncodingId() == inCache.getId()) {
+      return block;
+    }
+
+    if (block.getBlockType() == BlockType.ENCODED_DATA) {
+      throw new IllegalStateException("Unexpected encoding");
+    }
+
+    // need to encode it
+    if (inCache != DataBlockEncodings.Algorithm.NONE) {
+      return encodeDataBlock(block, inCache, includesMemstoreTS);
+    }
+
+    return block;
+  }
+
+  // Precondition: not encoded buffer
+  // Postcondition: same encoding as onDisk
+  @Override
+  public Pair<ByteBuffer, BlockType> beforeWriteToDisk(ByteBuffer in,
+      boolean includesMemstoreTS) {
+    if (onDisk == DataBlockEncodings.Algorithm.NONE) {
+      // there is no need to encode the block before writing it to disk
+      return new Pair<ByteBuffer, BlockType>(in, BlockType.DATA);
+    }
+
+    ByteBuffer encodedBuffer = encodeBufferToHFileBlockBuffer(in,
+        onDisk, includesMemstoreTS);
+    return new Pair<ByteBuffer, BlockType>(encodedBuffer,
+        BlockType.ENCODED_DATA);
+  }
+
+  // Precondition: an unencoded block or the same encoding as inCache
+  // Postcondition: same encoding as inCache
+  @Override
+  public HFileBlock beforeBlockCache(HFileBlock block,
+      boolean includesMemstoreTS) {
+    if (ignoreBlock(block)) {
+      return block; // non DATA block skip it
+    }
+
+    if (block.getBlockType() == BlockType.ENCODED_DATA) {
+      if (block.getDataBlockEncodingId() == inCache.getId()) {
+        // is already encoded in right format
+        return block;
+      }
+
+      // expecting either the "in-cache" encoding or no encoding
+      throw new IllegalStateException(String.format(
+          "Expected the in-cache encoding ('%s') or no encoding, " +
+          "but got encoding '%s'", inCache.toString(),
+          DataBlockEncodings.getNameFromId(
+              block.getDataBlockEncodingId())));
+    }
+
+    if (inCache != DataBlockEncodings.Algorithm.NONE) {
+      // encode data
+      HFileBlock encodedBlock = encodeDataBlock(block, inCache,
+          includesMemstoreTS);
+      block.passSchemaMetricsTo(encodedBlock);
+      return encodedBlock;
+    }
+
+    return block;
+  }
+
+  /**
+   * Precondition: same encoding as in inCache
+   * <p>
+   * Postcondition: if (isCompaction is set and {@link #onDisk} is NONE) or
+   *                {@link #encodedSeek} is not set -> the returned block is not encoded.
+   */
+  @Override
+  public HFileBlock afterBlockCache(HFileBlock block, boolean isCompaction,
+      boolean includesMemstoreTS) {
+    if (block == null || ignoreBlock(block)) {
+      return block; // skip no DATA block
+    }
+
+    if (inCache == DataBlockEncodings.Algorithm.NONE) {
+      // no need of decoding
+      if (block.getBlockType() == BlockType.ENCODED_DATA) {
+        throw new IllegalStateException("Expected non-encoded data in cache.");
+      }
+      return block;
+    }
+
+    if (block.getBlockType() != BlockType.ENCODED_DATA) {
+      throw new IllegalStateException("Expected encoded data in cache.");
+    }
+
+    if (dontEncodeBeforeCompaction(isCompaction)) {
+      // If we don't use dataBlockEncoding on disk,
+      // we would also avoid using it for compactions.
+      // That way we don't change disk format.
+      return null;
+    }
+
+    if (encodedSeek) {
+      // we use encoding in memory
+      return block;
+    }
+
+    return decodeDataBlock(block, true, inCache.getId(), includesMemstoreTS);
+  }
+
+  @Override
+  public boolean useEncodedScanner(boolean isCompaction) {
+    if (isCompaction && onDisk == DataBlockEncodings.Algorithm.NONE) {
+      return false;
+    }
+    return encodedSeek && inCache != DataBlockEncodings.Algorithm.NONE;
+  }
+
+  @Override
+  public void saveMetadata(StoreFile.Writer storeFileWriter)
+      throws IOException {
+    storeFileWriter.appendFileInfo(StoreFile.DATA_BLOCK_ENCODING,
+        onDisk.getNameInBytes());
+  }
+
+  private HFileBlock decodeDataBlock(HFileBlock block, boolean verifyEncoding,
+      short expectedEncoderId, boolean includesMemstoreTS) {
+    assert block.getBlockType() == BlockType.ENCODED_DATA;
+    short dataBlockEncoderId = block.getDataBlockEncodingId();
+
+    // (optional) sanity check of encoder type
+    if (verifyEncoding && expectedEncoderId != dataBlockEncoderId) {
+      throw new IllegalStateException(String.format(
+          "Expected encoding type '%d', but found '%d'",
+          expectedEncoderId, dataBlockEncoderId));
+    }
+
+    ByteBuffer originalBuf = block.getBufferReadOnly();
+    ByteBuffer withoutEncodedHeader = ByteBuffer.wrap(originalBuf.array(),
+        originalBuf.arrayOffset() + HFileBlock.ENCODED_HEADER_SIZE,
+        originalBuf.limit() - HFileBlock.ENCODED_HEADER_SIZE).slice();
+    ByteBufferInputStream bbis =
+        new ByteBufferInputStream(withoutEncodedHeader);
+    DataInputStream dis;
+    ByteBuffer newBuf;
+    DataBlockEncoder dataBlockEncoder = null;
+
+    try {
+      dis = new DataInputStream(bbis);
+      dataBlockEncoder =
+          DataBlockEncodings.getDataBlockEncoderFromId(dataBlockEncoderId);
+      int preReadLength = originalBuf.limit() -
+          HFileBlock.HEADER_SIZE - block.getUncompressedSizeWithoutHeader();
+      // Sometimes the buffer is larger, because it also contains the next
+      // block's header. In that case we want to skip it.
+      newBuf = dataBlockEncoder.uncompressKeyValues(dis, HFileBlock.HEADER_SIZE,
+          preReadLength, includesMemstoreTS);
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Bug while decoding the block using '%s'", dataBlockEncoder), e);
+    }
+
+    // Create a decoded HFileBlock. Offset will be set later.
+    return new HFileBlock(BlockType.DATA, block.getOnDiskSizeWithoutHeader(),
+        newBuf.limit() - HFileBlock.HEADER_SIZE, block.getPrevBlockOffset(),
+        newBuf, HFileBlock.FILL_HEADER, 0, includesMemstoreTS);
+  }
+
+  private ByteBuffer encodeBufferToHFileBlockBuffer(ByteBuffer in,
+      DataBlockEncodings.Algorithm algo, boolean includesMemstoreTS) {
+    ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(encodedStream);
+    DataBlockEncoder encoder = algo.getEncoder();
+    try {
+      encodedStream.write(HFileBlock.DUMMY_HEADER);
+      algo.writeIdInBytes(dataOut);
+      encoder.compressKeyValues(dataOut, in,
+          includesMemstoreTS);
+    } catch (IOException e) {
+      throw new RuntimeException(String.format("Bug in data block encoder " +
+          "'%s', it probably requested too much data", algo.toString()), e);
+    }
+    return ByteBuffer.wrap(encodedStream.toByteArray());
+  }
+
+  private HFileBlock encodeDataBlock(HFileBlock block,
+      DataBlockEncodings.Algorithm algo, boolean includesMemstoreTS) {
+    ByteBuffer compressedBuffer = encodeBufferToHFileBlockBuffer(
+        block.getBufferWithoutHeader(), algo, includesMemstoreTS);
+    int sizeWithoutHeader = compressedBuffer.limit() - HFileBlock.HEADER_SIZE;
+    return new HFileBlock(BlockType.ENCODED_DATA,
+        block.getOnDiskSizeWithoutHeader(),
+        sizeWithoutHeader, block.getPrevBlockOffset(),
+        compressedBuffer, HFileBlock.FILL_HEADER, block.getOffset(),
+        includesMemstoreTS);
+  }
+
+  private boolean ignoreBlock(HFileBlock block) {
+    BlockType type = block.getBlockType();
+    return type != BlockType.DATA && type != BlockType.ENCODED_DATA;
+  }
+
+  private boolean dontEncodeBeforeCompaction(boolean isCompaction) {
+    return isCompaction
+        && onDisk == DataBlockEncodings.Algorithm.NONE;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(getClass().getSimpleName()
+        + " onDisk='%s' inCache='%s' encodedSeek=%s", onDisk.toString(),
+        inCache.toString(), encodedSeek);
+  }
+}
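
For context, a small sketch of wiring this implementation into the version 2 block reader changed earlier in this commit. The constructors and setIncludesMemstoreTS are taken from the hunks above; the class ReaderWiringSketch, the openBlockReader helper and the all-NONE settings are assumptions made only for illustration.

    package org.apache.hadoop.hbase.io.hfile;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;

    public class ReaderWiringSketch {
      static HFileBlock.FSReaderV2 openBlockReader(FSDataInputStream in, long fileSize) {
        // No encoding on disk or in cache in this minimal example; a real setup
        // would pass one of the algorithms defined in DataBlockEncodings.
        HFileDataBlockEncoderImpl encoder = new HFileDataBlockEncoderImpl(
            DataBlockEncodings.Algorithm.NONE,          // onDisk
            DataBlockEncodings.Algorithm.NONE,          // inCache
            HFileDataBlockEncoderImpl.NO_ENCODED_SEEK); // decode before seeking
        HFileBlock.FSReaderV2 reader = new HFileBlock.FSReaderV2(
            in, Compression.Algorithm.NONE, fileSize, encoder);
        reader.setIncludesMemstoreTS(true);             // KVs carry memstore timestamps
        return reader;
      }
    }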

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java Sat Dec 24 21:20:39 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.ByteBloomFilter;
@@ -170,6 +171,7 @@ public class HFilePrettyPrinter {
         conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
     conf.set("fs.default.name",
         conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
+    SchemaMetrics.configureGlobally(conf);
     try {
       if (!parseOptions(args))
         return 1;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java Sat Dec 24 21:20:39 2011
@@ -30,10 +30,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IOUtils;
@@ -57,12 +59,16 @@ public class HFileReaderV1 extends Abstr
    * stream.
    * @param size Length of the stream.
    * @param cacheConf cache references and configuration
+   * @param blockEncoder what kind of data block encoding will be used
+   * @throws IOException
    */
   public HFileReaderV1(Path path, FixedFileTrailer trailer,
       final FSDataInputStream fsdis, final long size,
       final boolean closeIStream,
-      final CacheConfig cacheConf) {
-    super(path, trailer, fsdis, size, closeIStream, cacheConf);
+      final CacheConfig cacheConf,
+      final HFileDataBlockEncoder blockEncoder) {
+    super(path, trailer, fsdis, size, closeIStream, cacheConf,
+        blockEncoder);
 
     trailer.expectVersion(1);
     fsBlockReader = new HFileBlock.FSReaderV1(fsdis, compressAlgo, fileSize);
@@ -167,6 +173,9 @@ public class HFileReaderV1 extends Abstr
   @Override
   public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
                                 final boolean isCompaction) {
+    if (blockEncoder.useEncodedScanner(isCompaction)) {
+      return new EncodedScannerV1(this, cacheBlocks, pread, isCompaction);
+    }
     return new ScannerV1(this, cacheBlocks, pread, isCompaction);
   }
 
@@ -295,6 +304,8 @@ public class HFileReaderV1 extends Abstr
         HFileBlock cachedBlock =
           (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
               cacheConf.shouldCacheDataOnRead());
+        cachedBlock = blockEncoder.afterBlockCache(cachedBlock,
+            isCompaction, MemStore.NO_PERSISTENT_TS);
         if (cachedBlock != null) {
           cacheHits.incrementAndGet();
           getSchemaMetrics().updateOnCacheHit(cachedBlock.getBlockType().getCategory(),
@@ -322,7 +333,7 @@ public class HFileReaderV1 extends Abstr
           - offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
       passSchemaMetricsTo(hfileBlock);
       hfileBlock.expectType(BlockType.DATA);
-      ByteBuffer buf = hfileBlock.getBufferWithoutHeader();
+      hfileBlock = blockEncoder.afterReadFromDisk(hfileBlock);
 
       long delta = System.nanoTime() - startTimeNs;
       if (pread) {
@@ -338,10 +349,17 @@ public class HFileReaderV1 extends Abstr
       // Cache the block
       if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
           hfileBlock.getBlockType().getCategory())) {
+        hfileBlock = blockEncoder.beforeBlockCache(hfileBlock,
+            MemStore.NO_PERSISTENT_TS);
         cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
             cacheConf.isInMemory());
       }
 
+      hfileBlock = blockEncoder.afterReadFromDiskAndPuttingInCache(
+          hfileBlock, isCompaction, MemStore.NO_PERSISTENT_TS);
+
+      ByteBuffer buf = hfileBlock.getBufferWithoutHeader();
+
       return buf;
     }
   }
@@ -396,16 +414,101 @@ public class HFileReaderV1 extends Abstr
     }
   }
 
+  protected abstract static class AbstractScannerV1
+      extends AbstractHFileReader.Scanner {
+    protected final HFileReaderV1 readerV1;
+    protected int currBlock;
+
+    public AbstractScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction) {
+      super(reader, cacheBlocks, pread, isCompaction);
+      readerV1 = reader;
+    }
+
+    /**
+     * Within a loaded block, seek looking for the first key
+     * that is smaller than (or equal to?) the key we are interested in.
+     *
+     * A note on the seekBefore - if you have seekBefore = true, AND the
+     * first key in the block = key, then you'll get thrown exceptions.
+     * @param key to find
+     * @param seekBefore find the key before the exact match.
+     * @return
+     */
+    protected abstract int blockSeek(byte[] key, int offset, int length,
+        boolean seekBefore);
+
+    protected abstract void loadBlock(int bloc, boolean rewind)
+        throws IOException;
+
+    @Override
+    public int seekTo(byte[] key, int offset, int length) throws IOException {
+      int b = readerV1.blockContainingKey(key, offset, length);
+      if (b < 0) return -1; // falls before the beginning of the file! :-(
+      // Avoid re-reading the same block (that'd be dumb).
+      loadBlock(b, true);
+      return blockSeek(key, offset, length, false);
+    }
+
+    @Override
+    public int reseekTo(byte[] key, int offset, int length)
+        throws IOException {
+      if (blockBuffer != null && currKeyLen != 0) {
+        ByteBuffer bb = getKey();
+        int compared = reader.getComparator().compare(key, offset,
+            length, bb.array(), bb.arrayOffset(), bb.limit());
+        if (compared < 1) {
+          // If the required key is less than or equal to current key, then
+          // don't do anything.
+          return compared;
+        }
+      }
+
+      int b = readerV1.blockContainingKey(key, offset, length);
+      if (b < 0) {
+        return -1;
+      }
+      loadBlock(b, false);
+      return blockSeek(key, offset, length, false);
+    }
+
+    @Override
+    public boolean seekBefore(byte[] key, int offset, int length)
+    throws IOException {
+      int b = readerV1.blockContainingKey(key, offset, length);
+      if (b < 0)
+        return false; // key is before the start of the file.
+
+      // Question: does this block begin with 'key'?
+      byte[] firstkKey = reader.getDataBlockIndexReader().getRootBlockKey(b);
+      if (reader.getComparator().compare(firstkKey, 0, firstkKey.length,
+          key, offset, length) == 0) {
+        // Ok the key we're interested in is the first of the block, so go back
+        // by one.
+        if (b == 0) {
+          // we have a 'problem', the key we want is the first of the file.
+          return false;
+        }
+        b--;
+        // TODO shortcut: seek forward in this block to the last key of the
+        // block.
+      }
+      loadBlock(b, true);
+      blockSeek(key, offset, length, true);
+      return true;
+    }
+  }
+
   /**
    * Implementation of {@link HFileScanner} interface.
    */
-  protected static class ScannerV1 extends AbstractHFileReader.Scanner {
-    private final HFileReaderV1 reader;
-    private int currBlock;
+
+  protected static class ScannerV1 extends AbstractScannerV1 {
+    private HFileReaderV1 reader;
 
     public ScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
-      super(cacheBlocks, pread, isCompaction);
+      super(reader, cacheBlocks, pread, isCompaction);
       this.reader = reader;
     }
 
@@ -486,57 +589,7 @@ public class HFileReaderV1 extends Abstr
     }
 
     @Override
-    public int seekTo(byte[] key) throws IOException {
-      return seekTo(key, 0, key.length);
-    }
-
-    @Override
-    public int seekTo(byte[] key, int offset, int length) throws IOException {
-      int b = reader.blockContainingKey(key, offset, length);
-      if (b < 0) return -1; // falls before the beginning of the file! :-(
-      // Avoid re-reading the same block (that'd be dumb).
-      loadBlock(b, true);
-      return blockSeek(key, offset, length, false);
-    }
-
-    @Override
-    public int reseekTo(byte[] key) throws IOException {
-      return reseekTo(key, 0, key.length);
-    }
-
-    @Override
-    public int reseekTo(byte[] key, int offset, int length)
-        throws IOException {
-      if (blockBuffer != null && currKeyLen != 0) {
-        ByteBuffer bb = getKey();
-        int compared = reader.getComparator().compare(key, offset,
-            length, bb.array(), bb.arrayOffset(), bb.limit());
-        if (compared <= 0) {
-          // If the required key is less than or equal to current key, then
-          // don't do anything.
-          return compared;
-        }
-      }
-
-      int b = reader.blockContainingKey(key, offset, length);
-      if (b < 0) {
-        return -1;
-      }
-      loadBlock(b, false);
-      return blockSeek(key, offset, length, false);
-    }
-
-    /**
-     * Within a loaded block, seek looking for the first key
-     * that is smaller than (or equal to?) the key we are interested in.
-     *
-     * A note on the seekBefore - if you have seekBefore = true, AND the
-     * first key in the block = key, then you'll get thrown exceptions.
-     * @param key to find
-     * @param seekBefore find the key before the exact match.
-     * @return
-     */
-    private int blockSeek(byte[] key, int offset, int length,
+    protected int blockSeek(byte[] key, int offset, int length,
         boolean seekBefore) {
       int klen, vlen;
       int lastLen = 0;
@@ -578,37 +631,6 @@ public class HFileReaderV1 extends Abstr
     }
 
     @Override
-    public boolean seekBefore(byte[] key) throws IOException {
-      return seekBefore(key, 0, key.length);
-    }
-
-    @Override
-    public boolean seekBefore(byte[] key, int offset, int length)
-    throws IOException {
-      int b = reader.blockContainingKey(key, offset, length);
-      if (b < 0)
-        return false; // key is before the start of the file.
-
-      // Question: does this block begin with 'key'?
-      byte[] firstkKey = reader.getDataBlockIndexReader().getRootBlockKey(b);
-      if (reader.getComparator().compare(firstkKey, 0, firstkKey.length,
-          key, offset, length) == 0) {
-        // Ok the key we're interested in is the first of the block, so go back
-        // by one.
-        if (b == 0) {
-          // we have a 'problem', the key we want is the first of the file.
-          return false;
-        }
-        b--;
-        // TODO shortcut: seek forward in this block to the last key of the
-        // block.
-      }
-      loadBlock(b, true);
-      blockSeek(key, offset, length, true);
-      return true;
-    }
-
-    @Override
     public String getKeyString() {
       return Bytes.toStringBinary(blockBuffer.array(),
           blockBuffer.arrayOffset() + blockBuffer.position(), currKeyLen);
@@ -621,11 +643,6 @@ public class HFileReaderV1 extends Abstr
     }
 
     @Override
-    public Reader getReader() {
-      return reader;
-    }
-
-    @Override
     public boolean seekTo() throws IOException {
       if (reader.getDataBlockIndexReader().isEmpty()) {
         return false;
@@ -645,7 +662,8 @@ public class HFileReaderV1 extends Abstr
       return true;
     }
 
-    private void loadBlock(int bloc, boolean rewind) throws IOException {
+    @Override
+    protected void loadBlock(int bloc, boolean rewind) throws IOException {
       if (blockBuffer == null) {
         blockBuffer = reader.readBlockBuffer(bloc, cacheBlocks, pread,
             isCompaction);
@@ -672,6 +690,115 @@ public class HFileReaderV1 extends Abstr
 
   }
 
+  protected static class EncodedScannerV1 extends AbstractScannerV1 {
+    private DataBlockEncoder.EncodedSeeker seeker = null;
+    private DataBlockEncoder dataBlockEncoder = null;
+
+    public EncodedScannerV1(HFileReaderV1 reader, boolean cacheBlocks,
+        boolean pread, boolean isCompaction) {
+      super(reader, cacheBlocks, pread, isCompaction);
+    }
+
+    @Override
+    public boolean seekTo() throws IOException {
+      if (reader.getDataBlockIndexReader().isEmpty()) {
+        return false;
+      }
+
+      loadBlock(0, true);
+      return true;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      if (blockBuffer == null) {
+        throw new IOException("Next called on non-seeked scanner");
+      }
+
+      boolean ok = seeker.next();
+
+      if (!ok) {
+        if (currBlock + 1 >=
+            reader.getDataBlockIndexReader().getRootBlockCount()) {
+          // we have reached the last data block; reset and signal end of scan
+          currBlock = 0;
+          blockBuffer = null;
+          return false;
+        }
+        loadBlock(currBlock + 1, false);
+        ok = true;
+      }
+
+      return ok;
+    }
+
+    @Override
+    public ByteBuffer getKey() {
+      return seeker.getKey();
+    }
+
+    @Override
+    public ByteBuffer getValue() {
+      return seeker.getValue();
+    }
+
+    @Override
+    public KeyValue getKeyValue() {
+      if (blockBuffer == null) {
+        return null;
+      }
+      return seeker.getKeyValueObject();
+    }
+
+    @Override
+    public String getKeyString() {
+      ByteBuffer keyBuffer = seeker.getKey();
+      return Bytes.toStringBinary(keyBuffer.array(),
+          keyBuffer.arrayOffset(), keyBuffer.limit());
+    }
+
+    @Override
+    public String getValueString() {
+      ByteBuffer valueBuffer = seeker.getValue();
+      return Bytes.toStringBinary(valueBuffer.array(),
+          valueBuffer.arrayOffset(), valueBuffer.limit());
+    }
+
+    @Override
+    protected int blockSeek(byte[] key, int offset, int length,
+        boolean seekBefore) {
+      return seeker.blockSeekTo(key, offset, length, seekBefore);
+    }
+
+    @Override
+    protected void loadBlock(int bloc, boolean rewind) throws IOException {
+      if (blockBuffer == null || bloc != currBlock) {
+        blockBuffer = readerV1.readBlockBuffer(bloc, cacheBlocks, pread,
+            isCompaction);
+        currBlock = bloc;
+        blockFetches++;
+        short dataBlockEncoderId = blockBuffer.getShort();
+        blockBuffer = blockBuffer.slice();
+
+        if (seeker == null ||
+            !DataBlockEncodings.isCorrectEncoder(
+                dataBlockEncoder, dataBlockEncoderId)) {
+          dataBlockEncoder =
+              DataBlockEncodings.getDataBlockEncoderFromId(dataBlockEncoderId);
+          seeker = dataBlockEncoder.createSeeker(reader.getComparator(),
+              MemStore.NO_PERSISTENT_TS);
+        }
+        seeker.setCurrentBuffer(blockBuffer);
+
+      } else {
+        // we are already in the same block, just rewind to seek again.
+        if (rewind) {
+          seeker.rewind();
+        }
+      }
+    }
+  }
+
   @Override
   public HFileBlock readBlock(long offset, long onDiskBlockSize,
       boolean cacheBlock, boolean pread, boolean isCompaction) {

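The EncodedScannerV1.loadBlock() added above reads a two-byte data block encoder ID from the front of each block buffer and only rebuilds its seeker when that ID no longer matches the encoder it already holds. A minimal, self-contained sketch of that dispatch pattern follows (illustration only, not part of the patch; the Seeker interface and createSeekerFor() factory are hypothetical stand-ins, not HBase APIs):

import java.nio.ByteBuffer;

class EncoderDispatchSketch {
  interface Seeker {
    void setCurrentBuffer(ByteBuffer encodedPayload);
  }

  private short currentEncoderId = -1;
  private Seeker seeker;

  void loadEncodedBlock(ByteBuffer blockPayload) {
    // Every encoded payload starts with a two-byte encoder ID ...
    short encoderId = blockPayload.getShort();
    // ... followed by the encoded key/values.
    ByteBuffer encodedData = blockPayload.slice();

    // Only rebuild the seeker when the encoding actually changed.
    if (seeker == null || encoderId != currentEncoderId) {
      seeker = createSeekerFor(encoderId);
      currentEncoderId = encoderId;
    }
    seeker.setCurrentBuffer(encodedData);
  }

  private Seeker createSeekerFor(short encoderId) {
    // In HBase this goes through DataBlockEncodings.getDataBlockEncoderFromId()
    // and DataBlockEncoder.createSeeker(); here a no-op seeker stands in.
    return buffer -> { /* position at the first key/value of the buffer */ };
  }
}
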
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Sat Dec 24 21:20:39 2011
@@ -30,6 +30,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -80,17 +82,20 @@ public class HFileReaderV2 extends Abstr
    * @param size Length of the stream.
    * @param closeIStream Whether to close the stream.
    * @param cacheConf Cache configuration.
+   * @param blockEncoder what kind of data block encoding will be used
    * @throws IOException
    */
   public HFileReaderV2(Path path, FixedFileTrailer trailer,
       final FSDataInputStream fsdis, final long size,
-      final boolean closeIStream, final CacheConfig cacheConf)
+      final boolean closeIStream, final CacheConfig cacheConf,
+      final HFileDataBlockEncoder blockEncoder)
   throws IOException {
-    super(path, trailer, fsdis, size, closeIStream, cacheConf);
-
+    super(path, trailer, fsdis, size, closeIStream, cacheConf,
+        blockEncoder);
     trailer.expectVersion(2);
-    fsBlockReader = new HFileBlock.FSReaderV2(fsdis, compressAlgo,
-        fileSize);
+    HFileBlock.FSReaderV2 fsBlockReader = new HFileBlock.FSReaderV2(fsdis,
+        compressAlgo, fileSize, blockEncoder);
+    this.fsBlockReader = fsBlockReader;
 
     // Comparator class name is stored in the trailer in version 2.
     comparator = trailer.createComparator();
@@ -123,8 +128,10 @@ public class HFileReaderV2 extends Abstr
     avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
     avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
     byte [] keyValueFormatVersion = fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
-    includesMemstoreTS = (keyValueFormatVersion != null &&
-        Bytes.toInt(keyValueFormatVersion) == HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE);
+    includesMemstoreTS = keyValueFormatVersion != null &&
+        Bytes.toInt(keyValueFormatVersion) ==
+            HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
+    fsBlockReader.setIncludesMemstoreTS(includesMemstoreTS);
 
     // Store all other load-on-open blocks for further consumption.
     HFileBlock b;
@@ -145,9 +152,15 @@ public class HFileReaderV2 extends Abstr
    * @param isCompaction is scanner being used for a compaction?
    * @return Scanner on this file.
    */
-  @Override
-  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
+  @Override
+  public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
       final boolean isCompaction) {
+    // check if we want to use data block encoding in memory
+    if (blockEncoder.useEncodedScanner(isCompaction)) {
+      return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
+          includesMemstoreTS);
+    }
+
     return new ScannerV2(this, cacheBlocks, pread, isCompaction);
   }
 
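For reference, a caller-side sketch of driving the scanner returned by getScanner() above; it assumes an already-opened HFileReaderV2 named 'reader', omits error handling, and is an illustration rather than part of the patch:

// Full-file scan through the public HFileScanner interface.
HFileScanner scanner = reader.getScanner(true /* cacheBlocks */,
    false /* pread */, false /* isCompaction */);
if (scanner.seekTo()) {           // position at the first key/value; false for an empty file
  do {
    KeyValue kv = scanner.getKeyValue();
    // ... consume kv ...
  } while (scanner.next());       // next() returns false once the last data block is exhausted
}
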
@@ -258,6 +271,8 @@ public class HFileReaderV2 extends Abstr
       if (cacheConf.isBlockCacheEnabled()) {
         HFileBlock cachedBlock =
           (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock);
+        cachedBlock = blockEncoder.afterBlockCache(cachedBlock,
+            isCompaction, shouldIncludeMemstoreTS());
         if (cachedBlock != null) {
           BlockCategory blockCategory =
               cachedBlock.getBlockType().getCategory();
@@ -265,8 +280,9 @@ public class HFileReaderV2 extends Abstr
 
           getSchemaMetrics().updateOnCacheHit(blockCategory, isCompaction);
 
-          if (cachedBlock.getBlockType() == BlockType.DATA)
+          if (cachedBlock.getBlockType() == BlockType.DATA) {
             HFile.dataBlockReadCnt.incrementAndGet();
+          }
           return cachedBlock;
         }
         // Carry on, please load.
@@ -292,6 +308,8 @@ public class HFileReaderV2 extends Abstr
       // Cache the block
       if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
               hfileBlock.getBlockType().getCategory())) {
+        hfileBlock = blockEncoder.beforeBlockCache(
+            hfileBlock, includesMemstoreTS);
         cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
             cacheConf.isInMemory());
       }
@@ -300,7 +318,8 @@ public class HFileReaderV2 extends Abstr
         HFile.dataBlockReadCnt.incrementAndGet();
       }
 
-      return hfileBlock;
+      return blockEncoder.afterReadFromDiskAndPuttingInCache(hfileBlock,
+          isCompaction, includesMemstoreTS);
     } finally {
       offsetLock.releaseLockEntry(lockEntry);
     }
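
The readBlock() changes above thread the new HFileDataBlockEncoder hooks through both the cache-hit and the cache-miss paths. A hedged, self-contained sketch of that ordering follows; BlockHooks and readBlockWithHooks() are hypothetical stand-ins, and only the three hook names and their boolean parameters mirror the patch:

import java.util.function.Consumer;
import java.util.function.Supplier;

class ReadBlockHookOrderSketch {
  interface BlockHooks<B> {
    B afterBlockCache(B cachedBlock, boolean isCompaction, boolean includesMemstoreTS);
    B beforeBlockCache(B diskBlock, boolean includesMemstoreTS);
    B afterReadFromDiskAndPuttingInCache(B diskBlock, boolean isCompaction,
        boolean includesMemstoreTS);
  }

  static <B> B readBlockWithHooks(B cachedBlock, Supplier<B> readFromDisk,
      Consumer<B> putInCache, BlockHooks<B> hooks, boolean cacheOnRead,
      boolean isCompaction, boolean includesMemstoreTS) {
    // Cache hit path: the hook may decode/unwrap the cached block (possibly null)
    // before it is handed back to the caller.
    cachedBlock = hooks.afterBlockCache(cachedBlock, isCompaction, includesMemstoreTS);
    if (cachedBlock != null) {
      return cachedBlock;
    }

    // Cache miss path: read from disk, let the hook transform the block before it
    // is cached, then post-process it once more before returning it.
    B block = readFromDisk.get();
    if (cacheOnRead) {
      block = hooks.beforeBlockCache(block, includesMemstoreTS);
      putInCache.accept(block);
    }
    return hooks.afterReadFromDiskAndPuttingInCache(block, isCompaction, includesMemstoreTS);
  }
}
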
@@ -345,25 +364,154 @@ public class HFileReaderV2 extends Abstr
     }
   }
 
+  protected abstract static class AbstractScannerV2
+      extends AbstractHFileReader.Scanner {
+    protected HFileBlock block;
+
+    public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
+        final boolean pread, final boolean isCompaction) {
+      super(r, cacheBlocks, pread, isCompaction);
+    }
+
+    /**
+     * An internal API function. Seek to the given key, optionally rewinding to
+     * the first key of the block before doing the seek.
+     *
+     * @param key key byte array
+     * @param offset key offset in the key byte array
+     * @param length key length
+     * @param rewind whether to rewind to the first key of the block before
+     *        doing the seek. If this is false, we are assuming we never go
+     *        back, otherwise the result is undefined.
+     * @return -1 if the key is earlier than the first key of the file,
+     *         0 if we are at the given key, and 1 if we are past the given key
+     * @throws IOException
+     */
+    protected int seekTo(byte[] key, int offset, int length, boolean rewind)
+        throws IOException {
+      HFileBlockIndex.BlockIndexReader indexReader =
+          reader.getDataBlockIndexReader();
+      HFileBlock seekToBlock = indexReader.seekToDataBlock(key, offset, length,
+          block, cacheBlocks, pread, isCompaction);
+      if (seekToBlock == null) {
+        // This happens if the key e.g. falls before the beginning of the file.
+        return -1;
+      }
+      return loadBlockAndSeekToKey(seekToBlock, rewind, key, offset, length,
+          false);
+    }
+
+    protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
+
+    protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock,
+        boolean rewind, byte[] key, int offset, int length, boolean seekBefore)
+        throws IOException;
+
+    @Override
+    public int seekTo(byte[] key, int offset, int length) throws IOException {
+      // Always rewind to the first key of the block, because the given key
+      // might be before or after the current key.
+      return seekTo(key, offset, length, true);
+    }
+
+    @Override
+    public int reseekTo(byte[] key, int offset, int length) throws IOException {
+      if (isSeeked()) {
+        ByteBuffer bb = getKey();
+        int compared = reader.getComparator().compare(key, offset,
+            length, bb.array(), bb.arrayOffset(), bb.limit());
+        if (compared < 1) {
+          // If the required key is less than or equal to current key, then
+          // don't do anything.
+          return compared;
+        }
+      }
+
+      // Don't rewind on a reseek operation, because reseek implies that we are
+      // always going forward in the file.
+      return seekTo(key, offset, length, false);
+    }
+
+    @Override
+    public boolean seekBefore(byte[] key, int offset, int length)
+        throws IOException {
+      HFileBlock seekToBlock =
+          reader.getDataBlockIndexReader().seekToDataBlock(key, offset, length,
+              block, cacheBlocks, pread, isCompaction);
+      if (seekToBlock == null) {
+        return false;
+      }
+      ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
+      if (reader.getComparator().compare(firstKey.array(),
+          firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
+      {
+        long previousBlockOffset = seekToBlock.getPrevBlockOffset();
+        // The key we're interested in is the first key of this block, so go back one block.
+        if (previousBlockOffset == -1) {
+          // we have a 'problem', the key we want is the first of the file.
+          return false;
+        }
+
+        // It is important that we compute and pass onDiskSize to the block
+        // reader so that it does not have to read the header separately to
+        // figure out the size.
+        seekToBlock = reader.readBlock(previousBlockOffset,
+            seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
+            pread, isCompaction);
+
+        // TODO shortcut: seek forward in this block to the last key of the
+        // block.
+      }
+      loadBlockAndSeekToKey(seekToBlock, true, key, offset, length, true);
+      return true;
+    }
+
+
+    /**
+     * Scans blocks in the "scanned" section of the {@link HFile} until the next
+     * data block is found.
+     *
+     * @return the next block, or null if there are no more data blocks
+     * @throws IOException
+     */
+    protected HFileBlock readNextDataBlock() throws IOException {
+      long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
+      if (block == null)
+        return null;
+
+      HFileBlock curBlock = block;
+
+      do {
+        if (curBlock.getOffset() >= lastDataBlockOffset)
+          return null;
+
+        if (curBlock.getOffset() < 0) {
+          throw new IOException("Invalid block file offset: " + block);
+        }
+        curBlock = reader.readBlock(curBlock.getOffset()
+            + curBlock.getOnDiskSizeWithHeader(),
+            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
+            isCompaction);
+      } while (!(curBlock.getBlockType().equals(BlockType.DATA) ||
+          curBlock.getBlockType().equals(BlockType.ENCODED_DATA)));
+
+      return curBlock;
+    }
+  }
+
   /**
    * Implementation of {@link HFileScanner} interface.
    */
-  protected static class ScannerV2 extends AbstractHFileReader.Scanner {
-    private HFileBlock block;
+  protected static class ScannerV2 extends AbstractScannerV2 {
     private HFileReaderV2 reader;
 
     public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
-      super(cacheBlocks, pread, isCompaction);
+      super(r, cacheBlocks, pread, isCompaction);
       this.reader = r;
     }
 
     @Override
-    public HFileReaderV2 getReader() {
-      return reader;
-    }
-
-    @Override
     public KeyValue getKeyValue() {
       if (!isSeeked())
         return null;
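
The seekTo()/reseekTo() javadoc above defines a three-valued result: -1 when the key falls before the first key of the file, 0 on an exact match, and 1 otherwise, in which case the scanner is left on the closest preceding key per the HFileScanner contract. A caller-side sketch of interpreting those codes (scanner setup and the chosen key are hypothetical; imports of HFileScanner, KeyValue, Bytes and IOException are assumed):

void seekExample(HFileScanner scanner) throws IOException {
  byte[] key = Bytes.toBytes("row-42");           // hypothetical row key
  int result = scanner.seekTo(key, 0, key.length);
  if (result == -1) {
    // key sorts before the first key of the file: the scanner is not positioned.
    return;
  }
  KeyValue kv = scanner.getKeyValue();
  if (result == 0) {
    // exact match: kv is the entry for key
  } else {
    // result == 1: no exact match; kv is the closest entry sorting before key,
    // and callers typically continue forward with next() from this position.
  }
  // reseekTo() uses the same return convention but never rewinds, so it is only
  // valid when the new key is known to be >= the current position.
}
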
@@ -452,36 +600,6 @@ public class HFileReaderV2 extends Abstr
     }
 
     /**
-     * Scans blocks in the "scanned" section of the {@link HFile} until the next
-     * data block is found.
-     *
-     * @return the next block, or null if there are no more data blocks
-     * @throws IOException
-     */
-    private HFileBlock readNextDataBlock() throws IOException {
-      long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
-      if (block == null)
-        return null;
-
-      HFileBlock curBlock = block;
-
-      do {
-        if (curBlock.getOffset() >= lastDataBlockOffset)
-          return null;
-
-        if (curBlock.getOffset() < 0) {
-          throw new IOException("Invalid block file offset: " + block);
-        }
-        curBlock = reader.readBlock(curBlock.getOffset()
-            + curBlock.getOnDiskSizeWithHeader(),
-            curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
-            isCompaction);
-      } while (!curBlock.getBlockType().equals(BlockType.DATA));
-
-      return curBlock;
-    }
-
-    /**
      * Positions this scanner at the start of the file.
      *
      * @return false if empty file; i.e. a call to next would return false and
@@ -517,70 +635,7 @@ public class HFileReaderV2 extends Abstr
     }
 
     @Override
-    public int seekTo(byte[] key) throws IOException {
-      return seekTo(key, 0, key.length);
-    }
-
-    /**
-     * An internal API function. Seek to the given key, optionally rewinding to
-     * the first key of the block before doing the seek.
-     *
-     * @param key key byte array
-     * @param offset key offset in the key byte array
-     * @param length key length
-     * @param rewind whether to rewind to the first key of the block before
-     *        doing the seek. If this is false, we are assuming we never go
-     *        back, otherwise the result is undefined.
-     * @return -1 if the key is earlier than the first key of the file,
-     *         0 if we are at the given key, and 1 if we are past the given key
-     * @throws IOException
-     */
-    private int seekTo(byte[] key, int offset, int length, boolean rewind)
-        throws IOException {
-      HFileBlockIndex.BlockIndexReader indexReader =
-          reader.getDataBlockIndexReader();
-      HFileBlock seekToBlock = indexReader.seekToDataBlock(key, offset, length,
-          block, cacheBlocks, pread, isCompaction);
-
-      if (seekToBlock == null) {
-        // This happens if the key e.g. falls before the beginning of the file.
-        return -1;
-      }
-      return loadBlockAndSeekToKey(seekToBlock, rewind, key, offset, length,
-          false);
-    }
-
-    @Override
-    public int seekTo(byte[] key, int offset, int length) throws IOException {
-      // Always rewind to the first key of the block, because the given key
-      // might be before or after the current key.
-      return seekTo(key, offset, length, true);
-    }
-
-    @Override
-    public int reseekTo(byte[] key) throws IOException {
-      return reseekTo(key, 0, key.length);
-    }
-
-    @Override
-    public int reseekTo(byte[] key, int offset, int length) throws IOException {
-      if (isSeeked()) {
-        ByteBuffer bb = getKey();
-        int compared = reader.getComparator().compare(key, offset,
-            length, bb.array(), bb.arrayOffset(), bb.limit());
-        if (compared < 1) {
-          // If the required key is less than or equal to current key, then
-          // don't do anything.
-          return compared;
-        }
-      }
-
-      // Don't rewind on a reseek operation, because reseek implies that we are
-      // always going forward in the file.
-      return seekTo(key, offset, length, false);
-    }
-
-    private int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
+    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
         byte[] key, int offset, int length, boolean seekBefore)
         throws IOException {
       if (block == null || block.getOffset() != seekToBlock.getOffset()) {
@@ -599,6 +654,13 @@ public class HFileReaderV2 extends Abstr
      */
     private void updateCurrBlock(HFileBlock newBlock) {
       block = newBlock;
+
+      // sanity check
+      if (block.getBlockType() != BlockType.DATA) {
+        throw new IllegalStateException(
+            "ScannerV2 works only on data blocks");
+      }
+
       blockBuffer = block.getBufferWithoutHeader();
       readKeyValueLen();
       blockFetches++;
@@ -713,11 +775,7 @@ public class HFileReaderV2 extends Abstr
     }
 
     @Override
-    public boolean seekBefore(byte[] key) throws IOException {
-      return seekBefore(key, 0, key.length);
-    }
-
-    private ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+    protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
       ByteBuffer buffer = curBlock.getBufferWithoutHeader();
       // It is safe to manipulate this buffer because we own the buffer object.
       buffer.rewind();
@@ -730,53 +788,174 @@ public class HFileReaderV2 extends Abstr
     }
 
     @Override
-    public boolean seekBefore(byte[] key, int offset, int length)
-        throws IOException {
-      HFileBlock seekToBlock =
-          reader.getDataBlockIndexReader().seekToDataBlock(key, offset,
-              length, block, cacheBlocks, pread, isCompaction);
-      if (seekToBlock == null) {
+    public String getKeyString() {
+      return Bytes.toStringBinary(blockBuffer.array(),
+          blockBuffer.arrayOffset() + blockBuffer.position()
+              + KEY_VALUE_LEN_SIZE, currKeyLen);
+    }
+
+    @Override
+    public String getValueString() {
+      return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
+          + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
+          currValueLen);
+    }
+  }
+
+  /**
+   * Version of ScannerV2 that operates on encoded data blocks.
+   */
+  protected static class EncodedScannerV2 extends AbstractScannerV2 {
+    private DataBlockEncoder.EncodedSeeker seeker = null;
+    private DataBlockEncoder dataBlockEncoder = null;
+    private final boolean includesMemstoreTS;
+
+    public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
+        boolean pread, boolean isCompaction, boolean includesMemstoreTS) {
+      super(reader, cacheBlocks, pread, isCompaction);
+      this.includesMemstoreTS = includesMemstoreTS;
+    }
+
+    private void setDataBlockEncoder(DataBlockEncoder dataBlockEncoder) {
+      this.dataBlockEncoder = dataBlockEncoder;
+      seeker = dataBlockEncoder.createSeeker(reader.getComparator(),
+          includesMemstoreTS);
+    }
+
+    /**
+     * Updates the current block to be the given {@link HFileBlock}. Seeks to
+     * the first key/value pair.
+     *
+     * @param newBlock the block to make current
+     */
+    private void updateCurrentBlock(HFileBlock newBlock) {
+      block = newBlock;
+
+      // sanity checks
+      if (block.getBlockType() != BlockType.ENCODED_DATA) {
+        throw new IllegalStateException(
+            "EncodedScannerV2 works only on encoded data blocks");
+      }
+
+      short dataBlockEncoderId = block.getDataBlockEncodingId();
+      if (dataBlockEncoder == null
+          || !DataBlockEncodings.isCorrectEncoder(dataBlockEncoder,
+              dataBlockEncoderId)) {
+        DataBlockEncoder encoder =
+            DataBlockEncodings.getDataBlockEncoderFromId(dataBlockEncoderId);
+        setDataBlockEncoder(encoder);
+      }
+
+      seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
+      blockFetches++;
+    }
+
+    private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
+      ByteBuffer origBlock = newBlock.getBufferReadOnly();
+      ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
+          origBlock.arrayOffset() + HFileBlock.HEADER_SIZE +
+          DataBlockEncodings.ID_SIZE,
+          origBlock.limit() - HFileBlock.HEADER_SIZE -
+          DataBlockEncodings.ID_SIZE).slice();
+      return encodedBlock;
+    }
+
+    @Override
+    public boolean seekTo() throws IOException {
+      if (reader == null) {
         return false;
       }
-      ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
-      if (reader.getComparator().compare(firstKey.array(),
-          firstKey.arrayOffset(), firstKey.limit(), key, offset, length) == 0)
-      {
-        long previousBlockOffset = seekToBlock.getPrevBlockOffset();
-        // The key we are interested in
-        if (previousBlockOffset == -1) {
-          // we have a 'problem', the key we want is the first of the file.
-          return false;
-        }
 
-        // It is important that we compute and pass onDiskSize to the block
-        // reader so that it does not have to read the header separately to
-        // figure out the size.
-        seekToBlock = reader.readBlock(previousBlockOffset,
-            seekToBlock.getOffset() - previousBlockOffset, cacheBlocks,
-            pread, isCompaction);
+      if (reader.getTrailer().getEntryCount() == 0) {
+        // No data blocks.
+        return false;
+      }
 
-        // TODO shortcut: seek forward in this block to the last key of the
-        // block.
+      long firstDataBlockOffset =
+          reader.getTrailer().getFirstDataBlockOffset();
+      if (block != null && block.getOffset() == firstDataBlockOffset) {
+        seeker.rewind();
+        return true;
       }
-      loadBlockAndSeekToKey(seekToBlock, true, key, offset, length, true);
+
+      block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
+          isCompaction);
+      if (block.getOffset() < 0) {
+        throw new IOException("Invalid block offset: " + block.getOffset());
+      }
+      updateCurrentBlock(block);
       return true;
     }
 
     @Override
+    public boolean next() throws IOException {
+      boolean isValid = seeker.next();
+      if (!isValid) {
+        block = readNextDataBlock();
+        isValid = block != null;
+        if (isValid) {
+          updateCurrentBlock(block);
+        }
+      }
+      return isValid;
+    }
+
+    @Override
+    public ByteBuffer getKey() {
+      assertValidSeek();
+      return seeker.getKey();
+    }
+
+    @Override
+    public ByteBuffer getValue() {
+      assertValidSeek();
+      return seeker.getValue();
+    }
+
+    @Override
+    public KeyValue getKeyValue() {
+      if (block == null) {
+        return null;
+      }
+      return seeker.getKeyValueObject();
+    }
+
+    @Override
     public String getKeyString() {
-      return Bytes.toStringBinary(blockBuffer.array(),
-          blockBuffer.arrayOffset() + blockBuffer.position()
-              + KEY_VALUE_LEN_SIZE, currKeyLen);
+      ByteBuffer keyBuffer = getKey();
+      return Bytes.toStringBinary(keyBuffer.array(),
+          keyBuffer.arrayOffset(), keyBuffer.limit());
     }
 
     @Override
     public String getValueString() {
-      return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
-          + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
-          currValueLen);
+      ByteBuffer valueBuffer = getValue();
+      return Bytes.toStringBinary(valueBuffer.array(),
+          valueBuffer.arrayOffset(), valueBuffer.limit());
+    }
+
+    private void assertValidSeek() {
+      if (block == null) {
+        throw new NotSeekedException();
+      }
     }
 
+    @Override
+    protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
+      return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
+    }
+
+    @Override
+    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, boolean rewind,
+        byte[] key, int offset, int length, boolean seekBefore)
+        throws IOException  {
+      if (block == null || block.getOffset() != seekToBlock.getOffset()) {
+        updateCurrentBlock(seekToBlock);
+      } else if (rewind) {
+        seeker.rewind();
+      }
+      return seeker.blockSeekTo(key, offset, length, seekBefore);
+    }
   }
 
   /**