Posted to commits@hbase.apache.org by te...@apache.org on 2011/12/24 22:20:41 UTC

svn commit: r1223020 [1/5] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/mapr...

Author: tedyu
Date: Sat Dec 24 21:20:39 2011
New Revision: 1223020

URL: http://svn.apache.org/viewvc?rev=1223020&view=rev
Log:
HBASE-4218 Data Block Encoding of KeyValues (aka delta encoding / prefix compression) (Jacek, Mikhail)
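The encoders added below exploit the fact that KeyValues inside a block are sorted, so consecutive keys usually share a long common prefix that does not have to be stored again. A minimal standalone sketch of that idea follows; it is only an illustration of prefix/delta encoding, not the committed encoders (which operate on raw KeyValue bytes and add flag bytes and compressed lengths):

import java.util.Arrays;
import java.util.List;

public class PrefixEncodingSketch {
  public static void main(String[] args) {
    // Sorted keys, as they would appear inside a single data block.
    List<String> sortedKeys = Arrays.asList("row001/cf:a", "row001/cf:b", "row002/cf:a");
    String previous = "";
    for (String key : sortedKeys) {
      int common = commonPrefixLength(previous, key);
      // A prefix encoder would store roughly: <common length><suffix length><suffix bytes>.
      System.out.println("common=" + common + " suffix=" + key.substring(common));
      previous = key;
    }
  }

  // Number of leading characters the two strings share.
  private static int commonPrefixLength(String a, String b) {
    int limit = Math.min(a.length(), b.length());
    int i = 0;
    while (i < limit && a.charAt(i) == b.charAt(i)) {
      i++;
    }
    return i;
  }
}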

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BitsetKeyDeltaEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncodings.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/DataBlockEncodingTool.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/EncodedSeekPerformanceTest.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java
    hbase/trunk/src/main/ruby/hbase/admin.rb
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HFilePerformanceEvaluation.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompoundBloomFilter.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java Sat Dec 24 21:20:39 2011
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -53,10 +54,18 @@ public class HColumnDescriptor implement
   // Version 6 adds metadata as a map where keys and values are byte[].
   // Version 7 -- add new compression and hfile blocksize to HColumnDescriptor (HBASE-1217)
   // Version 8 -- reintroduction of bloom filters, changed from boolean to enum
-  private static final byte COLUMN_DESCRIPTOR_VERSION = (byte)8;
+  // Version 9 -- add data block encoding
+  private static final byte COLUMN_DESCRIPTOR_VERSION = (byte) 9;
 
+  // These constants are used as FileInfo keys
   public static final String COMPRESSION = "COMPRESSION";
   public static final String COMPRESSION_COMPACT = "COMPRESSION_COMPACT";
+  public static final String DATA_BLOCK_ENCODING_ON_DISK =
+      "DATA_BLOCK_ENCODING_ON_DISK";
+  public static final String DATA_BLOCK_ENCODING_IN_CACHE =
+      "DATA_BLOCK_ENCODING_IN_CACHE";
+  public static final String ENCODED_DATA_BLOCK_SEEK =
+    "ENCODED_DATA_BLOCK_SEEK";
   public static final String BLOCKCACHE = "BLOCKCACHE";
   
   /**
@@ -79,6 +88,23 @@ public class HColumnDescriptor implement
    */
   public static final String DEFAULT_COMPRESSION =
     Compression.Algorithm.NONE.getName();
+  
+  /**
+   * Default data block encoding algorithm on disk.
+   */
+  public static final String DEFAULT_DATA_BLOCK_ENCODING_ON_DISK =
+      DataBlockEncodings.Algorithm.NONE.toString();
+
+  /**
+   * Default data block encoding algorithm in cache.
+   */
+  public static final String DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE =
+      DataBlockEncodings.Algorithm.NONE.toString();
+
+  /**
+   * Do not use encoded seek by default.
+   */
+  public static final boolean DEFAULT_ENCODED_DATA_BLOCK_SEEK = false;
 
   /**
    * Default number of versions of a record to keep.
@@ -143,6 +169,12 @@ public class HColumnDescriptor implement
       DEFAULT_VALUES.put(HConstants.IN_MEMORY, String.valueOf(DEFAULT_IN_MEMORY));
       DEFAULT_VALUES.put(BLOCKCACHE, String.valueOf(DEFAULT_BLOCKCACHE));
       DEFAULT_VALUES.put(KEEP_DELETED_CELLS, String.valueOf(DEFAULT_KEEP_DELETED));
+      DEFAULT_VALUES.put(DATA_BLOCK_ENCODING_ON_DISK,
+          String.valueOf(DEFAULT_DATA_BLOCK_ENCODING_ON_DISK));
+      DEFAULT_VALUES.put(DATA_BLOCK_ENCODING_IN_CACHE,
+          String.valueOf(DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE));
+      DEFAULT_VALUES.put(ENCODED_DATA_BLOCK_SEEK,
+          String.valueOf(DEFAULT_ENCODED_DATA_BLOCK_SEEK));
   }
 
   // Column family name
@@ -258,8 +290,9 @@ public class HColumnDescriptor implement
       final boolean blockCacheEnabled, final int blocksize,
       final int timeToLive, final String bloomFilter, final int scope) {
     this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, DEFAULT_KEEP_DELETED,
-        compression, inMemory, blockCacheEnabled, blocksize, timeToLive,
-        bloomFilter, scope);
+        compression, DEFAULT_DATA_BLOCK_ENCODING_ON_DISK,
+        DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE, DEFAULT_ENCODED_DATA_BLOCK_SEEK,
+        inMemory, blockCacheEnabled, blocksize, timeToLive, bloomFilter, scope);
   }
 
   /**
@@ -271,6 +304,9 @@ public class HColumnDescriptor implement
    * @param keepDeletedCells Whether to retain deleted cells until they expire
    *        up to maxVersions versions.
    * @param compression Compression type
+   * @param dataBlockEncodingOnDisk data block encoding used on disk
+   * @param dataBlockEncodingInCache data block encoding used in block cache
+   * @param encodedDataBlockSeek whether to use data block encoding while seeking
    * @param inMemory If true, column data should be kept in an HRegionServer's
    * cache
    * @param blockCacheEnabled If true, MapFile blocks should be cached
@@ -287,9 +323,12 @@ public class HColumnDescriptor implement
    * a <code>:</code>
    * @throws IllegalArgumentException if the number of versions is &lt;= 0
    */
-  public HColumnDescriptor(final byte[] familyName, final int minVersions,
+  public HColumnDescriptor(final byte [] familyName, final int minVersions,
       final int maxVersions, final boolean keepDeletedCells,
-      final String compression, final boolean inMemory,
+      final String compression,
+      final String dataBlockEncodingOnDisk,
+      final String dataBlockEncodingInCache,
+      final boolean encodedDataBlockSeek, final boolean inMemory,
       final boolean blockCacheEnabled, final int blocksize,
       final int timeToLive, final String bloomFilter, final int scope) {
     isLegalFamilyName(familyName);
@@ -319,6 +358,11 @@ public class HColumnDescriptor implement
     setTimeToLive(timeToLive);
     setCompressionType(Compression.Algorithm.
       valueOf(compression.toUpperCase()));
+    setDataBlockEncodingOnDisk(DataBlockEncodings.Algorithm.
+        valueOf(dataBlockEncodingOnDisk.toUpperCase()));
+    setDataBlockEncodingInCache(DataBlockEncodings.Algorithm.
+        valueOf(dataBlockEncodingInCache.toUpperCase()));
+    setEncodedDataBlockSeek(encodedDataBlockSeek);
     setBloomFilterType(StoreFile.BloomType.
       valueOf(bloomFilter.toUpperCase()));
     setBlocksize(blocksize);
@@ -496,6 +540,71 @@ public class HColumnDescriptor implement
     setValue(COMPRESSION, compressionType);
   }
 
+  /** @return data block encoding algorithm used on disk */
+  public DataBlockEncodings.Algorithm getDataBlockEncodingOnDisk() {
+    String type = getValue(DATA_BLOCK_ENCODING_ON_DISK);
+    if (type == null) {
+      type = DEFAULT_DATA_BLOCK_ENCODING_ON_DISK;
+    }
+    return DataBlockEncodings.Algorithm.valueOf(type);
+  }
+
+  /**
+   * Set data block encoding algorithm.
+   * @param type What kind of data block encoding will be used.
+   */
+  public void setDataBlockEncodingOnDisk(
+      DataBlockEncodings.Algorithm type) {
+    String name;
+    if (type != null) {
+      name = type.toString();
+    } else {
+      name = DataBlockEncodings.Algorithm.NONE.toString();
+    }
+    setValue(DATA_BLOCK_ENCODING_ON_DISK, name);
+  }
+  
+  /** @return data block encoding algorithm used in block cache */
+  public DataBlockEncodings.Algorithm getDataBlockEncodingInCache() {
+    String type = getValue(DATA_BLOCK_ENCODING_IN_CACHE);
+    if (type == null) {
+      type = DEFAULT_DATA_BLOCK_ENCODING_IN_CACHE;
+    }
+    return DataBlockEncodings.Algorithm.valueOf(type);
+  }
+
+  /**
+   * Set data block encoding algorithm used in block cache.
+   * @param type What kind of data block encoding will be used.
+   */
+  public void setDataBlockEncodingInCache(
+      DataBlockEncodings.Algorithm type) {
+    String name;
+    if (type != null) {
+      name = type.toString();
+    } else {
+      name = DataBlockEncodings.Algorithm.NONE.toString();
+    }
+    setValue(DATA_BLOCK_ENCODING_IN_CACHE, name);
+  }
+
+  /** @return whether we are doing seek operations over encoded data blocks */
+  public boolean useEncodedDataBlockSeek() {
+    String value = getValue(ENCODED_DATA_BLOCK_SEEK);
+    if (value != null) {
+      return Boolean.valueOf(value);
+    }
+    return DEFAULT_ENCODED_DATA_BLOCK_SEEK;
+  }
+
+  /**
+   * Set whether we should seek over encoded data blocks (true) or decode
+   * blocks first and use normal seek operations (false).
+   */
+  public void setEncodedDataBlockSeek(boolean useEncodedSeek) {
+    setValue(ENCODED_DATA_BLOCK_SEEK, Boolean.toString(useEncodedSeek));
+  }
+
   /**
    * @return Compression type setting.
    */
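The HColumnDescriptor changes above expose the feature through three per-family keys: on-disk encoding, in-cache encoding, and encoded seek. A minimal sketch of setting them from client code follows; it assumes the single-argument HColumnDescriptor(byte[]) constructor and uses only the DataBlockEncodings.Algorithm.NONE constant that actually appears in this diff (the other algorithm names are not spelled out here):

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
import org.apache.hadoop.hbase.util.Bytes;

public class EncodingConfigSketch {
  public static void main(String[] args) {
    HColumnDescriptor family = new HColumnDescriptor(Bytes.toBytes("cf"));
    // NONE is the default; a real deployment would pick one of the encoders
    // added by this commit for the on-disk and/or in-cache setting.
    family.setDataBlockEncodingOnDisk(DataBlockEncodings.Algorithm.NONE);
    family.setDataBlockEncodingInCache(DataBlockEncodings.Algorithm.NONE);
    // Seek directly over encoded blocks instead of decoding them first.
    family.setEncodedDataBlockSeek(true);
    System.out.println(family.getDataBlockEncodingOnDisk());
  }
}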

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java Sat Dec 24 21:20:39 2011
@@ -130,16 +130,27 @@ public class KeyValue implements Writabl
     return COMPARATOR.getRawComparator();
   }
 
+  /** Size of the key length field in bytes */
+  public static final int KEY_LENGTH_SIZE = Bytes.SIZEOF_INT;
+
+  /** Size of the key type field in bytes */
+  public static final int TYPE_SIZE = Bytes.SIZEOF_BYTE;
+
+  /** Size of the row length field in bytes */
+  public static final int ROW_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
+
+  /** Size of the family length field in bytes */
+  public static final int FAMILY_LENGTH_SIZE = Bytes.SIZEOF_BYTE;
+
+  /** Size of the timestamp field in bytes */
+  public static final int TIMESTAMP_SIZE = Bytes.SIZEOF_LONG;
+
   // Size of the timestamp and type byte on end of a key -- a long + a byte.
-  public static final int TIMESTAMP_TYPE_SIZE =
-    Bytes.SIZEOF_LONG /* timestamp */ +
-    Bytes.SIZEOF_BYTE /*keytype*/;
+  public static final int TIMESTAMP_TYPE_SIZE = TIMESTAMP_SIZE + TYPE_SIZE;
 
   // Size of the length shorts and bytes in key.
-  public static final int KEY_INFRASTRUCTURE_SIZE =
-    Bytes.SIZEOF_SHORT /*rowlength*/ +
-    Bytes.SIZEOF_BYTE /*columnfamilylength*/ +
-    TIMESTAMP_TYPE_SIZE;
+  public static final int KEY_INFRASTRUCTURE_SIZE = ROW_LENGTH_SIZE
+      + FAMILY_LENGTH_SIZE + TIMESTAMP_TYPE_SIZE;
 
   // How far into the key the row starts at. First thing to read is the short
   // that says how long the row is.
@@ -701,10 +712,10 @@ public class KeyValue implements Writabl
    */
   /**
    * Produces a string map for this key/value pair. Useful for programmatic use
-   * and manipulation of the data stored in an HLogKey, for example, printing 
-   * as JSON. Values are left out due to their tendency to be large. If needed, 
+   * and manipulation of the data stored in an HLogKey, for example, printing
+   * as JSON. Values are left out due to their tendency to be large. If needed,
    * they can be added manually.
-   * 
+   *
    * @return the Map<String,?> containing data from this key
    */
   public Map<String, Object> toStringMap() {
@@ -1322,7 +1333,7 @@ public class KeyValue implements Writabl
     // Rebuild as: <keylen:4><0:4><key:keylen>
     int dataLen = lenAsVal? Bytes.SIZEOF_INT : 0;
     byte [] newBuffer = new byte[getKeyLength() + (2 * Bytes.SIZEOF_INT) + dataLen];
-    System.arraycopy(this.bytes, this.offset, newBuffer, 0, 
+    System.arraycopy(this.bytes, this.offset, newBuffer, 0,
         Math.min(newBuffer.length,this.length));
     Bytes.putInt(newBuffer, Bytes.SIZEOF_INT, dataLen);
     if (lenAsVal) {
@@ -1393,7 +1404,7 @@ public class KeyValue implements Writabl
   }
 
   /**
-   * This function is only used in Meta key comparisons so its error message 
+   * This function is only used in Meta key comparisons so its error message
    * is specific for meta key errors.
    */
   static int getRequiredDelimiterInReverse(final byte [] b,
@@ -1561,7 +1572,7 @@ public class KeyValue implements Writabl
       return getRawComparator().compareRows(left, loffset, llength,
         right, roffset, rlength);
     }
-    
+
     public int compareColumns(final KeyValue left, final byte [] right,
         final int roffset, final int rlength, final int rfamilyoffset) {
       int offset = left.getFamilyOffset();
@@ -1595,7 +1606,7 @@ public class KeyValue implements Writabl
       short lrowlength = left.getRowLength();
       short rrowlength = right.getRowLength();
       // TsOffset = end of column data. just comparing Row+CF length of each
-      return ((left.getTimestampOffset() - left.getOffset()) == 
+      return ((left.getTimestampOffset() - left.getOffset()) ==
               (right.getTimestampOffset() - right.getOffset())) &&
         matchingRows(left, lrowlength, right, rrowlength) &&
         compareColumns(left, lrowlength, right, rrowlength) == 0;
@@ -2004,9 +2015,23 @@ public class KeyValue implements Writabl
   }
 
   /**
+   * Avoids redundant comparisons for better performance.
+   */
+  public static interface SamePrefixComparator<T> {
+    /**
+     * Compare two keys assuming that the first n bytes are the same.
+     * @param commonPrefix How many bytes are the same.
+     */
+    public int compareIgnoringPrefix(int commonPrefix,
+        T left, int loffset, int llength,
+        T right, int roffset, int rlength);
+  }
+
+  /**
    * Compare key portion of a {@link KeyValue}.
    */
-  public static class KeyComparator implements RawComparator<byte []> {
+  public static class KeyComparator
+      implements RawComparator<byte []>, SamePrefixComparator<byte[]> {
     volatile boolean ignoreTimestamp = false;
     volatile boolean ignoreType = false;
 
@@ -2016,45 +2041,123 @@ public class KeyValue implements Writabl
       short lrowlength = Bytes.toShort(left, loffset);
       short rrowlength = Bytes.toShort(right, roffset);
       int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT,
-          lrowlength,
-          right, roffset + Bytes.SIZEOF_SHORT, rrowlength);
+          lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength);
       if (compare != 0) {
         return compare;
       }
 
-      // Compare column family.  Start compare past row and family length.
-      int lcolumnoffset = Bytes.SIZEOF_SHORT + lrowlength + 1 + loffset;
-      int rcolumnoffset = Bytes.SIZEOF_SHORT + rrowlength + 1 + roffset;
+      // Compare the rest of the two KVs without making any assumptions about
+      // the common prefix. This function will not compare rows anyway, so we
+      // don't need to tell it that the common prefix includes the row.
+      return compareWithoutRow(0, left, loffset, llength, right, roffset,
+          rlength, rrowlength);
+    }
+
+    /**
+     * Compare the two key-values, ignoring the prefix of the given length
+     * that is known to be the same between the two.
+     * @param commonPrefix the prefix length to ignore
+     */
+    @Override
+    public int compareIgnoringPrefix(int commonPrefix, byte[] left,
+        int loffset, int llength, byte[] right, int roffset, int rlength) {
+      // Compare row
+      short lrowlength = Bytes.toShort(left, loffset);
+      short rrowlength;
+
+      int comparisonResult = 0;
+      if (commonPrefix < ROW_LENGTH_SIZE) {
+        // almost nothing in common
+        rrowlength = Bytes.toShort(right, roffset);
+        comparisonResult = compareRows(left, loffset + ROW_LENGTH_SIZE,
+            lrowlength, right, roffset + ROW_LENGTH_SIZE, rrowlength);
+      } else { // the row length is the same
+        rrowlength = lrowlength;
+        if (commonPrefix < ROW_LENGTH_SIZE + rrowlength) {
+          // The rows are not the same. Exclude the common prefix and compare
+          // the rest of the two rows.
+          int common = commonPrefix - ROW_LENGTH_SIZE;
+          comparisonResult = compareRows(
+              left, loffset + common + ROW_LENGTH_SIZE, lrowlength - common,
+              right, roffset + common + ROW_LENGTH_SIZE, rrowlength - common);
+        }
+      }
+      if (comparisonResult != 0) {
+        return comparisonResult;
+      }
+
+      assert lrowlength == rrowlength;
+
+      return compareWithoutRow(commonPrefix, left, loffset, llength, right,
+          roffset, rlength, lrowlength);
+    }
+
+    /**
+     * Compare column, timestamp, and key type (everything except the row).
+     * This method is used both in the normal comparator and the "same-prefix"
+     * comparator. Note that we are assuming that row portions of both KVs have
+     * already been parsed and found identical, and we don't validate that
+     * assumption here.
+     * @param commonPrefix the length of the common prefix of the two
+     *          key-values being compared, including row length and row
+     */
+    private int compareWithoutRow(int commonPrefix, byte[] left, int loffset,
+        int llength, byte[] right, int roffset, int rlength, short rowlength) {
+      // Compare column family. Start comparing past row and family length.
+      int lcolumnoffset = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE +
+          rowlength + loffset;
+      int rcolumnoffset = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE +
+          rowlength + roffset;
       int lcolumnlength = llength - TIMESTAMP_TYPE_SIZE -
-        (lcolumnoffset - loffset);
+          (lcolumnoffset - loffset);
       int rcolumnlength = rlength - TIMESTAMP_TYPE_SIZE -
-        (rcolumnoffset - roffset);
+          (rcolumnoffset - roffset);
 
-      // if row matches, and no column in the 'left' AND put type is 'minimum',
+      // If row matches, and no column in the 'left' AND put type is 'minimum',
       // then return that left is larger than right.
 
-      // This supports 'last key on a row' - the magic is if there is no column in the
-      // left operand, and the left operand has a type of '0' - magical value,
-      // then we say the left is bigger.  This will let us seek to the last key in
-      // a row.
+      // This supports 'last key on a row' - the magic is if there is no column
+      // in the left operand, and the left operand has a type of '0' - magical
+      // value, then we say the left is bigger.  This will let us seek to the
+      // last key in a row.
 
       byte ltype = left[loffset + (llength - 1)];
       byte rtype = right[roffset + (rlength - 1)];
 
+      // If the column is not specified, the "minimum" key type appears the
+      // latest in the sorted order, regardless of the timestamp. This is used
+      // for specifying the last key/value in a given row, because there is no
+      // "lexicographically last column" (it would be infinitely long).  The
+      // "maximum" key type does not need this behavior.
       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
-        return 1; // left is bigger.
+        // left is "bigger", i.e. it appears later in the sorted order
+        return 1;
       }
       if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
         return -1;
       }
 
-      // TODO the family and qualifier should be compared separately
-      compare = Bytes.compareTo(left, lcolumnoffset, lcolumnlength, right,
-          rcolumnoffset, rcolumnlength);
-      if (compare != 0) {
-        return compare;
+      int common = 0;
+      if (commonPrefix > 0) {
+        common = Math.max(0, commonPrefix -
+            rowlength - ROW_LENGTH_SIZE - FAMILY_LENGTH_SIZE);
+        common = Math.min(common, Math.min(lcolumnlength, rcolumnlength));
       }
 
+      final int comparisonResult = Bytes.compareTo(
+          left, lcolumnoffset + common, lcolumnlength - common,
+          right, rcolumnoffset + common, rcolumnlength - common);
+      if (comparisonResult != 0) {
+        return comparisonResult;
+      }
+
+      return compareTimestampAndType(left, loffset, llength, right, roffset,
+          rlength, ltype, rtype);
+    }
+
+    private int compareTimestampAndType(byte[] left, int loffset, int llength,
+        byte[] right, int roffset, int rlength, byte ltype, byte rtype) {
+      int compare;
       if (!this.ignoreTimestamp) {
         // Get timestamps.
         long ltimestamp = Bytes.toLong(left,
@@ -2069,7 +2172,9 @@ public class KeyValue implements Writabl
 
       if (!this.ignoreType) {
         // Compare types. Let the delete types sort ahead of puts; i.e. types
-        // of higher numbers sort before those of lesser numbers
+        // of higher numbers sort before those of lesser numbers. Maximum (255)
+        // appears ahead of everything, and minimum (0) appears after
+        // everything.
         return (0xff & rtype) - (0xff & ltype);
       }
       return 0;
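SamePrefixComparator.compareIgnoringPrefix above relies on a simple invariant: if the first commonPrefix bytes of both keys are already known to be equal, the byte-wise comparison may start at that offset and must produce the same sign as a full comparison. A standalone sketch of that invariant over plain byte arrays follows; it is not KeyValue.KeyComparator itself, which additionally understands the row/family/qualifier/timestamp/type structure:

public class PrefixAwareCompareSketch {
  static int compareIgnoringPrefix(int commonPrefix, byte[] left, byte[] right) {
    int minLength = Math.min(left.length, right.length);
    // Skip the bytes the caller guarantees to be identical.
    for (int i = commonPrefix; i < minLength; i++) {
      int diff = (left[i] & 0xff) - (right[i] & 0xff);
      if (diff != 0) {
        return diff;
      }
    }
    return left.length - right.length;
  }

  public static void main(String[] args) {
    byte[] a = "row001/cf:alpha".getBytes();
    byte[] b = "row001/cf:beta".getBytes();
    // The first 10 bytes ("row001/cf:") are identical, so both calls must
    // return the same sign.
    System.out.println(compareIgnoringPrefix(0, a, b));
    System.out.println(compareIgnoringPrefix(10, a, b));
  }
}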

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java Sat Dec 24 21:20:39 2011
@@ -28,6 +28,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -60,9 +63,9 @@ public class HalfStoreFileReader extends
    * @throws IOException
    */
   public HalfStoreFileReader(final FileSystem fs, final Path p,
-      final CacheConfig cacheConf, final Reference r)
-  throws IOException {
-    super(fs, p, cacheConf);
+      final CacheConfig cacheConf, final Reference r,
+      HFileDataBlockEncoder dataBlockEncoder) throws IOException {
+    super(fs, p, cacheConf, dataBlockEncoder);
     // This is not actual midkey for this half-file; its just border
     // around which we split top and bottom.  Have to look in files to find
     // actual last and first keys for bottom and top halves.  Half-files don't

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BitsetKeyDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BitsetKeyDeltaEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BitsetKeyDeltaEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BitsetKeyDeltaEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Compress using bit fields to avoid repetitions of certain fields.
+ * Also compress the key and value lengths.
+ *
+ * Format:
+ * <ul>
+ * <li> 1 byte:    flag </li>
+ * <li> 1-4 bytes: key length </li>
+ * <li> 1-4 bytes: value length </li>
+ * <li> Key parts which are new (number of bytes varies) </li>
+ * <li> Value (number of bytes varies) </li>
+ * </ul>
+ *
+ * In the worst case, the compressed KeyValue will be one byte longer than
+ * the original.
+ */
+public class BitsetKeyDeltaEncoder extends BufferedDataBlockEncoder {
+
+  /* Constants used in flag byte */
+  static final int SAME_ROW_FLAG = 1;
+  static final int SAME_FAMILY_FLAG = 1 << 1;
+  static final int SAME_QUALIFIER_FLAG = 1 << 2;
+  static final int SAME_TYPE_FLAG = 1 << 3;
+  static final int VALUE_SIZE_MASK = (1 << 4) | (1 << 5);
+  static final int VALUE_SIZE_SHIFT = 4;
+  static final int KEY_SIZE_MASK = (1 << 6) | (1 << 7);
+  static final int KEY_SIZE_SHIFT = 6;
+
+  @Override
+  public void compressKeyValues(DataOutputStream out,
+      ByteBuffer in, boolean includesMemstoreTS)
+      throws IOException {
+    in.rewind();
+    ByteBufferUtils.putInt(out, in.limit());
+    CompressionState state = new CompressionState();
+    while (in.hasRemaining()) {
+      compressSingleKeyValue(state, out, in);
+      afterEncodingKeyValue(in, out, includesMemstoreTS);
+    }
+  }
+
+  @Override
+  public ByteBuffer uncompressKeyValues(DataInputStream in,
+      int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+          throws IOException {
+    int decompressedSize = in.readInt();
+    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
+        allocHeaderLength);
+    buffer.position(allocHeaderLength);
+    CompressionState state = new CompressionState();
+    while (in.available() > skipLastBytes) {
+      uncompressSingleKeyValue(in, buffer, state);
+      afterDecodingKeyValue(in, buffer, includesMemstoreTS);
+    }
+
+    if (in.available() != skipLastBytes) {
+      throw new IllegalStateException("Read too many bytes.");
+    }
+
+    return buffer;
+  }
+
+  private void uncompressSingleKeyValue(DataInputStream in,
+      ByteBuffer buffer,
+      CompressionState state)
+          throws IOException, EncoderBufferTooSmallException {
+    byte flag = in.readByte();
+
+    // Read key length
+    int keyLengthFitInBytes = 1 + ((flag & KEY_SIZE_MASK) >>> KEY_SIZE_SHIFT);
+    int keyLength = ByteBufferUtils.readCompressedInt(in, keyLengthFitInBytes);
+
+    // Read value length
+    int valueLengthFitInBytes = 1 +
+        ((flag & VALUE_SIZE_MASK) >>> VALUE_SIZE_SHIFT);
+    int valueLength =
+        ByteBufferUtils.readCompressedInt(in, valueLengthFitInBytes);
+
+    // Ensure space in the destination buffer, then write key and value lengths.
+    ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength
+        + KeyValue.ROW_OFFSET);
+    buffer.putInt(keyLength);
+    buffer.putInt(valueLength);
+    int prevElementOffset = state.prevOffset + KeyValue.ROW_OFFSET;
+    int prevRowOffset = prevElementOffset;
+    prevElementOffset += state.rowLength + KeyValue.ROW_LENGTH_SIZE;
+
+    // Read row
+    if (state.isFirst() || (flag & SAME_ROW_FLAG) == 0) {
+      state.rowLength = in.readShort();
+      buffer.putShort(state.rowLength);
+      ByteBufferUtils.copyFromStream(in, buffer, state.rowLength);
+    } else {
+      ByteBufferUtils.copyFromBuffer(buffer, buffer, prevRowOffset,
+          state.rowLength + KeyValue.ROW_LENGTH_SIZE);
+    }
+
+
+    // Read family
+    int prevFamilyOffset = prevElementOffset;
+    prevElementOffset += state.familyLength + KeyValue.FAMILY_LENGTH_SIZE;
+
+    if (state.isFirst() || (flag & SAME_FAMILY_FLAG) == 0) {
+      state.familyLength = in.readByte();
+      buffer.put(state.familyLength);
+      ByteBufferUtils.copyFromStream(in, buffer, state.familyLength);
+    } else {
+      ByteBufferUtils.copyFromBuffer(buffer, buffer, prevFamilyOffset,
+          state.familyLength + KeyValue.FAMILY_LENGTH_SIZE);
+    }
+
+    // Read qualifier
+    if (state.isFirst() || (flag & SAME_QUALIFIER_FLAG) == 0) {
+      state.qualifierLength = keyLength - state.rowLength - state.familyLength
+          - KeyValue.KEY_INFRASTRUCTURE_SIZE;
+      ByteBufferUtils.copyFromStream(in, buffer, state.qualifierLength);
+    } else {
+      ByteBufferUtils.copyFromBuffer(buffer, buffer,
+          prevElementOffset, state.qualifierLength);
+    }
+
+    // Read timestamp
+    ByteBufferUtils.copyFromStream(in, buffer, KeyValue.TIMESTAMP_SIZE);
+
+    // Read type
+    if (state.isFirst() || (flag & SAME_TYPE_FLAG) == 0) {
+      state.type = in.readByte();
+    }
+    buffer.put(state.type);
+
+    // Read value
+    state.prevOffset = buffer.position() - keyLength - KeyValue.ROW_OFFSET;
+    ByteBufferUtils.copyFromStream(in, buffer, valueLength);
+  }
+
+  private void compressSingleKeyValue(CompressionState state,
+      OutputStream out, ByteBuffer in) throws IOException {
+    int kvPos = in.position();
+    int keyLength = in.getInt();
+    int valueLength = in.getInt();
+
+    byte flags = 0;
+
+    // Key length
+    int keyLengthFitsInBytes = ByteBufferUtils.intFitsIn(keyLength);
+    flags |= (keyLengthFitsInBytes - 1) << KEY_SIZE_SHIFT;
+
+    // Value length
+    int valueLengthFitsInBytes = ByteBufferUtils.intFitsIn(valueLength);
+    flags |= (valueLengthFitsInBytes - 1) << VALUE_SIZE_SHIFT;
+
+    if (state.isFirst()) {
+      ByteBufferUtils.copyToStream(out, flags);
+
+      writeKeyValueCompressedLengths(out, in,
+          keyLengthFitsInBytes, valueLengthFitsInBytes);
+
+      state.readKey(in, keyLength, valueLength);
+      ByteBufferUtils.copyToStream(out, in, keyLength);
+    } else {
+      in.mark(); // beginning of the key
+      int prevElementOffset = state.prevOffset + KeyValue.ROW_OFFSET +
+          KeyValue.ROW_LENGTH_SIZE;
+
+      // Row same
+      short rowLength = in.getShort();
+      int prevRowOffset = prevElementOffset;
+      prevElementOffset += state.rowLength + KeyValue.FAMILY_LENGTH_SIZE;
+
+      if (ByteBufferUtils.arePartsEqual(in, in.position(), rowLength,
+          prevRowOffset, state.rowLength)) {
+        flags |= SAME_ROW_FLAG;
+      } else {
+        state.rowLength = rowLength;
+      }
+      ByteBufferUtils.skip(in, rowLength);
+
+      // Family same
+      byte familyLength = in.get();
+      int prevFamilyOffset = prevElementOffset;
+      prevElementOffset += state.familyLength;
+
+      if (ByteBufferUtils.arePartsEqual(in,
+          in.position(), familyLength,
+          prevFamilyOffset, state.familyLength)) {
+        flags |= SAME_FAMILY_FLAG;
+      } else {
+        state.familyLength = familyLength;
+      }
+      ByteBufferUtils.skip(in, familyLength);
+
+      // Qualifier same
+      int qualifierLength = keyLength - rowLength - familyLength -
+          KeyValue.KEY_INFRASTRUCTURE_SIZE;
+      int prevQualifierOffset = prevElementOffset;
+      if (ByteBufferUtils.arePartsEqual(in, in.position(), qualifierLength,
+          prevQualifierOffset, state.qualifierLength)) {
+        flags |= SAME_QUALIFIER_FLAG;
+      } else {
+        state.qualifierLength = qualifierLength;
+      }
+      ByteBufferUtils.skip(in, qualifierLength + KeyValue.TIMESTAMP_SIZE);
+
+      // Type same
+      byte type = in.get();
+      if (type == state.type) {
+        flags |= SAME_TYPE_FLAG;
+      } else {
+        state.type = type;
+      }
+
+      // write it
+      ByteBufferUtils.copyToStream(out, flags);
+      in.reset(); // return to beginning of the key
+
+      writeKeyValueCompressedLengths(out, in,
+          keyLengthFitsInBytes, valueLengthFitsInBytes);
+
+      if ((flags & SAME_ROW_FLAG) == 0) {
+        ByteBufferUtils.copyToStream(out, in, rowLength
+            + KeyValue.ROW_LENGTH_SIZE);
+      } else {
+        ByteBufferUtils.skip(in, rowLength + KeyValue.ROW_LENGTH_SIZE);
+      }
+
+      if ((flags & SAME_FAMILY_FLAG) == 0) {
+        ByteBufferUtils.copyToStream(out, in, familyLength
+            + KeyValue.FAMILY_LENGTH_SIZE);
+      } else {
+        ByteBufferUtils.skip(in, familyLength + KeyValue.FAMILY_LENGTH_SIZE);
+      }
+
+      if ((flags & SAME_QUALIFIER_FLAG) == 0) {
+        ByteBufferUtils.copyToStream(out, in, qualifierLength);
+      } else {
+        ByteBufferUtils.skip(in, qualifierLength);
+      }
+
+      // Timestamp is always different
+      ByteBufferUtils.copyToStream(out, in, KeyValue.TIMESTAMP_SIZE);
+
+      if ((flags & SAME_TYPE_FLAG) == 0) {
+        ByteBufferUtils.copyToStream(out, type);
+      }
+      ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE);
+    }
+
+    // Copy value
+    state.prevOffset = kvPos;
+    ByteBufferUtils.copyToStream(out, in, valueLength);
+  }
+
+  private void writeKeyValueCompressedLengths(OutputStream out,
+      ByteBuffer in, int keyLengthFitsInBytes,
+      int valueLengthFitsInBytes) throws IOException {
+    int off = in.position() - KeyValue.ROW_OFFSET;
+    ByteBufferUtils.copyToStream(out, in, off + (4 - keyLengthFitsInBytes),
+        keyLengthFitsInBytes);
+    off += KeyValue.KEY_LENGTH_SIZE;
+    ByteBufferUtils.copyToStream(out, in, off + (4 - valueLengthFitsInBytes),
+        valueLengthFitsInBytes);
+  }
+
+  @Override
+  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+    block.mark();
+    block.position(Bytes.SIZEOF_INT);
+    byte flag = block.get();
+    int keyLength = ByteBufferUtils.readCompressedInt(
+        block, 1 + ((flag & KEY_SIZE_MASK) >>> KEY_SIZE_SHIFT));
+
+    // valueLength
+    ByteBufferUtils.readCompressedInt(
+        block, 1 + ((flag & VALUE_SIZE_MASK) >>> VALUE_SIZE_SHIFT));
+    int pos = block.position();
+    block.reset();
+    return ByteBuffer.wrap(block.array(), pos, keyLength).slice();
+  }
+
+  @Override
+  public String toString() {
+    return BitsetKeyDeltaEncoder.class.getSimpleName();
+  }
+
+  @Override
+  public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
+      final boolean includesMemstoreTS) {
+    return new BufferedEncodedSeeker(comparator) {
+      private int familyLengthWithSize;
+      private int rowLengthWithSize;
+
+      private void decode() {
+        byte type = 0;
+
+        // Read key and value length
+        byte flag = currentBuffer.get();
+        int oldKeyLength = current.keyLength;
+        int keyLengthFitInBytes = 1 + ((flag & KEY_SIZE_MASK) >>> KEY_SIZE_SHIFT);
+        current.keyLength =
+            ByteBufferUtils.readCompressedInt(currentBuffer,
+                keyLengthFitInBytes);
+
+        // Read value length
+        int valueLengthFitInBytes = 1 +
+            ((flag & VALUE_SIZE_MASK) >>> VALUE_SIZE_SHIFT);
+        current.valueLength =
+            ByteBufferUtils.readCompressedInt(currentBuffer,
+                valueLengthFitInBytes);
+
+        if (oldKeyLength != current.keyLength && (flag & SAME_TYPE_FLAG) != 0) {
+          type = current.keyBuffer[oldKeyLength -1];
+        }
+
+        current.lastCommonPrefix = 0;
+        switch (flag &
+            (SAME_ROW_FLAG | SAME_FAMILY_FLAG | SAME_QUALIFIER_FLAG)) {
+            case SAME_ROW_FLAG | SAME_FAMILY_FLAG | SAME_QUALIFIER_FLAG:
+              current.lastCommonPrefix = current.keyLength -
+                  familyLengthWithSize - rowLengthWithSize - // will be added
+                  KeyValue.TIMESTAMP_TYPE_SIZE;
+            //$FALL-THROUGH$
+            case SAME_ROW_FLAG | SAME_FAMILY_FLAG:
+              current.lastCommonPrefix +=
+                  familyLengthWithSize + rowLengthWithSize;
+            //$FALL-THROUGH$
+            case 0: // fall through
+              currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+                  current.keyLength - current.lastCommonPrefix -
+                  Bytes.SIZEOF_BYTE);
+              break;
+
+            case SAME_FAMILY_FLAG:
+            //$FALL-THROUGH$
+            case SAME_FAMILY_FLAG | SAME_QUALIFIER_FLAG:
+
+              // find size of new row
+              currentBuffer.get(current.keyBuffer, 0, Bytes.SIZEOF_SHORT);
+              int oldRowLengthWithSize = rowLengthWithSize;
+              rowLengthWithSize = Bytes.toShort(current.keyBuffer) +
+                  Bytes.SIZEOF_SHORT;
+
+              // move the column family and qualifier
+              int moveLength;
+              if ((flag & SAME_QUALIFIER_FLAG) == 0) {
+                moveLength = familyLengthWithSize;
+              } else {
+                moveLength = current.keyLength - rowLengthWithSize -
+                    KeyValue.TIMESTAMP_TYPE_SIZE;
+              }
+              System.arraycopy(current.keyBuffer, oldRowLengthWithSize,
+                  current.keyBuffer, rowLengthWithSize, moveLength);
+
+              // copy row
+              currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
+                  rowLengthWithSize - Bytes.SIZEOF_SHORT);
+
+              // copy qualifier and timestamp
+              if ((flag & SAME_QUALIFIER_FLAG) == 0) {
+                currentBuffer.get(current.keyBuffer,
+                    rowLengthWithSize + familyLengthWithSize,
+                    current.keyLength - rowLengthWithSize -
+                    familyLengthWithSize - Bytes.SIZEOF_BYTE);
+              } else {
+                currentBuffer.get(current.keyBuffer,
+                    current.keyLength - KeyValue.TIMESTAMP_TYPE_SIZE,
+                    Bytes.SIZEOF_LONG);
+              }
+              break;
+
+            case SAME_QUALIFIER_FLAG:
+            //$FALL-THROUGH$
+            case SAME_QUALIFIER_FLAG | SAME_ROW_FLAG:
+            //$FALL-THROUGH$
+            case SAME_ROW_FLAG:
+            //$FALL-THROUGH$
+            default:
+              throw new RuntimeException("Unexpected flag!");
+        }
+
+        // we need to save length for the first key
+        if ((flag & SAME_ROW_FLAG) == 0) {
+          rowLengthWithSize = Bytes.toShort(current.keyBuffer) +
+              Bytes.SIZEOF_SHORT;
+          familyLengthWithSize = current.keyBuffer[rowLengthWithSize] +
+              Bytes.SIZEOF_BYTE;
+        } else if ((flag & SAME_FAMILY_FLAG) != 0) {
+          familyLengthWithSize = current.keyBuffer[rowLengthWithSize] +
+              Bytes.SIZEOF_BYTE;
+        }
+
+        // type
+        if ((flag & SAME_TYPE_FLAG) == 0) {
+          currentBuffer.get(current.keyBuffer,
+              current.keyLength - Bytes.SIZEOF_BYTE, Bytes.SIZEOF_BYTE);
+        } else if (oldKeyLength != current.keyLength) {
+          current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE] = type;
+        }
+
+        // value
+        current.valueOffset = currentBuffer.position();
+        ByteBufferUtils.skip(currentBuffer, current.valueLength);
+
+        if (includesMemstoreTS) {
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+        } else {
+          current.memstoreTS = 0;
+        }
+      }
+
+      @Override
+      protected void decodeNext() {
+        decode();
+      }
+
+      @Override
+      protected void decodeFirst() {
+        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+        decode();
+      }
+    };
+  }
+}
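The flag byte documented in the class comment above packs four "same as previous" bits plus two 2-bit fields that hold (length size in bytes - 1) for the key and value lengths. A small sketch of packing and unpacking that byte with the same bit layout follows; it is purely illustrative and does not write a real encoded KeyValue:

public class FlagByteSketch {
  static final int SAME_ROW_FLAG = 1;
  static final int SAME_FAMILY_FLAG = 1 << 1;
  static final int SAME_QUALIFIER_FLAG = 1 << 2;
  static final int SAME_TYPE_FLAG = 1 << 3;
  static final int VALUE_SIZE_SHIFT = 4;
  static final int VALUE_SIZE_MASK = (1 << 4) | (1 << 5);
  static final int KEY_SIZE_SHIFT = 6;
  static final int KEY_SIZE_MASK = (1 << 6) | (1 << 7);

  public static void main(String[] args) {
    int keyLengthBytes = 2;    // the key length fits in two bytes
    int valueLengthBytes = 1;  // the value length fits in one byte
    int flag = SAME_ROW_FLAG | SAME_FAMILY_FLAG
        | ((valueLengthBytes - 1) << VALUE_SIZE_SHIFT)
        | ((keyLengthBytes - 1) << KEY_SIZE_SHIFT);

    // Decoding recovers the same information from the single byte.
    System.out.println("same row:  " + ((flag & SAME_ROW_FLAG) != 0));
    System.out.println("key bytes: " + (1 + ((flag & KEY_SIZE_MASK) >>> KEY_SIZE_SHIFT)));
    System.out.println("val bytes: " + (1 + ((flag & VALUE_SIZE_MASK) >>> VALUE_SIZE_SHIFT)));
  }
}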

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Base class for all data block encoders that use a buffer.
+ */
+abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
+
+  private static final int INITIAL_KEY_BUFFER_SIZE = 512;
+
+  @Override
+  public ByteBuffer uncompressKeyValues(DataInputStream source,
+      boolean includesMemstoreTS) throws IOException {
+    return uncompressKeyValues(source, 0, 0, includesMemstoreTS);
+  }
+
+  protected static class SeekerState {
+    protected int valueOffset = -1;
+    protected int keyLength;
+    protected int valueLength;
+    protected int lastCommonPrefix;
+
+    /** We need to store a copy of the key. */
+    protected byte[] keyBuffer = new byte[INITIAL_KEY_BUFFER_SIZE];
+
+    protected long memstoreTS;
+    protected int nextKvOffset;
+
+    protected boolean isValid() {
+      return valueOffset != -1;
+    }
+
+    protected void invalidate() {
+      valueOffset = -1;
+    }
+
+    protected void ensureSpaceForKey() {
+      if (keyLength > keyBuffer.length) {
+        // rare case, but we need to handle keys of arbitrary length
+        int newKeyBufferLength = Math.max(keyBuffer.length, 1) * 2;
+        while (keyLength > newKeyBufferLength) {
+          newKeyBufferLength *= 2;
+        }
+        byte[] newKeyBuffer = new byte[newKeyBufferLength];
+        System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
+        keyBuffer = newKeyBuffer;
+      }
+    }
+  }
+
+  protected abstract static class BufferedEncodedSeeker
+      implements EncodedSeeker {
+
+    protected final RawComparator<byte[]> comparator;
+    protected final SamePrefixComparator<byte[]> samePrefixComparator;
+    protected ByteBuffer currentBuffer;
+    protected SeekerState current = new SeekerState(); // always valid
+    protected SeekerState previous = new SeekerState(); // may not be valid
+
+    @SuppressWarnings("unchecked")
+    public BufferedEncodedSeeker(RawComparator<byte[]> comparator) {
+      this.comparator = comparator;
+      if (comparator instanceof SamePrefixComparator) {
+        this.samePrefixComparator = (SamePrefixComparator<byte[]>) comparator;
+      } else {
+        this.samePrefixComparator = null;
+      }
+    }
+
+    @Override
+    public void setCurrentBuffer(ByteBuffer buffer) {
+      currentBuffer = buffer;
+      decodeFirst();
+      previous.invalidate();
+    }
+
+    @Override
+    public ByteBuffer getKey() {
+      ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
+      keyBuffer.put(current.keyBuffer, 0, current.keyLength);
+      return keyBuffer;
+    }
+
+    @Override
+    public ByteBuffer getValue() {
+      return ByteBuffer.wrap(currentBuffer.array(),
+          currentBuffer.arrayOffset() + current.valueOffset,
+          current.valueLength);
+    }
+
+    @Override
+    public ByteBuffer getKeyValue() {
+      ByteBuffer kvBuffer = ByteBuffer.allocate(
+          2 * Bytes.SIZEOF_INT + current.keyLength + current.valueLength);
+      kvBuffer.putInt(current.keyLength);
+      kvBuffer.putInt(current.valueLength);
+      kvBuffer.put(current.keyBuffer, 0, current.keyLength);
+      kvBuffer.put(currentBuffer.array(),
+          currentBuffer.arrayOffset() + current.valueOffset,
+          current.valueLength);
+      return kvBuffer;
+    }
+
+    @Override
+    public KeyValue getKeyValueObject() {
+      ByteBuffer kvBuf = getKeyValue();
+      KeyValue kv = new KeyValue(kvBuf.array(), kvBuf.arrayOffset());
+      kv.setMemstoreTS(current.memstoreTS);
+      return kv;
+    }
+
+    @Override
+    public void rewind() {
+      currentBuffer.rewind();
+      decodeFirst();
+      previous.invalidate();
+    }
+
+    @Override
+    public boolean next() {
+      if (!currentBuffer.hasRemaining()) {
+        return false;
+      }
+      decodeNext();
+      previous.invalidate();
+      return true;
+    }
+
+    @Override
+    public int blockSeekTo(byte[] key, int offset, int length,
+        boolean seekBefore) {
+      int commonPrefix = 0;
+      previous.invalidate();
+      do {
+        int comp;
+        if (samePrefixComparator != null) {
+          commonPrefix = Math.min(commonPrefix, current.lastCommonPrefix);
+
+          // extend commonPrefix
+          commonPrefix += ByteBufferUtils.findCommonPrefix(
+              key, offset + commonPrefix, length - commonPrefix,
+              current.keyBuffer, commonPrefix,
+              current.keyLength - commonPrefix);
+
+          comp = samePrefixComparator.compareIgnoringPrefix(
+              commonPrefix,
+              key, offset, length, current.keyBuffer, 0, current.keyLength);
+        } else {
+          comp = comparator.compare(key, offset, length,
+              current.keyBuffer, 0, current.keyLength);
+        }
+
+        if (comp == 0) { // exact match
+          if (seekBefore) {
+            moveToPrevious();
+            return 1;
+          }
+          return 0;
+        }
+
+        if (comp < 0) { // already too large, check previous
+          if (previous.isValid()) {
+            moveToPrevious();
+          }
+          return 1;
+        }
+
+        // move to next, if more data is available
+        if (currentBuffer.hasRemaining()) {
+          savePrevious();
+          decodeNext();
+        } else {
+          break;
+        }
+      } while (true);
+
+      // we hit end of file, not exact match
+      return 1;
+    }
+
+    private void moveToPrevious() {
+      if (!previous.isValid()) {
+        throw new IllegalStateException(
+            "Can move back only once and not in first key in the block.");
+      }
+
+      SeekerState tmp = previous;
+      previous = current;
+      current = tmp;
+
+      // move after last key value
+      currentBuffer.position(current.nextKvOffset);
+
+      previous.invalidate();
+    }
+
+    private void savePrevious() {
+      previous.valueOffset = current.valueOffset;
+      previous.keyLength = current.keyLength;
+      previous.valueLength = current.valueLength;
+      previous.lastCommonPrefix = current.lastCommonPrefix;
+      previous.nextKvOffset = current.nextKvOffset;
+      if (previous.keyBuffer.length != current.keyBuffer.length) {
+        previous.keyBuffer = current.keyBuffer.clone();
+      } else if (!previous.isValid()) {
+        System.arraycopy(current.keyBuffer, 0, previous.keyBuffer, 0,
+             current.keyLength);
+      } else {
+        // don't copy the common prefix between this key and the previous one
+        System.arraycopy(current.keyBuffer, current.lastCommonPrefix,
+            previous.keyBuffer, current.lastCommonPrefix, current.keyLength
+                - current.lastCommonPrefix);
+      }
+    }
+
+    abstract protected void decodeFirst();
+    abstract protected void decodeNext();
+  }
+
+  protected final void afterEncodingKeyValue(ByteBuffer in,
+      DataOutputStream out, boolean includesMemstoreTS) {
+    if (includesMemstoreTS) {
+      // Copy memstore timestamp from the byte buffer to the output stream.
+      long memstoreTS = -1;
+      try {
+        memstoreTS = ByteBufferUtils.readVLong(in);
+        WritableUtils.writeVLong(out, memstoreTS);
+      } catch (IOException ex) {
+        throw new RuntimeException("Unable to copy memstore timestamp " +
+            memstoreTS + " after encoding a key/value");
+      }
+    }
+  }
+
+  static DataInput asDataInputStream(final ByteBuffer in) {
+    return new DataInputStream(new InputStream() {
+      @Override
+      public int read() throws IOException {
+        return in.get() & 0xff;
+      }
+    });
+  }
+
+  protected final void afterDecodingKeyValue(DataInputStream source,
+      ByteBuffer dest, boolean includesMemstoreTS) {
+    if (includesMemstoreTS) {
+      long memstoreTS = -1;
+      try {
+        // Copy memstore timestamp from the data input stream to the byte
+        // buffer.
+        memstoreTS = WritableUtils.readVLong(source);
+        ByteBufferUtils.writeVLong(dest, memstoreTS);
+      } catch (IOException ex) {
+        throw new RuntimeException("Unable to copy memstore timestamp " +
+            memstoreTS + " after decoding a key/value");
+      }
+    }
+  }
+
+}
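blockSeekTo above avoids re-comparing bytes it has already matched: the prefix verified against the search key is carried forward, capped by lastCommonPrefix (the bytes the newly decoded key shares with the previous one), and then extended once per step. A string-based simulation of that bookkeeping follows, assuming sorted keys; it is not the encoded-block seeker itself:

public class SeekPrefixSketch {
  public static void main(String[] args) {
    String[] keys = {"row001/a", "row001/b", "row002/a", "row002/b"};
    String searchKey = "row002/a";

    int commonPrefix = 0;
    String previous = "";
    for (String current : keys) {
      // Bytes the current key shares with the previous key ("lastCommonPrefix").
      int lastCommonPrefix = commonPrefixLength(previous, current);
      // The already-verified prefix against searchKey survives only up to that
      // point, and is then extended by comparing the remaining bytes once.
      commonPrefix = Math.min(commonPrefix, lastCommonPrefix);
      commonPrefix += commonPrefixLength(searchKey.substring(commonPrefix),
          current.substring(commonPrefix));
      System.out.println(current + " -> commonPrefix=" + commonPrefix);
      if (commonPrefix == searchKey.length() && commonPrefix == current.length()) {
        System.out.println("found " + current);
        break;
      }
      previous = current;
    }
  }

  private static int commonPrefixLength(String a, String b) {
    int limit = Math.min(a.length(), b.length());
    int i = 0;
    while (i < limit && a.charAt(i) == b.charAt(i)) {
      i++;
    }
    return i;
  }
}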

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+/**
+ * Stores the state of the data block encoder at the beginning of a new key.
+ */
+class CompressionState {
+  int keyLength;
+  int valueLength;
+
+  short rowLength;
+  int prevOffset = FIRST_KEY;
+  byte familyLength;
+  int qualifierLength;
+  byte type;
+
+  final static int FIRST_KEY = -1;
+
+  boolean isFirst() {
+    return prevOffset == FIRST_KEY;
+  }
+
+  /**
+   * Analyze the key and fill the state.
+   * Uses mark() and reset() in ByteBuffer.
+   * @param in Buffer at the position where key starts
+   * @param keyLength Length of key in bytes
+   * @param valueLength Length of the value in bytes
+   */
+  void readKey(ByteBuffer in, int keyLength, int valueLength) {
+    readKey(in, keyLength, valueLength, 0, null);
+  }
+
+  /**
+   * Analyze the key and fill the state assuming we know the previous state.
+   * Uses mark() and reset() in ByteBuffer.
+   * @param in Buffer at the position where key starts
+   * @param keyLength Length of key in bytes
+   * @param valueLength Length of the value in bytes
+   * @param commonPrefix how many first bytes are common with previous KeyValue
+   * @param previousState State from previous KeyValue
+   */
+  void readKey(ByteBuffer in, int keyLength, int valueLength,
+      int commonPrefix, CompressionState previousState) {
+    this.keyLength = keyLength;
+    this.valueLength = valueLength;
+
+    // fill the state
+    in.mark(); // mark beginning of key
+
+    if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
+      rowLength = in.getShort();
+      ByteBufferUtils.skip(in, rowLength);
+
+      familyLength = in.get();
+
+      qualifierLength = keyLength - rowLength - familyLength -
+          KeyValue.KEY_INFRASTRUCTURE_SIZE;
+      ByteBufferUtils.skip(in, familyLength + qualifierLength);
+    } else {
+      rowLength = previousState.rowLength;
+      familyLength = previousState.familyLength;
+      qualifierLength = previousState.qualifierLength +
+          keyLength - previousState.keyLength;
+      ByteBufferUtils.skip(in, (KeyValue.ROW_LENGTH_SIZE +
+          KeyValue.FAMILY_LENGTH_SIZE) +
+          rowLength + familyLength + qualifierLength);
+    }
+
+    readTimestamp(in);
+
+    type = in.get();
+
+    in.reset();
+  }
+
+  protected void readTimestamp(ByteBuffer in) {
+    // used in subclasses to add timestamp to state
+    ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_SIZE);
+  }
+
+  void copyFrom(CompressionState state) {
+    keyLength = state.keyLength;
+    valueLength = state.valueLength;
+
+    rowLength = state.rowLength;
+    prevOffset = state.prevOffset;
+    familyLength = state.familyLength;
+    qualifierLength = state.qualifierLength;
+    type = state.type;
+  }
+}

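For reference, the key layout that CompressionState.readKey walks is: a 2-byte row length, the row, a 1-byte family length, the family, the qualifier, an 8-byte timestamp and a 1-byte type; the qualifier length falls out as keyLength minus everything else. A minimal sketch of that walk, with the sizes written as literals instead of the KeyValue.* constants (the class name KeyLayoutSketch is hypothetical):

    import java.nio.ByteBuffer;

    public class KeyLayoutSketch {
      /** Walks one key of keyLength bytes and returns its qualifier length. */
      static int parseKey(ByteBuffer key, int keyLength) {
        short rowLength = key.getShort();           // ROW_LENGTH_SIZE = 2
        key.position(key.position() + rowLength);   // skip row bytes
        byte familyLength = key.get();              // FAMILY_LENGTH_SIZE = 1
        // Whatever sits between the family and the trailing timestamp + type
        // (8 + 1 bytes) must be the qualifier.
        int qualifierLength =
            keyLength - 2 - rowLength - 1 - familyLength - 9;
        key.position(key.position() + familyLength + qualifierLength);
        long timestamp = key.getLong();             // TIMESTAMP_SIZE = 8
        byte type = key.get();                      // key type byte
        return qualifierLength;
      }
    }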
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Just copies the data without doing any kind of compression. Useful for
+ * comparison and benchmarking.
+ */
+public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
+  @Override
+  public void compressKeyValues(DataOutputStream out,
+      ByteBuffer in, boolean includesMemstoreTS) throws IOException {
+    in.rewind();
+    ByteBufferUtils.putInt(out, in.limit());
+    ByteBufferUtils.copyToStream(out, in, in.limit());
+  }
+
+  @Override
+  public ByteBuffer uncompressKeyValues(DataInputStream source,
+      int preserveHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+      throws IOException {
+    int decompressedSize = source.readInt();
+    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
+        preserveHeaderLength);
+    buffer.position(preserveHeaderLength);
+    ByteBufferUtils.copyFromStream(source, buffer, decompressedSize);
+
+    return buffer;
+  }
+
+  @Override
+  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+    int keyLength = block.getInt(Bytes.SIZEOF_INT);
+    return ByteBuffer.wrap(block.array(),
+        block.arrayOffset() + 3 * Bytes.SIZEOF_INT, keyLength).slice();
+  }
+
+  @Override
+  public String toString() {
+    return CopyKeyDataBlockEncoder.class.getSimpleName();
+  }
+
+  @Override
+  public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
+      final boolean includesMemstoreTS) {
+    return new BufferedEncodedSeeker(comparator) {
+      @Override
+      protected void decodeNext() {
+        current.keyLength = currentBuffer.getInt();
+        current.valueLength = currentBuffer.getInt();
+        current.ensureSpaceForKey();
+        currentBuffer.get(current.keyBuffer, 0, current.keyLength);
+        current.valueOffset = currentBuffer.position();
+        ByteBufferUtils.skip(currentBuffer, current.valueLength);
+        if (includesMemstoreTS) {
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+        } else {
+          current.memstoreTS = 0;
+        }
+        current.nextKvOffset = currentBuffer.position();
+      }
+
+      @Override
+      protected void decodeFirst() {
+        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+        current.lastCommonPrefix = 0;
+        decodeNext();
+      }
+    };
+  }
+}

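A hedged usage sketch of the encoder above: one KeyValue is serialized into a buffer, run through compressKeyValues (which, for this encoder, just prepends a length and copies the bytes), and read back with the four-argument uncompressKeyValues. The block here carries no memstore timestamps, and the class name CopyKeyRoundTrip is hypothetical.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.encoding.CopyKeyDataBlockEncoder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CopyKeyRoundTrip {
      public static void main(String[] args) throws IOException {
        KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), Bytes.toBytes("value"));
        ByteBuffer unencoded = ByteBuffer.wrap(kv.getBuffer(), kv.getOffset(),
            kv.getLength()).slice();

        // Encode: the payload is copied verbatim after a length prefix.
        ByteArrayOutputStream encoded = new ByteArrayOutputStream();
        CopyKeyDataBlockEncoder encoder = new CopyKeyDataBlockEncoder();
        encoder.compressKeyValues(new DataOutputStream(encoded), unencoded, false);

        // Decode: no header bytes reserved, no trailing bytes skipped.
        ByteBuffer decoded = encoder.uncompressKeyValues(
            new DataInputStream(new ByteArrayInputStream(encoded.toByteArray())),
            0, 0, false);
        System.out.println(decoded.limit() == kv.getLength()); // expected: true
      }
    }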
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Encoding of KeyValue. It aims to be fast and efficient using the following
+ * assumptions:
+ * <ul>
+ * <li>the KeyValues are stored sorted by key</li>
+ * <li>we know the structure of a KeyValue</li>
+ * <li>the values are always iterated forward from the beginning of the block</li>
+ * </ul>
+ * It is designed to work fast enough to be feasible as in-memory compression.
+ */
+public interface DataBlockEncoder {
+  /**
+   * Compress KeyValues and write them to the output stream.
+   * @param out Where to write the compressed data.
+   * @param in Source of KeyValues for compression.
+   * @param includesMemstoreTS true if including memstore timestamp after every
+   *          key-value pair
+   * @throws IOException If there is an error writing to output stream.
+   */
+  public void compressKeyValues(DataOutputStream out,
+      ByteBuffer in, boolean includesMemstoreTS) throws IOException;
+
+  /**
+   * Uncompress.
+   * @param source Compressed stream of KeyValues.
+   * @param includesMemstoreTS true if including memstore timestamp after every
+   *          key-value pair
+   * @return Uncompressed block of KeyValues.
+   * @throws IOException If there is an error in source.
+   */
+  public ByteBuffer uncompressKeyValues(DataInputStream source,
+      boolean includesMemstoreTS) throws IOException;
+
+  /**
+   * Uncompress.
+   * @param source Compressed stream of KeyValues.
+   * @param allocateHeaderLength allocate this many bytes for the header.
+   * @param skipLastBytes Do not copy n last bytes.
+   * @param includesMemstoreTS true if including memstore timestamp after every
+   *          key-value pair
+   * @return Uncompressed block of KeyValues.
+   * @throws IOException If there is an error in source.
+   */
+  public ByteBuffer uncompressKeyValues(DataInputStream source,
+      int allocateHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+      throws IOException;
+
+  /**
+   * Return first key in block. Useful for indexing.
+   * @param block encoded block we want to index; its position will not change
+   * @return First key in block.
+   */
+  public ByteBuffer getFirstKeyInBlock(ByteBuffer block);
+
+  /**
+   * Create an HFileBlock seeker which finds KeyValues within a block.
+   * @param comparator what kind of comparison should be used
+   * @param includesMemstoreTS true if including memstore timestamp after every
+   *          key-value pair
+   * @return A newly created seeker.
+   */
+  public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
+      boolean includesMemstoreTS);
+
+  /**
+   * An interface which enables seeking while the underlying data is encoded.
+   *
+   * It works on one HFileBlock, but it is reusable. See
+   * {@link #setCurrentBuffer(ByteBuffer)}.
+   */
+  public static interface EncodedSeeker {
+    /**
+     * Set the buffer on which seeking will be done.
+     * @param buffer Used for seeking.
+     */
+    public void setCurrentBuffer(ByteBuffer buffer);
+
+    /** @return key at current position */
+    public ByteBuffer getKey();
+
+    /** @return value at current position */
+    public ByteBuffer getValue();
+
+    /** @return key value at current position. */
+    public ByteBuffer getKeyValue();
+
+    /**
+     * @return the KeyValue object at the current position. Includes memstore
+     *         timestamp.
+     */
+    public KeyValue getKeyValueObject();
+
+    /** Set position to beginning of given block */
+    public void rewind();
+
+    /**
+     * Move to the next position.
+     * @return true on success, false if there are no more positions.
+     */
+    public boolean next();
+
+    /**
+     * Move position to the same key (or one before it).
+     * @param key Array containing the key.
+     * @param offset Key position in array.
+     * @param length Key length in array.
+     * @param seekBefore find the key before in case of exact match. Does not
+     *          matter in case of an inexact match.
+     * @return 0 on exact match, 1 on inexact match.
+     */
+    public int blockSeekTo(byte[] key, int offset, int length,
+        boolean seekBefore);
+  }
+}
\ No newline at end of file

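A hedged sketch of driving the EncodedSeeker interface declared above over an already-encoded block held in memory: create a seeker from an encoder, attach the buffer, rewind, and walk forward until next() returns false. It assumes the buffer was produced by the same encoder's compressKeyValues with no memstore timestamps; PrefixKeyDeltaEncoder is used only as a concrete example, and the class name SeekerScanSketch is hypothetical.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
    import org.apache.hadoop.hbase.io.encoding.PrefixKeyDeltaEncoder;

    public class SeekerScanSketch {
      static void scan(ByteBuffer encodedBlock) {
        DataBlockEncoder encoder = new PrefixKeyDeltaEncoder();
        DataBlockEncoder.EncodedSeeker seeker =
            encoder.createSeeker(KeyValue.KEY_COMPARATOR, false);
        seeker.setCurrentBuffer(encodedBlock);       // attach the encoded block
        seeker.rewind();                             // position at the first cell
        do {
          KeyValue kv = seeker.getKeyValueObject();  // copy out the current cell
          System.out.println(kv);
        } while (seeker.next());                     // false at the end of block
      }
    }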
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncodings.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncodings.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncodings.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncodings.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Provide access to all data block encoding algorithms.
+ */
+public class DataBlockEncodings {
+
+  /** Constructor. This class cannot be instantiated. */
+  private DataBlockEncodings() {
+  }
+
+  /**
+   * Algorithm type. All of the algorithms are required to have a unique id which
+   * should _NEVER_ be changed. If you want to add a new algorithm/version,
+   * assign it a new id. Announce the new id in the HBase mailing list to
+   * prevent collisions.
+   */
+  public static enum Algorithm {
+    /**
+     * Disable data block encoding.
+     */
+    NONE(0, null),
+    BITSET(1, new BitsetKeyDeltaEncoder()),
+    PREFIX(2, new PrefixKeyDeltaEncoder()),
+    DIFF(3, new DiffKeyDeltaEncoder()),
+    FAST_DIFF(4, new FastDiffDeltaEncoder());
+
+    private final short id;
+    private final byte[] idInBytes;
+    private final DataBlockEncoder encoder;
+
+    private Algorithm(int id, DataBlockEncoder encoder) {
+      if (id < Short.MIN_VALUE || id > Short.MAX_VALUE) {
+        throw new AssertionError(
+            "Data block encoding algorithm id is out of range: " + id);
+      }
+      this.id = (short) id;
+      this.idInBytes = Bytes.toBytes(this.id);
+      if (idInBytes.length != HFileBlock.DATA_BLOCK_ENCODER_ID_SIZE) {
+      // While this may seem redundant, if we accidentally serialize
+        // the id as e.g. an int instead of a short, all encoders will break.
+        throw new RuntimeException("Unexpected length of encoder ID byte " +
+            "representation: " + Bytes.toStringBinary(idInBytes));
+      }
+      this.encoder = encoder;
+    }
+
+    /**
+     * @return Name converted to bytes.
+     */
+    public byte[] getNameInBytes() {
+      return Bytes.toBytes(toString());
+    }
+
+    /**
+     * @return The id of a data block encoder.
+     */
+    public short getId() {
+      return id;
+    }
+
+    /**
+     * Writes id in bytes.
+     * @param stream Where it should be written.
+     */
+    public void writeIdInBytes(OutputStream stream) throws IOException {
+      stream.write(idInBytes);
+    }
+
+    /**
+     * Return the data block encoder for this algorithm type.
+     * @return data block encoder if an algorithm is specified, null if none
+     *         is selected.
+     */
+    public DataBlockEncoder getEncoder() {
+      return encoder;
+    }
+  }
+
+  /**
+   * Maps encoding algorithm ids to algorithm instances for all algorithms in
+   * the {@link Algorithm} enum.
+   */
+  private static final Map<Short, Algorithm> idToAlgorithm =
+      new HashMap<Short, Algorithm>();
+
+  /** Size of a delta encoding algorithm id */
+  public static final int ID_SIZE = Bytes.SIZEOF_SHORT;
+
+  static {
+    for (Algorithm algo : Algorithm.values()) {
+      if (idToAlgorithm.containsKey(algo.getId())) {
+        throw new RuntimeException(String.format(
+            "Two data block encoder algorithms '%s' and '%s' has same id '%d",
+            idToAlgorithm.get(algo.getId()).toString(), algo.toString(),
+            (int) algo.getId()));
+      }
+      idToAlgorithm.put(algo.getId(), algo);
+    }
+  }
+
+  /**
+   * Provide access to all data block encoders, even those which are not
+   * exposed in the enum. Useful for testing and benchmarking.
+   * @return list of all data block encoders.
+   */
+  public static List<DataBlockEncoder> getAllEncoders() {
+    ArrayList<DataBlockEncoder> encoders = new ArrayList<DataBlockEncoder>();
+    for (Algorithm algo : Algorithm.values()) {
+      DataBlockEncoder encoder = algo.getEncoder();
+      if (encoder != null) {
+        encoders.add(encoder);
+      }
+    }
+
+    // Add encoders that are only used in testing.
+    encoders.add(new CopyKeyDataBlockEncoder());
+    return encoders;
+  }
+
+  /**
+   * Find the data block encoder for the given id.
+   * @param encoderId id of the data block encoder.
+   * @return Data block encoder for the given id.
+   */
+  public static DataBlockEncoder getDataBlockEncoderFromId(short encoderId) {
+    if (!idToAlgorithm.containsKey(encoderId)) {
+      throw new IllegalArgumentException(String.format(
+          "There is no data block encoder for given id '%d'",
+          (int) encoderId));
+    }
+
+    return idToAlgorithm.get(encoderId).getEncoder();
+  }
+
+  /**
+   * Find and return the name of the data block encoder for the given id.
+   * @param encoderId id of data block encoder
+   * @return name, same as used in options in column family
+   */
+  public static String getNameFromId(short encoderId) {
+    return idToAlgorithm.get(encoderId).toString();
+  }
+
+  /**
+   * Check if given encoder has this id.
+   * @param encoder encoder which id will be checked
+   * @param encoderId id which we expect
+   * @return true if id is right for given encoder, false otherwise
+   * @exception IllegalArgumentException
+   *            thrown when there is no matching data block encoder
+   */
+  public static boolean isCorrectEncoder(DataBlockEncoder encoder,
+      short encoderId) {
+    if (!idToAlgorithm.containsKey(encoderId)) {
+      throw new IllegalArgumentException(String.format(
+          "There is no data block encoder for given id '%d'",
+          (int) encoderId));
+    }
+
+    Algorithm algorithm = idToAlgorithm.get(encoderId);
+    DataBlockEncoder expectedEncoder = algorithm.getEncoder();
+    return expectedEncoder != null &&
+        expectedEncoder.getClass().equals(encoder.getClass());
+  }
+
+}
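A hedged sketch of the id round trip these helpers support: an Algorithm writes its two-byte id in front of a block, and a reader recovers the matching encoder from that id. The class name EncoderIdLookupSketch is hypothetical; the helpers used are the ones defined above.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncodings;

    public class EncoderIdLookupSketch {
      public static void main(String[] args) throws IOException {
        DataBlockEncodings.Algorithm algo = DataBlockEncodings.Algorithm.PREFIX;

        // Write the two-byte id that precedes an encoded block's payload.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        algo.writeIdInBytes(out);

        // Read it back and resolve the encoder instance from the registry.
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
        short id = in.readShort();
        DataBlockEncoder encoder = DataBlockEncodings.getDataBlockEncoderFromId(id);
        System.out.println(DataBlockEncodings.getNameFromId(id)); // "PREFIX"
        System.out.println(encoder.getClass().getSimpleName());   // "PrefixKeyDeltaEncoder"
      }
    }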