You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2011/12/24 22:20:41 UTC

svn commit: r1223020 [2/5] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/mapr...

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Compress using:
+ * - store size of common prefix
+ * - save column family once, it is same within HFile
+ * - use integer compression for key, value and prefix (7-bit encoding)
+ * - use flag bits to avoid duplicating the key length, value length
+ *   and type if they are the same as in the previous KeyValue
+ * - store in 3 bits length of timestamp field
+ * - allow diff in timestamp instead of actual value
+ *
+ * Format:
+ * - 1 byte:    flag
+ * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
+ * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
+ * - 1-5 bytes: prefix length
+ * - ... bytes: rest of the row (if prefix length is small enough)
+ * - ... bytes: qualifier (or suffix depending on prefix length)
+ * - 1-8 bytes: timestamp or diff
+ * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
+ * - ... bytes: value
+ */
+public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
+  // Bit layout of the one-byte flag that precedes every encoded KeyValue.
+
+  /** Set when the key length equals the previous one and is not written. */
+  static final int FLAG_SAME_KEY_LENGTH = 1;
+  /** Set when the value length equals the previous one and is not written. */
+  static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
+  /** Set when the type byte equals the previous one and is not written. */
+  static final int FLAG_SAME_TYPE = 1 << 2;
+  /**
+   * Set when the timestamp field holds
+   * (previous timestamp - current timestamp) instead of the timestamp itself.
+   */
+  static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
+  /** Bits 4-6: (number of bytes used by the timestamp field) - 1. */
+  static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
+  /** Shift that moves the timestamp length into / out of bits 4-6. */
+  static final int SHIFT_TIMESTAMP_LENGTH = 4;
+  /** Set when the stored magnitude (timestamp or diff) must be negated. */
+  static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
+
+  /**
+   * Per-KeyValue state of this encoder: the generic {@link CompressionState}
+   * plus the timestamp and, on the decoding side, the cached column family
+   * (length byte followed by the family name).
+   */
+  protected static class DiffCompressionState extends CompressionState {
+    long timestamp;
+    byte[] familyNameWithSize;
+
+    /** Reads the 8-byte timestamp at the current position of {@code in}. */
+    @Override
+    protected void readTimestamp(ByteBuffer in) {
+      this.timestamp = in.getLong();
+    }
+
+    /**
+     * Copies the inherited state and the timestamp from {@code state}, which
+     * must be a {@code DiffCompressionState}. The cached family name is not
+     * copied.
+     */
+    @Override
+    void copyFrom(CompressionState state) {
+      super.copyFrom(state);
+      this.timestamp = ((DiffCompressionState) state).timestamp;
+    }
+  }
+
+  /**
+   * Encodes the KeyValue starting at the current position of {@code in} and
+   * appends the encoded form to {@code out}.
+   *
+   * For the first KeyValue of a block the column family (length byte and
+   * name) is written once in front of the flag byte; the decoder reads it
+   * from there. Every KeyValue is then written as: flag, optional key
+   * length, optional value length, common prefix length, the non-shared part
+   * of the key without the column family, timestamp or timestamp diff,
+   * optional type and finally the value.
+   *
+   * @param previousState state of the previously encoded KeyValue
+   *          ({@code isFirst()} is true when there is none)
+   * @param currentState state that is filled from the KeyValue being encoded
+   * @param out stream receiving the encoded bytes
+   * @param in buffer positioned at the start of a KeyValue; on return it is
+   *          positioned just after that KeyValue's value
+   * @throws IOException if writing to {@code out} fails
+   */
+  private void compressSingleKeyValue(DiffCompressionState previousState,
+      DiffCompressionState currentState, DataOutputStream out,
+      ByteBuffer in) throws IOException {
+    byte flag = 0;
+    int kvPos = in.position();
+    int keyLength = in.getInt();
+    int valueLength = in.getInt();
+
+    long timestamp;
+    long diffTimestamp = 0;
+    int diffTimestampFitsInBytes = 0;
+
+    int commonPrefix;
+
+    int timestampFitsInBytes;
+
+    if (previousState.isFirst()) {
+      // first KeyValue: nothing to diff against, so no prefix and the
+      // timestamp is stored as is
+      currentState.readKey(in, keyLength, valueLength);
+      currentState.prevOffset = kvPos;
+      timestamp = currentState.timestamp;
+      // only the magnitude is written, the sign travels in the flag
+      if (timestamp < 0) {
+        flag |= FLAG_TIMESTAMP_SIGN;
+        timestamp = -timestamp;
+      }
+      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
+
+      flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+      commonPrefix = 0;
+
+      // put column family
+      // (jump over the row, emit family length + family name, then return
+      // to the marked position so the key can be processed below)
+      in.mark();
+      ByteBufferUtils.skip(in, currentState.rowLength
+          + KeyValue.ROW_LENGTH_SIZE);
+      ByteBufferUtils.copyToStream(out, in, currentState.familyLength
+          + KeyValue.FAMILY_LENGTH_SIZE);
+      in.reset();
+    } else {
+      // find a common prefix and skip it
+      commonPrefix =
+          ByteBufferUtils.findCommonPrefix(in, in.position(),
+              previousState.prevOffset + KeyValue.ROW_OFFSET, keyLength
+                  - KeyValue.TIMESTAMP_TYPE_SIZE);
+      // don't compress timestamp and type using prefix
+
+      currentState.readKey(in, keyLength, valueLength,
+          commonPrefix, previousState);
+      currentState.prevOffset = kvPos;
+      timestamp = currentState.timestamp;
+      boolean minusTimestamp = timestamp < 0;
+      if (minusTimestamp) {
+        timestamp = -timestamp;
+      }
+      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
+
+      // fields equal to the previous KeyValue are replaced by a flag bit
+      if (keyLength == previousState.keyLength) {
+        flag |= FLAG_SAME_KEY_LENGTH;
+      }
+      if (valueLength == previousState.valueLength) {
+        flag |= FLAG_SAME_VALUE_LENGTH;
+      }
+      if (currentState.type == previousState.type) {
+        flag |= FLAG_SAME_TYPE;
+      }
+
+      // encode timestamp
+      // (use the diff to the previous timestamp only when it needs fewer
+      // bytes than the timestamp itself)
+      diffTimestamp = previousState.timestamp - currentState.timestamp;
+      boolean minusDiffTimestamp = diffTimestamp < 0;
+      if (minusDiffTimestamp) {
+        diffTimestamp = -diffTimestamp;
+      }
+      diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
+      if (diffTimestampFitsInBytes < timestampFitsInBytes) {
+        flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+        flag |= FLAG_TIMESTAMP_IS_DIFF;
+        if (minusDiffTimestamp) {
+          flag |= FLAG_TIMESTAMP_SIGN;
+        }
+      } else {
+        flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+        if (minusTimestamp) {
+          flag |= FLAG_TIMESTAMP_SIGN;
+        }
+      }
+    }
+
+    // header: flag, then the lengths that are not implied by the flag
+    ByteBufferUtils.copyToStream(out, flag);
+    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+      ByteBufferUtils.putCompressedInt(out, keyLength);
+    }
+    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+      ByteBufferUtils.putCompressedInt(out, valueLength);
+    }
+
+    ByteBufferUtils.putCompressedInt(out, commonPrefix);
+    ByteBufferUtils.skip(in, commonPrefix);
+
+    if (previousState.isFirst() ||
+        commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+      // the prefix ends inside the row: write the rest of the row, jump over
+      // the column family (it is stored only once) and write the qualifier
+      int restRowLength =
+          currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
+      ByteBufferUtils.copyToStream(out, in, restRowLength);
+      ByteBufferUtils.skip(in, currentState.familyLength +
+          KeyValue.FAMILY_LENGTH_SIZE);
+      ByteBufferUtils.copyToStream(out, in, currentState.qualifierLength);
+    } else {
+      // the prefix covers the whole row: write everything between the prefix
+      // and the timestamp
+      ByteBufferUtils.copyToStream(out, in,
+          keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE);
+    }
+
+    if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
+      ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
+    } else {
+      ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
+    }
+
+    if ((flag & FLAG_SAME_TYPE) == 0) {
+      ByteBufferUtils.copyToStream(out, currentState.type);
+    }
+    // timestamp and type were taken from currentState, so step over them
+    ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_TYPE_SIZE);
+
+    ByteBufferUtils.copyToStream(out, in, valueLength);
+  }
+
+  /**
+   * Decodes one KeyValue from {@code source} and appends it, in the plain
+   * KeyValue layout (key length, value length, key, value), to
+   * {@code buffer}.
+   *
+   * On the first call for a block the column family (length byte and name)
+   * is read from the stream and cached in {@code state}; it is re-inserted
+   * into every key whose common prefix ends inside the row.
+   *
+   * @param source stream positioned at the start of an encoded KeyValue
+   *          (or at the column family for the first one)
+   * @param buffer destination; the previously decoded KeyValue must still be
+   *          at {@code state.prevOffset} because the common prefix is copied
+   *          from it
+   * @param state decoder state carried from one KeyValue to the next
+   * @throws IOException if the stream fails or ends prematurely
+   * @throws EncoderBufferTooSmallException declared for the call to
+   *           {@code ByteBufferUtils.ensureSpace}
+   */
+  private void uncompressSingleKeyValue(DataInputStream source,
+      ByteBuffer buffer,
+      DiffCompressionState state)
+          throws IOException, EncoderBufferTooSmallException {
+    // read the column family at the beginning
+    if (state.isFirst()) {
+      state.familyLength = source.readByte();
+      state.familyNameWithSize =
+          new byte[(state.familyLength & 0xff) + KeyValue.FAMILY_LENGTH_SIZE];
+      state.familyNameWithSize[0] = state.familyLength;
+      // readFully, not read: read() may return fewer bytes than requested.
+      // The length is derived from the array so it agrees with the unsigned
+      // interpretation used for the allocation above.
+      source.readFully(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE,
+          state.familyNameWithSize.length - KeyValue.FAMILY_LENGTH_SIZE);
+    }
+
+    // read flag
+    byte flag = source.readByte();
+
+    // read key/value/common lengths
+    int keyLength;
+    int valueLength;
+    if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
+      keyLength = state.keyLength;
+    } else {
+      keyLength = ByteBufferUtils.readCompressedInt(source);
+    }
+    if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
+      valueLength = state.valueLength;
+    } else {
+      valueLength = ByteBufferUtils.readCompressedInt(source);
+    }
+    int commonPrefix = ByteBufferUtils.readCompressedInt(source);
+
+    // create KeyValue buffer and fill it prefix
+    int keyOffset = buffer.position();
+    ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength
+        + KeyValue.ROW_OFFSET);
+    buffer.putInt(keyLength);
+    buffer.putInt(valueLength);
+
+    // copy common from previous key
+    if (commonPrefix > 0) {
+      ByteBufferUtils.copyFromBuffer(buffer, buffer, state.prevOffset
+          + KeyValue.ROW_OFFSET, commonPrefix);
+    }
+
+    // copy the rest of the key from the buffer
+    int keyRestLength;
+    if (state.isFirst() || commonPrefix <
+        state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+      // omit the family part of the key, it is always the same
+      short rowLength;
+      int rowRestLength;
+
+      // check length of row
+      if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
+        // not yet copied, do it now
+        ByteBufferUtils.copyFromStream(source, buffer,
+            KeyValue.ROW_LENGTH_SIZE - commonPrefix);
+        // step back over the two row-length bytes to read them as a short
+        ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
+        rowLength = buffer.getShort();
+        rowRestLength = rowLength;
+      } else {
+        // already in buffer, just read it
+        rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
+        rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
+      }
+
+      // copy the rest of row
+      ByteBufferUtils.copyFromStream(source, buffer, rowRestLength);
+      state.rowLength = rowLength;
+
+      // copy the column family
+      buffer.put(state.familyNameWithSize);
+
+      // what remains after row and family is the qualifier
+      keyRestLength = keyLength - rowLength -
+          state.familyNameWithSize.length -
+          (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
+    } else {
+      // prevRowWithSizeLength is the same as on previous row
+      keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
+    }
+    // copy the rest of the key, after column family -> column qualifier
+    ByteBufferUtils.copyFromStream(source, buffer, keyRestLength);
+
+    // handle timestamp
+    int timestampFitsInBytes =
+        ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
+    long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
+    if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
+      timestamp = -timestamp;
+    }
+    if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
+      // the encoder stored (previous - current)
+      timestamp = state.timestamp - timestamp;
+    }
+    buffer.putLong(timestamp);
+
+    // copy the type field
+    byte type;
+    if ((flag & FLAG_SAME_TYPE) != 0) {
+      type = state.type;
+    } else {
+      type = source.readByte();
+    }
+    buffer.put(type);
+
+    // copy value part
+    ByteBufferUtils.copyFromStream(source, buffer, valueLength);
+
+    // remember this KeyValue as the reference for the next one
+    state.keyLength = keyLength;
+    state.valueLength = valueLength;
+    state.prevOffset = keyOffset;
+    state.timestamp = timestamp;
+    state.type = type;
+    // state.qualifier is unused
+  }
+
+  /**
+   * Encodes every KeyValue of {@code in}, starting from position 0.
+   * The limit of {@code in} is written first as an int, followed by the
+   * encoded KeyValues.
+   */
+  @Override
+  public void compressKeyValues(DataOutputStream out,
+      ByteBuffer in, boolean includesMemstoreTS) throws IOException {
+    in.rewind();
+    ByteBufferUtils.putInt(out, in.limit());
+
+    // two states that alternate between the "previous" and "current" role
+    final DiffCompressionState[] states = {
+        new DiffCompressionState(), new DiffCompressionState() };
+    int previous = 0;
+    while (in.hasRemaining()) {
+      final int current = 1 - previous;
+      compressSingleKeyValue(states[previous], states[current], out, in);
+      afterEncodingKeyValue(in, out, includesMemstoreTS);
+      previous = current;
+    }
+  }
+
+  /**
+   * Decodes a whole block. The stream starts with the decoded size as an
+   * int; KeyValues are decoded until only {@code skipLastBytes} bytes are
+   * reported as available. The result has {@code allocHeaderLength} unused
+   * bytes in front of the decoded data.
+   *
+   * @throws IllegalStateException if decoding consumed part of the trailing
+   *           {@code skipLastBytes} bytes
+   */
+  @Override
+  public ByteBuffer uncompressKeyValues(DataInputStream source,
+      int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+      throws IOException {
+    final int payloadSize = source.readInt();
+    final ByteBuffer decoded =
+        ByteBuffer.allocate(payloadSize + allocHeaderLength);
+    decoded.position(allocHeaderLength);
+
+    final DiffCompressionState decoderState = new DiffCompressionState();
+    while (source.available() > skipLastBytes) {
+      uncompressSingleKeyValue(source, decoded, decoderState);
+      afterDecodingKeyValue(source, decoded, includesMemstoreTS);
+    }
+
+    final boolean trailerIntact = source.available() == skipLastBytes;
+    if (!trailerIntact) {
+      throw new IllegalStateException("Read too much bytes.");
+    }
+    return decoded;
+  }
+
+  /**
+   * Rebuilds the key of the first KeyValue of an encoded block.
+   *
+   * The block is read from absolute offset {@code Bytes.SIZEOF_INT}, i.e.
+   * after the leading size int. The position of {@code block} is the same on
+   * return as on entry, also when an exception is thrown.
+   *
+   * @param block encoded block, starting with the size int
+   * @return newly allocated buffer of key length holding the first key
+   */
+  @Override
+  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+    // Save the position explicitly instead of using mark()/reset(): moving
+    // the position below an existing mark discards that mark, so reset()
+    // would throw InvalidMarkException whenever the caller's position is
+    // greater than Bytes.SIZEOF_INT.
+    final int originalPosition = block.position();
+    try {
+      block.position(Bytes.SIZEOF_INT);
+      byte familyLength = block.get();
+      ByteBufferUtils.skip(block, familyLength);
+      byte flag = block.get();
+      int keyLength = ByteBufferUtils.readCompressedInt(block);
+      ByteBufferUtils.readCompressedInt(block); // valueLength
+      ByteBufferUtils.readCompressedInt(block); // commonLength
+      ByteBuffer result = ByteBuffer.allocate(keyLength);
+
+      // copy row
+      int pos = result.arrayOffset();
+      block.get(result.array(), pos, Bytes.SIZEOF_SHORT);
+      pos += Bytes.SIZEOF_SHORT;
+      // NOTE(review): this relative read leaves result at position 2 when it
+      // is returned; kept as is because callers are not visible here.
+      short rowLength = result.getShort();
+      block.get(result.array(), pos, rowLength);
+      pos += rowLength;
+
+      // copy family (it is stored once, right after the size int)
+      int savePosition = block.position();
+      block.position(Bytes.SIZEOF_INT);
+      block.get(result.array(), pos, familyLength + Bytes.SIZEOF_BYTE);
+      pos += familyLength + Bytes.SIZEOF_BYTE;
+
+      // copy qualifier
+      block.position(savePosition);
+      int qualifierLength =
+          keyLength - pos + result.arrayOffset() - KeyValue.TIMESTAMP_TYPE_SIZE;
+      block.get(result.array(), pos, qualifierLength);
+      pos += qualifierLength;
+
+      // copy the timestamp and type
+      int timestampFitInBytes =
+          ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
+      long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes);
+      if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
+        timestamp = -timestamp;
+      }
+      result.putLong(pos, timestamp);
+      pos += Bytes.SIZEOF_LONG;
+      block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
+
+      return result;
+    } finally {
+      block.position(originalPosition);
+    }
+  }
+
+  /** @return the simple class name of this encoder */
+  @Override
+  public String toString() {
+    final String encoderName = DiffKeyDeltaEncoder.class.getSimpleName();
+    return encoderName;
+  }
+
+  /**
+   * Creates a seeker that decodes KeyValues one by one directly from the
+   * encoded block. The current key is rebuilt in place in
+   * {@code current.keyBuffer}; the value is not copied, only its offset in
+   * the encoded buffer is recorded.
+   *
+   * @param comparator passed to the {@code BufferedEncodedSeeker} constructor
+   * @param includesMemstoreTS whether a variable-length memstore timestamp
+   *          follows every value in the encoded block
+   */
+  @Override
+  public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
+      final boolean includesMemstoreTS) {
+    return new BufferedEncodedSeeker(comparator) {
+      private static final int TIMESTAMP_WITH_TYPE_LENGTH =
+          Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
+      // column family (length byte + name), read once in decodeFirst()
+      private byte[] familyNameWithSize;
+      // row length plus the two bytes that store it, for the current key
+      private int rowLengthWithSize;
+      // timestamp of the current key, base for the next timestamp diff
+      private long timestamp;
+
+      /**
+       * Decodes the KeyValue at the current position of currentBuffer.
+       * @param isFirst true for the first KeyValue of the block, when the
+       *          key buffer holds no previous key yet
+       */
+      private void decode(boolean isFirst) {
+        byte flag = currentBuffer.get();
+        byte type = 0;
+        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+          // the type is the last byte of the key; remember the old one
+          // before the key length changes, it may be needed further down
+          if (!isFirst) {
+            type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
+          }
+          current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+        }
+        if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+          current.valueLength =
+              ByteBufferUtils.readCompressedInt(currentBuffer);
+        }
+        current.lastCommonPrefix =
+            ByteBufferUtils.readCompressedInt(currentBuffer);
+
+        current.ensureSpaceForKey();
+
+        if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
+          // length of row is different, copy everything except family
+
+          // copy the row size
+          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+              Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
+          rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
+              Bytes.SIZEOF_SHORT;
+
+          // copy the rest of row
+          currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
+              rowLengthWithSize - Bytes.SIZEOF_SHORT);
+
+          // copy the column family
+          System.arraycopy(familyNameWithSize, 0,
+              current.keyBuffer, rowLengthWithSize, familyNameWithSize.length);
+
+          // copy the qualifier
+          currentBuffer.get(current.keyBuffer,
+              rowLengthWithSize + familyNameWithSize.length,
+              current.keyLength - rowLengthWithSize -
+              familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
+        } else if (current.lastCommonPrefix < rowLengthWithSize) {
+          // we have to copy part of row and qualifier,
+          // but column family is in right place
+
+          // before column family (rest of row)
+          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+              rowLengthWithSize - current.lastCommonPrefix);
+
+          // after column family (qualifier)
+          currentBuffer.get(current.keyBuffer,
+              rowLengthWithSize + familyNameWithSize.length,
+              current.keyLength - rowLengthWithSize -
+              familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
+        } else {
+          // copy just the ending
+          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+              current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
+              current.lastCommonPrefix);
+        }
+
+
+        // timestamp
+        int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
+        int timestampFitInBytes = 1 +
+            ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
+        long timestampOrDiff =
+            ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes);
+        if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
+          timestampOrDiff = -timestampOrDiff;
+        }
+        if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
+          timestamp = timestampOrDiff;
+        } else { // it is diff
+          timestamp = timestamp - timestampOrDiff;
+        }
+        Bytes.putLong(current.keyBuffer, pos, timestamp);
+        pos += Bytes.SIZEOF_LONG;
+
+        // type
+        if ((flag & FLAG_SAME_TYPE) == 0) {
+          currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
+        } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+          // same type but the key length changed: the type byte now lives
+          // at a different index, so write the remembered value there
+          current.keyBuffer[pos] = type;
+        }
+
+        // the value stays in the encoded buffer, only its offset is kept
+        current.valueOffset = currentBuffer.position();
+        ByteBufferUtils.skip(currentBuffer, current.valueLength);
+
+        if (includesMemstoreTS) {
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+        } else {
+          current.memstoreTS = 0;
+        }
+        current.nextKvOffset = currentBuffer.position();
+      }
+
+      @Override
+      protected void decodeFirst() {
+        // skip the int written at the start of the block by
+        // compressKeyValues
+        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+
+        // read column family
+        byte familyNameLength = currentBuffer.get();
+        familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
+        familyNameWithSize[0] = familyNameLength;
+        currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
+            familyNameLength);
+        decode(true);
+      }
+
+      @Override
+      protected void decodeNext() {
+        decode(false);
+      }
+    };
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * Encapsulates a data block compressed using a particular encoding algorithm.
+ * Useful for testing and benchmarking.
+ */
+public class EncodedDataBlock {
+  // initial capacity of the stream that collects the raw KeyValues
+  private static final int BUFFER_SIZE = 4 * 1024;
+  // encoder used by doCompressData() and by the iterator
+  protected DataBlockEncoder dataBlockEncoder;
+  // raw bytes of every KeyValue passed to addKv(), in insertion order
+  ByteArrayOutputStream uncompressedOutputStream;
+  // ByteBuffer wrapping a copy of the raw bytes, see getUncompressedBuffer()
+  ByteBuffer uncompressedBuffer;
+  // cached result of doCompressData(); set to null by addKv()
+  private byte[] cacheCompressData;
+  // reusable target stream for doCompressData()
+  private ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
+  // handed to the encoder on every compress / uncompress call
+  private boolean includesMemstoreTS;
+
+  /**
+   * Create a buffer which will be encoded using dataBlockEncoder.
+   * @param dataBlockEncoder Algorithm used for compression.
+   * @param includesMemstoreTS Passed to the encoder on every compress and
+   *          uncompress call.
+   */
+  public EncodedDataBlock(DataBlockEncoder dataBlockEncoder,
+      boolean includesMemstoreTS) {
+    this.dataBlockEncoder = dataBlockEncoder;
+    // previously the argument was dropped and the field stayed false
+    this.includesMemstoreTS = includesMemstoreTS;
+    uncompressedOutputStream = new ByteArrayOutputStream(BUFFER_SIZE);
+  }
+
+  /**
+   * Appends the bytes of a KeyValue to the uncompressed data and drops the
+   * cached compressed form, which is rebuilt on the next access.
+   * @param kv Item to be added.
+   */
+  public void addKv(KeyValue kv) {
+    cacheCompressData = null;
+    final byte[] backingArray = kv.getBuffer();
+    final int start = kv.getOffset();
+    final int size = kv.getLength();
+    uncompressedOutputStream.write(backingArray, start, size);
+  }
+
+  /**
+   * Provides access to compressed value.
+   * The compressed data is decoded lazily, on the first call to
+   * {@code next()}, and the KeyValues are then read from the decoded buffer.
+   * @return Forwards sequential iterator.
+   */
+  public Iterator<KeyValue> getIterator() {
+    final int uncompressedSize = uncompressedOutputStream.size();
+    final ByteArrayInputStream bais = new ByteArrayInputStream(
+        getCompressedData());
+    final DataInputStream dis = new DataInputStream(bais);
+
+
+    return new Iterator<KeyValue>() {
+      private ByteBuffer decompressedData = null;
+
+      @Override
+      public boolean hasNext() {
+        // before decoding, the answer comes from the size of the raw data
+        if (decompressedData == null) {
+          return uncompressedSize > 0;
+        }
+        return decompressedData.hasRemaining();
+      }
+
+      @Override
+      public KeyValue next() {
+        // honor the Iterator contract instead of failing with an incidental
+        // exception when iterating past the end
+        if (!hasNext()) {
+          throw new java.util.NoSuchElementException(
+              "No more KeyValues in " + this);
+        }
+        if (decompressedData == null) {
+          try {
+            decompressedData = dataBlockEncoder.uncompressKeyValues(
+                dis, includesMemstoreTS);
+          } catch (IOException e) {
+            throw new RuntimeException("Problem with data block encoder, " +
+                "most likely it requested more bytes than are available.", e);
+          }
+          decompressedData.rewind();
+        }
+
+        int offset = decompressedData.position();
+        KeyValue kv = new KeyValue(decompressedData.array(), offset);
+        decompressedData.position(offset + kv.getLength());
+
+        return kv;
+      }
+
+      @Override
+      public void remove() {
+        throw new NotImplementedException("remove() is not supported!");
+      }
+
+      @Override
+      public String toString() {
+        return "Iterator of: " + dataBlockEncoder.getClass().getName();
+      }
+
+    };
+  }
+
+  /**
+   * Reports how many bytes the encoded form of the added KeyValues takes.
+   * @return Size in bytes of compressed data.
+   */
+  public int getSize() {
+    final byte[] encoded = getCompressedData();
+    return encoded.length;
+  }
+
+  /**
+   * Find the size of compressed data assuming that buffer will be compressed
+   * using given algorithm.
+   * @param compressor Algorithm used for compression.
+   * @param buffer Array to be compressed.
+   * @param offset Offset to beginning of the data.
+   * @param length Length to be compressed.
+   * @return Size of compressed data in bytes.
+   */
+  public static int checkCompressedSize(Compressor compressor, byte[] buffer,
+      int offset, int length) {
+    // The output buffer may be of any positive size, but it must not be
+    // empty: with a zero-length buffer the compressor can never emit a byte
+    // and the loop below would not terminate.
+    byte[] compressedBuffer = new byte[Math.max(buffer.length, BUFFER_SIZE)];
+    compressor.setInput(buffer, offset, length);
+    compressor.finish();
+    int currentPos = 0;
+    while (!compressor.finished()) {
+      try {
+        // we don't care about compressed data,
+        // we just want to calculate number of bytes
+        currentPos += compressor.compress(compressedBuffer, 0,
+            compressedBuffer.length);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "For some reason compressor couldn't read data. " +
+            "It is likely a problem with " +
+            compressor.getClass().getName(), e);
+      }
+    }
+    return currentPos;
+  }
+
+  /**
+   * Estimate size after second stage of compression (e.g. LZO), i.e. after
+   * running the given compressor over the already encoded data.
+   * @param compressor Algorithm which will be used for compressions.
+   * @return Size after second stage of compression.
+   */
+  public int checkCompressedSize(Compressor compressor) {
+    final byte[] encoded = getCompressedData();
+    final int encodedLength = encoded.length;
+    return checkCompressedSize(compressor, encoded, 0, encodedLength);
+  }
+
+  /** Returns the encoded data, encoding it first if no cached copy exists. */
+  private byte[] getCompressedData() {
+    if (cacheCompressData == null) {
+      cacheCompressData = doCompressData();
+    }
+    return cacheCompressData;
+  }
+
+  /**
+   * Returns a ByteBuffer over the raw KeyValue bytes. The buffer is created
+   * again when it is missing or shorter than the data collected so far.
+   */
+  private ByteBuffer getUncompressedBuffer() {
+    final boolean stale = uncompressedBuffer == null
+        || uncompressedBuffer.limit() < uncompressedOutputStream.size();
+    if (stale) {
+      final byte[] rawBytes = uncompressedOutputStream.toByteArray();
+      uncompressedBuffer = ByteBuffer.wrap(rawBytes);
+    }
+    return uncompressedBuffer;
+  }
+
+  /**
+   * Do the compression: encodes all KeyValues added so far with the
+   * configured encoder. The result is not cached by this method.
+   * @return Compressed byte buffer.
+   * @throws RuntimeException wrapping any IOException raised by the encoder
+   */
+  public byte[] doCompressData() {
+    compressedStream.reset();
+    DataOutputStream dataOut = new DataOutputStream(compressedStream);
+    try {
+      this.dataBlockEncoder.compressKeyValues(
+          dataOut, getUncompressedBuffer(), includesMemstoreTS);
+    } catch (IOException e) {
+      // this is the encoding path; the message used to blame decoding
+      throw new RuntimeException(String.format(
+          "Bug in encoding part of algorithm %s. " +
+          "Probably it requested more bytes than are available.",
+          toString()), e);
+    }
+    return compressedStream.toByteArray();
+  }
+
+  /** @return the string form of the underlying encoder */
+  @Override
+  public String toString() {
+    final String encoderDescription = dataBlockEncoder.toString();
+    return encoderDescription;
+  }
+
+  /**
+   * Get a copy of the raw, not encoded bytes of all added KeyValues.
+   * @return The buffer.
+   */
+  public byte[] getRawKeyValues() {
+    final byte[] rawCopy = uncompressedOutputStream.toByteArray();
+    return rawCopy;
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/EncoderBufferTooSmallException.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+/**
+ * Unchecked exception signalling an internal error, i.e. a bug in a data
+ * block encoding algorithm.
+ */
+public class EncoderBufferTooSmallException extends RuntimeException {
+  private static final long serialVersionUID = 4767495176134878737L;
+
+  /**
+   * @param message description of the failure
+   */
+  public EncoderBufferTooSmallException(final String message) {
+    super(message);
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
+ *
+ * Compress using:
+ * - store size of common prefix
+ * - save column family once in the first KeyValue
+ * - use integer compression for key, value and prefix (7-bit encoding)
+ * - use flag bits to avoid duplicating the key length, value length
+ *   and type if they are the same as in the previous KeyValue
+ * - store in 3 bits the length of the timestamp prefix
+ *    shared with the previous KeyValue's timestamp
+ * - use one bit which allows the value to be omitted if it is the same
+ *
+ * Format:
+ * - 1 byte:    flag
+ * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
+ * - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
+ * - 1-5 bytes: prefix length
+ * - ... bytes: rest of the row (if prefix length is small enough)
+ * - ... bytes: qualifier (or suffix depending on prefix length)
+ * - 1-8 bytes: timestamp suffix
+ * - 1 byte:    type (only if FLAG_SAME_TYPE is not set in the flag)
+ * - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag)
+ *
+ */
+public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
+  final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
+  final int SHIFT_TIMESTAMP_LENGTH = 0;
+  final int FLAG_SAME_KEY_LENGTH = 1 << 3;
+  final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
+  final int FLAG_SAME_TYPE = 1 << 5;
+  final int FLAG_SAME_VALUE = 1 << 6;
+
+  private static class FastDiffCompressionState extends CompressionState {
+    byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
+    int prevTimestampOffset;
+
+    @Override
+    protected void readTimestamp(ByteBuffer in) {
+      in.get(timestamp);
+    }
+
+    @Override
+    void copyFrom(CompressionState state) {
+      super.copyFrom(state);
+      FastDiffCompressionState state2 = (FastDiffCompressionState) state;
+      System.arraycopy(state2.timestamp, 0, timestamp, 0,
+          KeyValue.TIMESTAMP_SIZE);
+      prevTimestampOffset = state2.prevTimestampOffset;
+    }
+  }
+
+  private void compressSingleKeyValue(
+        FastDiffCompressionState previousState,
+        FastDiffCompressionState currentState,
+        OutputStream out, ByteBuffer in) throws IOException {
+    currentState.prevOffset = in.position();
+    int keyLength = in.getInt();
+    int valueOffset = currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET;
+    int valueLength = in.getInt();
+    byte flag = 0;
+
+    if (previousState.isFirst()) {
+      // copy the whole key, there is no previous key to share a prefix with
+      ByteBufferUtils.copyToStream(out, flag);
+      ByteBufferUtils.putCompressedInt(out, keyLength);
+      ByteBufferUtils.putCompressedInt(out, valueLength);
+      ByteBufferUtils.putCompressedInt(out, 0);
+
+      currentState.readKey(in, keyLength, valueLength);
+
+      ByteBufferUtils.copyToStream(out, in, keyLength + valueLength);
+    } else {
+      // find a common prefix and skip it
+      int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(),
+          previousState.prevOffset + KeyValue.ROW_OFFSET,
+          keyLength - KeyValue.TIMESTAMP_TYPE_SIZE);
+
+      currentState.readKey(in, keyLength, valueLength,
+          commonPrefix, previousState);
+
+      if (keyLength == previousState.keyLength) {
+        flag |= FLAG_SAME_KEY_LENGTH;
+      }
+      if (valueLength == previousState.valueLength) {
+        flag |= FLAG_SAME_VALUE_LENGTH;
+      }
+      if (currentState.type == previousState.type) {
+        flag |= FLAG_SAME_TYPE;
+      }
+
+      int prefixTimestamp = findCommonTimestampPrefix(
+          currentState, previousState);
+      flag |= (prefixTimestamp) << SHIFT_TIMESTAMP_LENGTH;
+
+      if (ByteBufferUtils.arePartsEqual(in,
+          previousState.prevOffset + previousState.keyLength + KeyValue.ROW_OFFSET,
+          previousState.valueLength, valueOffset, valueLength)) {
+        flag |= FLAG_SAME_VALUE;
+      }
+
+      ByteBufferUtils.copyToStream(out, flag);
+      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+        ByteBufferUtils.putCompressedInt(out, keyLength);
+      }
+      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+        ByteBufferUtils.putCompressedInt(out, valueLength);
+      }
+      ByteBufferUtils.putCompressedInt(out, commonPrefix);
+
+      ByteBufferUtils.skip(in, commonPrefix);
+      if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+        ByteBufferUtils.copyToStream(out, in,
+            currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
+        ByteBufferUtils.skip(in, currentState.familyLength +
+            KeyValue.FAMILY_LENGTH_SIZE);
+        ByteBufferUtils.copyToStream(out, in, currentState.qualifierLength);
+      } else {
+        int restKeyLength = keyLength - commonPrefix -
+            KeyValue.TIMESTAMP_TYPE_SIZE;
+        ByteBufferUtils.copyToStream(out, in, restKeyLength);
+      }
+      ByteBufferUtils.skip(in, prefixTimestamp);
+      ByteBufferUtils.copyToStream(out, in,
+          KeyValue.TIMESTAMP_SIZE - prefixTimestamp);
+
+      if ((flag & FLAG_SAME_TYPE) == 0) {
+        valueOffset -= KeyValue.TYPE_SIZE;
+        valueLength += KeyValue.TYPE_SIZE;
+      }
+
+      ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength);
+
+      if ((flag & FLAG_SAME_VALUE) == 0 ) {
+        ByteBufferUtils.copyToStream(out, in, valueOffset, valueLength);
+      } else {
+        if ((flag & FLAG_SAME_TYPE) == 0) {
+          ByteBufferUtils.copyToStream(out, currentState.type);
+        }
+      }
+    }
+  }
+
+  private int findCommonTimestampPrefix(FastDiffCompressionState left,
+      FastDiffCompressionState right) {
+    int prefixTimestamp = 0;
+    while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) &&
+        left.timestamp[prefixTimestamp]
+            == right.timestamp[prefixTimestamp]) {
+      prefixTimestamp++;
+    }
+    return prefixTimestamp; // has to be at most 7 bytes
+  }
+
+  private void uncompressSingleKeyValue(DataInputStream source,
+      ByteBuffer buffer, FastDiffCompressionState state)
+          throws IOException, EncoderBufferTooSmallException {
+    byte flag = source.readByte();
+    int prevKeyLength = state.keyLength;
+
+    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+      state.keyLength = ByteBufferUtils.readCompressedInt(source);
+    }
+    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+      state.valueLength = ByteBufferUtils.readCompressedInt(source);
+    }
+    int commonLength = ByteBufferUtils.readCompressedInt(source);
+
+    ByteBufferUtils.ensureSpace(buffer, state.keyLength + state.valueLength +
+        KeyValue.ROW_OFFSET);
+
+    int kvPos = buffer.position();
+
+    if (!state.isFirst()) {
+      // copy the prefix
+      int common;
+      int prevOffset;
+
+      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+        buffer.putInt(state.keyLength);
+        buffer.putInt(state.valueLength);
+        prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
+        common = commonLength;
+      } else {
+        if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
+          prevOffset = state.prevOffset;
+          common = commonLength + KeyValue.ROW_OFFSET;
+        } else {
+          buffer.putInt(state.keyLength);
+          prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
+          common = commonLength + KeyValue.KEY_LENGTH_SIZE;
+        }
+      }
+
+      ByteBufferUtils.copyFromBuffer(buffer, buffer, prevOffset, common);
+
+      // copy the rest of the key from the buffer
+      int keyRestLength;
+      if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+        // omit the family part of the key, it is always the same
+        int rowWithSizeLength;
+        int rowRestLength;
+
+        // check length of row
+        if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
+          // not yet copied, do it now
+          ByteBufferUtils.copyFromStream(source, buffer,
+              KeyValue.ROW_LENGTH_SIZE - commonLength);
+
+          rowWithSizeLength = buffer.getShort(buffer.position() -
+              KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE;
+          rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
+        } else {
+          // already in buffer, just read it
+          rowWithSizeLength = buffer.getShort(kvPos + KeyValue.ROW_OFFSET) +
+              KeyValue.ROW_LENGTH_SIZE;
+          rowRestLength = rowWithSizeLength - commonLength;
+        }
+
+        // copy the rest of row
+        ByteBufferUtils.copyFromStream(source, buffer, rowRestLength);
+
+        // copy the column family
+        ByteBufferUtils.copyFromBuffer(buffer, buffer,
+            state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
+                + state.rowLength, state.familyLength
+                + KeyValue.FAMILY_LENGTH_SIZE);
+        state.rowLength = (short) (rowWithSizeLength -
+            KeyValue.ROW_LENGTH_SIZE);
+
+        keyRestLength = state.keyLength - rowWithSizeLength -
+            state.familyLength -
+            (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
+      } else {
+        // the whole row (with its length) is shared with the previous key
+        keyRestLength = state.keyLength - commonLength -
+            KeyValue.TIMESTAMP_TYPE_SIZE;
+      }
+      // copy the rest of the key, after column family == column qualifier
+      ByteBufferUtils.copyFromStream(source, buffer, keyRestLength);
+
+      // copy timestamp
+      int prefixTimestamp =
+          (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
+      ByteBufferUtils.copyFromBuffer(buffer, buffer, state.prevTimestampOffset,
+          prefixTimestamp);
+      state.prevTimestampOffset = buffer.position() - prefixTimestamp;
+      ByteBufferUtils.copyFromStream(source, buffer, KeyValue.TIMESTAMP_SIZE
+          - prefixTimestamp);
+
+      // copy the type and value
+      if ((flag & FLAG_SAME_TYPE) != 0) {
+        buffer.put(state.type);
+        if ((flag & FLAG_SAME_VALUE) != 0) {
+          ByteBufferUtils.copyFromBuffer(buffer, buffer, state.prevOffset +
+              KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
+        } else {
+          ByteBufferUtils.copyFromStream(source, buffer, state.valueLength);
+        }
+      } else {
+        if ((flag & FLAG_SAME_VALUE) != 0) {
+          ByteBufferUtils.copyFromStream(source, buffer, KeyValue.TYPE_SIZE);
+          ByteBufferUtils.copyFromBuffer(buffer, buffer, state.prevOffset +
+              KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
+        } else {
+          ByteBufferUtils.copyFromStream(source, buffer,
+              state.valueLength + KeyValue.TYPE_SIZE);
+        }
+        state.type = buffer.get(state.prevTimestampOffset +
+            KeyValue.TIMESTAMP_SIZE);
+      }
+    } else { // is first element
+      buffer.putInt(state.keyLength);
+      buffer.putInt(state.valueLength);
+
+      state.prevTimestampOffset = buffer.position() + state.keyLength -
+          KeyValue.TIMESTAMP_TYPE_SIZE;
+      ByteBufferUtils.copyFromStream(source, buffer, state.keyLength
+          + state.valueLength);
+      state.rowLength = buffer.getShort(kvPos + KeyValue.ROW_OFFSET);
+      state.familyLength = buffer.get(kvPos + KeyValue.ROW_OFFSET +
+          KeyValue.ROW_LENGTH_SIZE + state.rowLength);
+      state.type = buffer.get(state.prevTimestampOffset +
+          KeyValue.TIMESTAMP_SIZE);
+    }
+
+    state.prevOffset = kvPos;
+  }
+
+  @Override
+  public void compressKeyValues(DataOutputStream out,
+      ByteBuffer in, boolean includesMemstoreTS) throws IOException {
+    in.rewind();
+    ByteBufferUtils.putInt(out, in.limit());
+    FastDiffCompressionState previousState = new FastDiffCompressionState();
+    FastDiffCompressionState currentState = new FastDiffCompressionState();
+    while (in.hasRemaining()) {
+      compressSingleKeyValue(previousState, currentState,
+          out, in);
+      afterEncodingKeyValue(in, out, includesMemstoreTS);
+
+      // swap previousState <-> currentState
+      FastDiffCompressionState tmp = previousState;
+      previousState = currentState;
+      currentState = tmp;
+    }
+  }
+
+  @Override
+  public ByteBuffer uncompressKeyValues(DataInputStream source,
+      int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+          throws IOException {
+    int decompressedSize = source.readInt();
+    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
+        allocHeaderLength);
+    buffer.position(allocHeaderLength);
+    FastDiffCompressionState state = new FastDiffCompressionState();
+    while (source.available() > skipLastBytes) {
+      uncompressSingleKeyValue(source, buffer, state);
+      afterDecodingKeyValue(source, buffer, includesMemstoreTS);
+    }
+
+    if (source.available() != skipLastBytes) {
+      throw new IllegalStateException("Read too much bytes.");
+    }
+
+    return buffer;
+  }
+
+  @Override
+  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+    block.mark();
+    block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
+    int keyLength = ByteBufferUtils.readCompressedInt(block);
+    ByteBufferUtils.readCompressedInt(block); // valueLength
+    ByteBufferUtils.readCompressedInt(block); // commonLength
+    int pos = block.position();
+    block.reset();
+    return ByteBuffer.wrap(block.array(), pos, keyLength).slice();
+  }
+
+  @Override
+  public String toString() {
+    return FastDiffDeltaEncoder.class.getSimpleName();
+  }
+
+  @Override
+  public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
+      final boolean includesMemstoreTS) {
+    return new BufferedEncodedSeeker(comparator) {
+      private byte[] prevTimestampAndType = new byte[
+          Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE];
+      private int rowLengthWithSize;
+      private int columnFamilyLengthWithSize;
+
+      private void decode(boolean isFirst) {
+        byte flag = currentBuffer.get();
+        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+          if (!isFirst) {
+            System.arraycopy(current.keyBuffer,
+                current.keyLength - prevTimestampAndType.length,
+                prevTimestampAndType, 0,
+                prevTimestampAndType.length);
+          }
+          current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+        }
+        if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+          current.valueLength =
+              ByteBufferUtils.readCompressedInt(currentBuffer);
+        }
+        current.lastCommonPrefix =
+            ByteBufferUtils.readCompressedInt(currentBuffer);
+
+        current.ensureSpaceForKey();
+
+        if (isFirst) {
+          // copy everything
+          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+              current.keyLength - prevTimestampAndType.length);
+          rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
+              Bytes.SIZEOF_SHORT;
+          columnFamilyLengthWithSize = current.keyBuffer[rowLengthWithSize] +
+              Bytes.SIZEOF_BYTE;
+        } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
+          // length of row is different, copy everything except family
+
+          // copy the row size
+          int oldRowLengthWithSize = rowLengthWithSize;
+          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+              Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
+          rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
+              Bytes.SIZEOF_SHORT;
+
+          // move the column family
+          System.arraycopy(current.keyBuffer, oldRowLengthWithSize,
+              current.keyBuffer, rowLengthWithSize,
+              columnFamilyLengthWithSize);
+
+          // copy the rest of row
+          currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
+              rowLengthWithSize - Bytes.SIZEOF_SHORT);
+
+          // copy the qualifier
+          currentBuffer.get(current.keyBuffer,
+              rowLengthWithSize + columnFamilyLengthWithSize,
+              current.keyLength - rowLengthWithSize -
+              columnFamilyLengthWithSize - prevTimestampAndType.length);
+        } else if (current.lastCommonPrefix < rowLengthWithSize) {
+          // we have to copy part of row and qualifier,
+          // but column family is in right place
+
+          // before column family (rest of row)
+          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+              rowLengthWithSize - current.lastCommonPrefix);
+
+          // after column family (qualifier)
+          currentBuffer.get(current.keyBuffer,
+              rowLengthWithSize + columnFamilyLengthWithSize,
+              current.keyLength - rowLengthWithSize -
+              columnFamilyLengthWithSize - prevTimestampAndType.length);
+        } else {
+          // copy just the ending
+          currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+              current.keyLength - prevTimestampAndType.length -
+              current.lastCommonPrefix);
+        }
+
+        // timestamp
+        int pos = current.keyLength - prevTimestampAndType.length;
+        int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>>
+          SHIFT_TIMESTAMP_LENGTH;
+        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+          System.arraycopy(prevTimestampAndType, 0, current.keyBuffer,
+              pos, commonTimestampPrefix);
+        }
+        pos += commonTimestampPrefix;
+        currentBuffer.get(current.keyBuffer, pos,
+            Bytes.SIZEOF_LONG - commonTimestampPrefix);
+        pos += Bytes.SIZEOF_LONG - commonTimestampPrefix;
+
+        // type
+        if ((flag & FLAG_SAME_TYPE) == 0) {
+          currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
+        } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+          current.keyBuffer[pos] = prevTimestampAndType[Bytes.SIZEOF_LONG];
+        }
+
+        // handle value
+        if ((flag & FLAG_SAME_VALUE) == 0) {
+          current.valueOffset = currentBuffer.position();
+          ByteBufferUtils.skip(currentBuffer, current.valueLength);
+        }
+
+        if (includesMemstoreTS) {
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+        } else {
+          current.memstoreTS = 0;
+        }
+        current.nextKvOffset = currentBuffer.position();
+      }
+
+      @Override
+      protected void decodeFirst() {
+        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+        decode(true);
+      }
+
+      @Override
+      protected void decodeNext() {
+        decode(false);
+      }
+    };
+  }
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java?rev=1223020&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java Sat Dec 24 21:20:39 2011
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Compress key by storing size of common prefix with previous KeyValue
+ * and storing raw size of rest.
+ *
+ * Format:
+ * 1-5 bytes: compressed key length minus prefix (7-bit encoding)
+ * 1-5 bytes: compressed value length (7-bit encoding)
+ * 1-3 bytes: compressed length of common key prefix
+ * ... bytes: rest of key (including timestamp)
+ * ... bytes: value
+ *
+ * Worst case: a compressed KeyValue is three bytes longer than the original.
+ *
+ */
+public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
+
+  private int addKv(int offset, DataOutputStream out,
+      ByteBuffer in, int prevKeyLength) throws IOException {
+    int keyLength = in.getInt();
+    int valueLength = in.getInt();
+
+    if (offset == -1) {
+      // copy the whole key, there is no previous key to share a prefix with
+      ByteBufferUtils.putCompressedInt(out, keyLength);
+      ByteBufferUtils.putCompressedInt(out, valueLength);
+      ByteBufferUtils.putCompressedInt(out, 0);
+      ByteBufferUtils.copyToStream(out, in, keyLength + valueLength);
+    } else {
+      // find a common prefix and skip it
+      int common = ByteBufferUtils.findCommonPrefix(
+          in, offset + KeyValue.ROW_OFFSET,
+          in.position(),
+          Math.min(prevKeyLength, keyLength));
+
+      ByteBufferUtils.putCompressedInt(out, keyLength - common);
+      ByteBufferUtils.putCompressedInt(out, valueLength);
+      ByteBufferUtils.putCompressedInt(out, common);
+
+      ByteBufferUtils.skip(in, common);
+      ByteBufferUtils.copyToStream(out, in, keyLength - common + valueLength);
+    }
+
+    return keyLength;
+  }
+
+  @Override
+  public void compressKeyValues(DataOutputStream writeHere,
+      ByteBuffer in, boolean includesMemstoreTS) throws IOException {
+    in.rewind();
+    ByteBufferUtils.putInt(writeHere, in.limit());
+    int prevOffset = -1;
+    int offset = 0;
+    int keyLength = 0;
+    while (in.hasRemaining()) {
+      offset = in.position();
+      keyLength = addKv(prevOffset, writeHere, in, keyLength);
+      afterEncodingKeyValue(in, writeHere, includesMemstoreTS);
+      prevOffset = offset;
+    }
+  }
+
+  @Override
+  public ByteBuffer uncompressKeyValues(DataInputStream source,
+      int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+          throws IOException {
+    int decompressedSize = source.readInt();
+    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
+        allocHeaderLength);
+    buffer.position(allocHeaderLength);
+    int prevKeyOffset = 0;
+
+    while (source.available() > skipLastBytes) {
+      prevKeyOffset = uncompressKeyValue(source, buffer, prevKeyOffset);
+      afterDecodingKeyValue(source, buffer, includesMemstoreTS);
+    }
+
+    if (source.available() != skipLastBytes) {
+      throw new IllegalStateException("Read too many bytes.");
+    }
+
+    buffer.limit(buffer.position());
+    return buffer;
+  }
+
+  private int uncompressKeyValue(DataInputStream source, ByteBuffer buffer,
+      int prevKeyOffset)
+          throws IOException, EncoderBufferTooSmallException {
+    int keyLength = ByteBufferUtils.readCompressedInt(source);
+    int valueLength = ByteBufferUtils.readCompressedInt(source);
+    int commonLength = ByteBufferUtils.readCompressedInt(source);
+    int keyOffset;
+    keyLength += commonLength;
+
+    ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength
+        + KeyValue.ROW_OFFSET);
+
+    buffer.putInt(keyLength);
+    buffer.putInt(valueLength);
+
+    // copy the prefix
+    if (commonLength > 0) {
+      keyOffset = buffer.position();
+      ByteBufferUtils.copyFromBuffer(buffer, buffer, prevKeyOffset,
+          commonLength);
+    } else {
+      keyOffset = buffer.position();
+    }
+
+    // copy rest of the key and value
+    int len = keyLength - commonLength + valueLength;
+    ByteBufferUtils.copyFromStream(source, buffer, len);
+    return keyOffset;
+  }
+
+  @Override
+  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+    block.mark();
+    block.position(Bytes.SIZEOF_INT);
+    int keyLength = ByteBufferUtils.readCompressedInt(block);
+    ByteBufferUtils.readCompressedInt(block);
+    int commonLength = ByteBufferUtils.readCompressedInt(block);
+    if (commonLength != 0) {
+      throw new AssertionError("Nonzero common length in the first key in "
+          + "block: " + commonLength);
+    }
+    int pos = block.position();
+    block.reset();
+    return ByteBuffer.wrap(block.array(), pos, keyLength).slice();
+  }
+
+  @Override
+  public String toString() {
+    return PrefixKeyDeltaEncoder.class.getSimpleName();
+  }
+
+  @Override
+  public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
+      final boolean includesMemstoreTS) {
+    return new BufferedEncodedSeeker(comparator) {
+      @Override
+      protected void decodeNext() {
+        current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+        current.valueLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+        current.lastCommonPrefix =
+            ByteBufferUtils.readCompressedInt(currentBuffer);
+        current.keyLength += current.lastCommonPrefix;
+        current.ensureSpaceForKey();
+        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
+            current.keyLength - current.lastCommonPrefix);
+        current.valueOffset = currentBuffer.position();
+        ByteBufferUtils.skip(currentBuffer, current.valueLength);
+        if (includesMemstoreTS) {
+          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+        } else {
+          current.memstoreTS = 0;
+        }
+        current.nextKvOffset = currentBuffer.position();
+      }
+
+      @Override
+      protected void decodeFirst() {
+        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+        decodeNext();
+      }
+    };
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java Sat Dec 24 21:20:39 2011
@@ -1,4 +1,4 @@
-/*
+  /*
  * Copyright 2011 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.io.RawComparator;
 
@@ -60,6 +59,12 @@ public abstract class AbstractHFileReade
   /** Filled when we read in the trailer. */
   protected final Compression.Algorithm compressAlgo;
 
+  /**
+   * What kind of data block encoding should be used while reading, writing and
+   * handling cache.
+   */
+  protected final HFileDataBlockEncoder blockEncoder;
+
   /** Last key in the file. Filled in when we read in the file info */
   protected byte [] lastKey = null;
 
@@ -93,7 +98,8 @@ public abstract class AbstractHFileReade
   protected AbstractHFileReader(Path path, FixedFileTrailer trailer,
       final FSDataInputStream fsdis, final long fileSize,
       final boolean closeIStream,
-      final CacheConfig cacheConf) {
+      final CacheConfig cacheConf,
+      final HFileDataBlockEncoder dataBlockEncoder) {
     super(null, path);
     this.trailer = trailer;
     this.compressAlgo = trailer.getCompressionCodec();
@@ -101,6 +107,8 @@ public abstract class AbstractHFileReade
     this.fileSize = fileSize;
     this.istream = fsdis;
     this.closeIStream = closeIStream;
+    this.blockEncoder = dataBlockEncoder != null
+        ? dataBlockEncoder : new NoOpDataBlockEncoder();
     this.path = path;
     this.name = path.getName();
   }
@@ -275,8 +283,11 @@ public abstract class AbstractHFileReade
 
     protected int blockFetches;
 
-    public Scanner(final boolean cacheBlocks,
+    protected final HFile.Reader reader;
+
+    public Scanner(final HFile.Reader reader, final boolean cacheBlocks,
         final boolean pread, final boolean isCompaction) {
+      this.reader = reader;
       this.cacheBlocks = cacheBlocks;
       this.pread = pread;
       this.isCompaction = isCompaction;
@@ -296,6 +307,26 @@ public abstract class AbstractHFileReade
       if (!isSeeked())
         throw new NotSeekedException();
     }
+
+    @Override
+    public int seekTo(byte[] key) throws IOException {
+      return seekTo(key, 0, key.length);
+    }
+    
+    @Override
+    public boolean seekBefore(byte[] key) throws IOException {
+      return seekBefore(key, 0, key.length);
+    }
+    
+    @Override
+    public int reseekTo(byte[] key) throws IOException {
+      return reseekTo(key, 0, key.length);
+    }
+
+    @Override
+    public HFile.Reader getReader() {
+      return reader;
+    }
   }
 
   /** For testing */
@@ -306,5 +337,4 @@ public abstract class AbstractHFileReade
   public Path getPath() {
     return path;
   }
-
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java Sat Dec 24 21:20:39 2011
@@ -83,6 +83,9 @@ public abstract class AbstractHFileWrite
 
   /** The compression algorithm used. NONE if no compression. */
   protected final Compression.Algorithm compressAlgo;
+  
+  /** The data block encoding which will be used. NONE if there is no encoding */
+  protected final HFileDataBlockEncoder blockEncoder;
 
   /** First key in a block. */
   protected byte[] firstKeyInBlock = null;
@@ -102,7 +105,9 @@ public abstract class AbstractHFileWrite
 
   public AbstractHFileWriter(CacheConfig cacheConf,
       FSDataOutputStream outputStream, Path path, int blockSize,
-      Compression.Algorithm compressAlgo, KeyComparator comparator) {
+      Compression.Algorithm compressAlgo,
+      HFileDataBlockEncoder dataBlockEncoder,
+      KeyComparator comparator) {
     super(null, path);
     this.outputStream = outputStream;
     this.path = path;
@@ -110,6 +115,8 @@ public abstract class AbstractHFileWrite
     this.blockSize = blockSize;
     this.compressAlgo = compressAlgo == null
         ? HFile.DEFAULT_COMPRESSION_ALGORITHM : compressAlgo;
+    this.blockEncoder = dataBlockEncoder != null
+        ? dataBlockEncoder : new NoOpDataBlockEncoder();
     this.comparator = comparator != null ? comparator
         : Bytes.BYTES_RAWCOMPARATOR;
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java Sat Dec 24 21:20:39 2011
@@ -39,6 +39,14 @@ public enum BlockType {
   /** Data block, both versions */
   DATA("DATABLK*", BlockCategory.DATA),
 
+  /** An encoded data block (e.g. with prefix compression), version 2 */
+  ENCODED_DATA("DATABLKE", BlockCategory.DATA) {
+    @Override
+    public int getId() {
+      return DATA.ordinal();
+    }
+  },
+
   /** Version 2 leaf index block. Appears in the data block section */
   LEAF_INDEX("IDXLEAF2", BlockCategory.INDEX),
 
@@ -102,6 +110,15 @@ public enum BlockType {
     this.metricCat = metricCat;
     assert magic.length == MAGIC_LENGTH;
   }
+  
+  /**
+   * Use this instead of ordinal(). It works exactly the same,
+   * except that DATA and ENCODED_DATA share the same id.
+   * @return id between 0 and N
+   */
+  public int getId() {
+    return ordinal();
+  }
 
   public void writeToStream(OutputStream out) throws IOException {
     out.write(magic);

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1223020&r1=1223019&r2=1223020&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Sat Dec 24 21:20:39 2011
@@ -243,6 +243,7 @@ public class HFile {
 
     public abstract Writer createWriter(FileSystem fs, Path path,
         int blockSize, Compression.Algorithm compress,
+        HFileDataBlockEncoder dataBlockEncoder,
         final KeyComparator comparator) throws IOException;
 
     public abstract Writer createWriter(FileSystem fs, Path path,
@@ -371,32 +372,47 @@ public class HFile {
   }
 
   private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,
-      long size, boolean closeIStream, CacheConfig cacheConf)
+      long size, boolean closeIStream, CacheConfig cacheConf,
+      HFileDataBlockEncoder dataBlockEncoder)
   throws IOException {
     FixedFileTrailer trailer = FixedFileTrailer.readFromStream(fsdis, size);
     switch (trailer.getVersion()) {
     case 1:
       return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
-          cacheConf);
+          cacheConf, dataBlockEncoder);
     case 2:
       return new HFileReaderV2(path, trailer, fsdis, size, closeIStream,
-          cacheConf);
+          cacheConf, dataBlockEncoder);
     default:
       throw new IOException("Cannot instantiate reader for HFile version " +
           trailer.getVersion());
     }
   }
 
+  public static Reader createReader(
+      FileSystem fs, Path path, CacheConfig cacheConf) throws IOException {
+    return createReader(fs, path, cacheConf,
+        new NoOpDataBlockEncoder());
+  }
+
+  public static Reader createReader(Path path, FSDataInputStream fsdis,
+      long size, CacheConfig cacheConf) throws IOException {
+    return createReader(path, fsdis, size, cacheConf,
+        new NoOpDataBlockEncoder());
+  }
+
   public static Reader createReader(FileSystem fs, Path path,
-      CacheConfig cacheConf) throws IOException {
+      CacheConfig cacheConf, HFileDataBlockEncoder dataBlockEncoder)
+      throws IOException {
     return pickReaderVersion(path, fs.open(path),
-        fs.getFileStatus(path).getLen(), true, cacheConf);
+        fs.getFileStatus(path).getLen(), true, cacheConf, dataBlockEncoder);
   }
 
   public static Reader createReader(Path path, FSDataInputStream fsdis,
-      long size, CacheConfig cacheConf)
-      throws IOException {
-    return pickReaderVersion(path, fsdis, size, false, cacheConf);
+      long size, CacheConfig cacheConf,
+      HFileDataBlockEncoder dataBlockEncoder) throws IOException {
+    return pickReaderVersion(path, fsdis, size, false, cacheConf,
+        dataBlockEncoder);
   }
 
   /*