You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2014/05/22 08:31:20 UTC

[2/2] git commit: HBASE-10835 DBE encode path improvements.(Anoop)

HBASE-10835 DBE encode path improvements.(Anoop)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/53513dcb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/53513dcb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/53513dcb

Branch: refs/heads/master
Commit: 53513dcb452e104bbfd71819054bf4d68808f731
Parents: cb1428d
Author: anoopsjohn <an...@gmail.com>
Authored: Thu May 22 11:59:52 2014 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Thu May 22 11:59:52 2014 +0530

----------------------------------------------------------------------
 .../io/encoding/BufferedDataBlockEncoder.java   | 117 +++++----
 .../io/encoding/CopyKeyDataBlockEncoder.java    |  33 ++-
 .../hbase/io/encoding/DataBlockEncoder.java     |  44 ++--
 .../hbase/io/encoding/DiffKeyDeltaEncoder.java  | 242 ++++++++-----------
 .../hbase/io/encoding/EncodedDataBlock.java     |  47 +++-
 .../hadoop/hbase/io/encoding/EncodingState.java |  34 +++
 .../hbase/io/encoding/FastDiffDeltaEncoder.java | 225 ++++++++---------
 .../HFileBlockDefaultEncodingContext.java       |  68 ++----
 .../io/encoding/HFileBlockEncodingContext.java  |  41 ++--
 .../io/encoding/PrefixKeyDeltaEncoder.java      |  59 ++---
 .../hbase/codec/prefixtree/PrefixTreeCodec.java |  96 ++++----
 .../codec/prefixtree/encode/EncoderFactory.java |   2 +-
 .../prefixtree/encode/EncoderPoolImpl.java      |  47 ++++
 .../hadoop/hbase/io/hfile/HFileBlock.java       |  78 +++---
 .../hbase/io/hfile/HFileDataBlockEncoder.java   |  43 +++-
 .../io/hfile/HFileDataBlockEncoderImpl.java     |  58 ++---
 .../hadoop/hbase/io/hfile/HFileWriterV2.java    |  83 +++----
 .../hadoop/hbase/io/hfile/HFileWriterV3.java    |  88 ++-----
 .../hbase/io/hfile/NoOpDataBlockEncoder.java    |  47 +++-
 .../io/encoding/TestDataBlockEncoders.java      | 175 +++++++-------
 .../io/encoding/TestPrefixTreeEncoding.java     |  78 +++---
 .../encoding/TestSeekToBlockWithEncoders.java   |  68 ++----
 .../hadoop/hbase/io/hfile/TestHFileBlock.java   | 119 +++------
 .../io/hfile/TestHFileBlockCompatibility.java   | 126 +++++-----
 .../io/hfile/TestHFileDataBlockEncoder.java     |  74 +++---
 25 files changed, 1019 insertions(+), 1073 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index d79aff2..af78c1c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -474,10 +474,18 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     abstract protected void decodeNext();
   }
 
-  protected final void afterEncodingKeyValue(ByteBuffer in,
-      DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
+  /**
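+   * @param kv the KeyValue whose tags and mvcc version are written
+   * @param out the stream to write to
+   * @param encodingCtx context whose HFileContext decides whether tags and mvcc are included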
+   * @return unencoded size added
+   * @throws IOException
+   */
+  protected final int afterEncodingKeyValue(KeyValue kv, DataOutputStream out,
+      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
+    int size = 0;
     if (encodingCtx.getHFileContext().isIncludesTags()) {
-      short tagsLength = in.getShort();
+      short tagsLength = kv.getTagsLength();
       ByteBufferUtils.putCompressedInt(out, tagsLength);
       // There are some tags to be written
       if (tagsLength > 0) {
@@ -485,23 +493,23 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
         // When tag compression is enabled, tagCompressionContext will have a not null value. Write
         // the tags using Dictionary compression in such a case
         if (tagCompressionContext != null) {
-          tagCompressionContext.compressTags(out, in, tagsLength);
+          tagCompressionContext
+              .compressTags(out, kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
         } else {
-          ByteBufferUtils.moveBufferToStream(out, in, tagsLength);
+          out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
         }
       }
+      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
     }
     if (encodingCtx.getHFileContext().isIncludesMvcc()) {
       // Copy memstore timestamp from the byte buffer to the output stream.
-      long memstoreTS = -1;
-      try {
-        memstoreTS = ByteBufferUtils.readVLong(in);
-        WritableUtils.writeVLong(out, memstoreTS);
-      } catch (IOException ex) {
-        throw new RuntimeException("Unable to copy memstore timestamp " +
-            memstoreTS + " after encoding a key/value");
-      }
+      long memstoreTS = kv.getMvccVersion();
+      WritableUtils.writeVLong(out, memstoreTS);
+      // TODO use a writeVLong which returns the #bytes written so that the value need not be
+      // parsed twice.
+      size += WritableUtils.getVIntSize(memstoreTS);
     }
+    return size;
   }
 
   protected final void afterDecodingKeyValue(DataInputStream source,
@@ -545,23 +553,30 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     return new HFileBlockDefaultDecodingContext(meta);
   }
 
-  /**
-   * Compress KeyValues and write them to output buffer.
-   * @param out Where to write compressed data.
-   * @param in Source of KeyValue for compression.
-   * @param encodingCtx use the Encoding ctx associated with the current block
-   * @throws IOException If there is an error writing to output stream.
-   */
-  public abstract void internalEncodeKeyValues(DataOutputStream out,
-      ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException;
-
   protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
       int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
       throws IOException;
 
+  /**
+   * Asserts that there is at least the given amount of unfilled space
+   * remaining in the given buffer.
+   * @param out typically, the buffer we are writing to
+   * @param length the required space in the buffer
+   * @throws EncoderBufferTooSmallException If there are not enough bytes.
+   */
+  protected static void ensureSpace(ByteBuffer out, int length)
+      throws EncoderBufferTooSmallException {
+    if (out.position() + length > out.limit()) {
+      throw new EncoderBufferTooSmallException(
+          "Buffer position=" + out.position() +
+          ", buffer limit=" + out.limit() +
+          ", length to be written=" + length);
+    }
+  }
+
   @Override
-  public void encodeKeyValues(ByteBuffer in,
-      HFileBlockEncodingContext blkEncodingCtx) throws IOException {
+  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
+      throws IOException {
     if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
       throw new IOException (this.getClass().getName() + " only accepts "
           + HFileBlockDefaultEncodingContext.class.getName() + " as the " +
@@ -570,8 +585,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
 
     HFileBlockDefaultEncodingContext encodingCtx =
         (HFileBlockDefaultEncodingContext) blkEncodingCtx;
-    encodingCtx.prepareEncoding();
-    DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
+    encodingCtx.prepareEncoding(out);
     if (encodingCtx.getHFileContext().isIncludesTags()
         && encodingCtx.getHFileContext().isCompressTags()) {
       if (encodingCtx.getTagCompressionContext() != null) {
@@ -588,29 +602,40 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
         }
       }
     }
-    internalEncodeKeyValues(dataOut, in, encodingCtx);
+    ByteBufferUtils.putInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
+    blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
+  }
+
+  private static class BufferedDataBlockEncodingState extends EncodingState {
+    int unencodedDataSizeWritten = 0;
+  }
+
+  @Override
+  public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+      throws IOException {
+    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
+        .getEncodingState();
+    int encodedKvSize = internalEncode(kv, (HFileBlockDefaultEncodingContext) encodingCtx, out);
+    state.unencodedDataSizeWritten += encodedKvSize;
+    return encodedKvSize;
+  }
+
+  public abstract int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingCtx,
+      DataOutputStream out) throws IOException;
+
+  @Override
+  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+      byte[] uncompressedBytesWithHeader) throws IOException {
+    BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
+        .getEncodingState();
+    // Write the unencodedDataSizeWritten (with header size)
+    Bytes.putInt(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE
+        + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
+        );
     if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
       encodingCtx.postEncoding(BlockType.ENCODED_DATA);
     } else {
       encodingCtx.postEncoding(BlockType.DATA);
     }
   }
-
-  /**
-   * Asserts that there is at least the given amount of unfilled space
-   * remaining in the given buffer.
-   * @param out typically, the buffer we are writing to
-   * @param length the required space in the buffer
-   * @throws EncoderBufferTooSmallException If there are no enough bytes.
-   */
-  protected static void ensureSpace(ByteBuffer out, int length)
-      throws EncoderBufferTooSmallException {
-    if (out.position() + length > out.limit()) {
-      throw new EncoderBufferTooSmallException(
-          "Buffer position=" + out.position() +
-          ", buffer limit=" + out.limit() +
-          ", length to be written=" + length);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
index 1dc8413..c1c9956 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Just copy data, do not do any kind of compression. Use for comparison and
@@ -32,14 +34,33 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 @InterfaceAudience.Private
 public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
+
   @Override
-  public void internalEncodeKeyValues(DataOutputStream out,
-      ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
-    in.rewind();
-    ByteBufferUtils.putInt(out, in.limit());
-    ByteBufferUtils.moveBufferToStream(out, in, in.limit());
-  }
+  public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
+      DataOutputStream out) throws IOException {
+    int klength = kv.getKeyLength();
+    int vlength = kv.getValueLength();
 
+    out.writeInt(klength);
+    out.writeInt(vlength);
+    out.write(kv.getBuffer(), kv.getKeyOffset(), klength);
+    out.write(kv.getValueArray(), kv.getValueOffset(), vlength);
+    int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
+    // Write the additional tag into the stream
+    if (encodingContext.getHFileContext().isIncludesTags()) {
+      short tagsLength = kv.getTagsLength();
+      out.writeShort(tagsLength);
+      if (tagsLength > 0) {
+        out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
+      }
+      size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
+    }
+    if (encodingContext.getHFileContext().isIncludesMvcc()) {
+      WritableUtils.writeVLong(out, kv.getMvccVersion());
+      size += WritableUtils.getVIntSize(kv.getMvccVersion());
+    }
+    return size;
+  }
 
   @Override
   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
index ddb2359..99f6a7f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
@@ -17,11 +17,13 @@
 package org.apache.hadoop.hbase.io.encoding;
 
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 
@@ -34,28 +36,42 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
  * <li>knowledge of Key Value format</li>
  * </ul>
  * It is designed to work fast enough to be feasible as in memory compression.
- *
- * After encoding, it also optionally compresses the encoded data if a
- * compression algorithm is specified in HFileBlockEncodingContext argument of
- * {@link #encodeKeyValues(ByteBuffer, HFileBlockEncodingContext)}.
  */
 @InterfaceAudience.Private
 public interface DataBlockEncoder {
 
   /**
-   * Encodes KeyValues. It will first encode key value pairs, and then
-   * optionally do the compression for the encoded data.
-   *
-   * @param in
-   *          Source of KeyValue for compression.
+   * Starts encoding for a block of KeyValues. Call
+   * {@link #endBlockEncoding(HFileBlockEncodingContext, DataOutputStream, byte[])} to finish
+   * encoding of a block.
+   * @param encodingCtx
+   * @param out
+   * @throws IOException
+   */
+  void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+      throws IOException;
+
+  /**
+   * Encodes a KeyValue.
+   * @param kv
+   * @param encodingCtx
+   * @param out
+   * @return unencoded kv size written
+   * @throws IOException
+   */
+  int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+      throws IOException;
+
+  /**
+   * Ends encoding for a block of KeyValues. Gives the encoder a chance to finish up the
+   * encoded block. It must be called at the end of block encoding.
    * @param encodingCtx
-   *          the encoding context which will contain encoded uncompressed bytes
-   *          as well as compressed encoded bytes if compression is enabled, and
-   *          also it will reuse resources across multiple calls.
+   * @param out
+   * @param uncompressedBytesWithHeader
    * @throws IOException
-   *           If there is an error writing to output stream.
    */
-  void encodeKeyValues(ByteBuffer in, HFileBlockEncodingContext encodingCtx) throws IOException;
+  void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+      byte[] uncompressedBytesWithHeader) throws IOException;
 
   /**
    * Decode.

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
index f72878b..fc4c314 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
@@ -75,130 +75,6 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
     }
   }
 
-  private void compressSingleKeyValue(DiffCompressionState previousState,
-      DiffCompressionState currentState, DataOutputStream out,
-      ByteBuffer in) throws IOException {
-    byte flag = 0;
-    int kvPos = in.position();
-    int keyLength = in.getInt();
-    int valueLength = in.getInt();
-
-    long timestamp;
-    long diffTimestamp = 0;
-    int diffTimestampFitsInBytes = 0;
-
-    int commonPrefix;
-
-    int timestampFitsInBytes;
-
-    if (previousState.isFirst()) {
-      currentState.readKey(in, keyLength, valueLength);
-      currentState.prevOffset = kvPos;
-      timestamp = currentState.timestamp;
-      if (timestamp < 0) {
-        flag |= FLAG_TIMESTAMP_SIGN;
-        timestamp = -timestamp;
-      }
-      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
-
-      flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
-      commonPrefix = 0;
-
-      // put column family
-      in.mark();
-      ByteBufferUtils.skip(in, currentState.rowLength
-          + KeyValue.ROW_LENGTH_SIZE);
-      ByteBufferUtils.moveBufferToStream(out, in, currentState.familyLength
-          + KeyValue.FAMILY_LENGTH_SIZE);
-      in.reset();
-    } else {
-      // find a common prefix and skip it
-      commonPrefix =
-          ByteBufferUtils.findCommonPrefix(in, in.position(),
-              previousState.prevOffset + KeyValue.ROW_OFFSET, keyLength
-                  - KeyValue.TIMESTAMP_TYPE_SIZE);
-      // don't compress timestamp and type using prefix
-
-      currentState.readKey(in, keyLength, valueLength,
-          commonPrefix, previousState);
-      currentState.prevOffset = kvPos;
-      timestamp = currentState.timestamp;
-      boolean negativeTimestamp = timestamp < 0;
-      if (negativeTimestamp) {
-        timestamp = -timestamp;
-      }
-      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
-
-      if (keyLength == previousState.keyLength) {
-        flag |= FLAG_SAME_KEY_LENGTH;
-      }
-      if (valueLength == previousState.valueLength) {
-        flag |= FLAG_SAME_VALUE_LENGTH;
-      }
-      if (currentState.type == previousState.type) {
-        flag |= FLAG_SAME_TYPE;
-      }
-
-      // encode timestamp
-      diffTimestamp = previousState.timestamp - currentState.timestamp;
-      boolean minusDiffTimestamp = diffTimestamp < 0;
-      if (minusDiffTimestamp) {
-        diffTimestamp = -diffTimestamp;
-      }
-      diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
-      if (diffTimestampFitsInBytes < timestampFitsInBytes) {
-        flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
-        flag |= FLAG_TIMESTAMP_IS_DIFF;
-        if (minusDiffTimestamp) {
-          flag |= FLAG_TIMESTAMP_SIGN;
-        }
-      } else {
-        flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
-        if (negativeTimestamp) {
-          flag |= FLAG_TIMESTAMP_SIGN;
-        }
-      }
-    }
-
-    out.write(flag);
-
-    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
-      ByteBufferUtils.putCompressedInt(out, keyLength);
-    }
-    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
-      ByteBufferUtils.putCompressedInt(out, valueLength);
-    }
-
-    ByteBufferUtils.putCompressedInt(out, commonPrefix);
-    ByteBufferUtils.skip(in, commonPrefix);
-
-    if (previousState.isFirst() ||
-        commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
-      int restRowLength =
-          currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
-      ByteBufferUtils.moveBufferToStream(out, in, restRowLength);
-      ByteBufferUtils.skip(in, currentState.familyLength +
-          KeyValue.FAMILY_LENGTH_SIZE);
-      ByteBufferUtils.moveBufferToStream(out, in, currentState.qualifierLength);
-    } else {
-      ByteBufferUtils.moveBufferToStream(out, in,
-          keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE);
-    }
-
-    if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
-      ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
-    } else {
-      ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
-    }
-
-    if ((flag & FLAG_SAME_TYPE) == 0) {
-      out.write(currentState.type);
-    }
-    ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_TYPE_SIZE);
-
-    ByteBufferUtils.moveBufferToStream(out, in, valueLength);
-  }
-
   private void uncompressSingleKeyValue(DataInputStream source,
       ByteBuffer buffer,
       DiffCompressionState state)
@@ -316,24 +192,110 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
   }
 
   @Override
-  public void internalEncodeKeyValues(DataOutputStream out,
-      ByteBuffer in, HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
-    in.rewind();
-    ByteBufferUtils.putInt(out, in.limit());
-    DiffCompressionState previousState = new DiffCompressionState();
-    DiffCompressionState currentState = new DiffCompressionState();
-    while (in.hasRemaining()) {
-      compressSingleKeyValue(previousState, currentState,
-          out, in);
-      afterEncodingKeyValue(in, out, encodingCtx);
-
-      // swap previousState <-> currentState
-      DiffCompressionState tmp = previousState;
-      previousState = currentState;
-      currentState = tmp;
-    }
+  public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
+      DataOutputStream out) throws IOException {
+    EncodingState state = encodingContext.getEncodingState();
+    int size = compressSingleKeyValue(out, kv, state.prevKv);
+    size += afterEncodingKeyValue(kv, out, encodingContext);
+    state.prevKv = kv;
+    return size;
   }
 
+  private int compressSingleKeyValue(DataOutputStream out, KeyValue kv, KeyValue prevKv)
+      throws IOException {
+    byte flag = 0;
+    int kLength = kv.getKeyLength();
+    int vLength = kv.getValueLength();
+
+    long timestamp;
+    long diffTimestamp = 0;
+    int diffTimestampFitsInBytes = 0;
+    int timestampFitsInBytes;
+    int commonPrefix;
+    byte[] curKvBuf = kv.getBuffer();
+
+    if (prevKv == null) {
+      timestamp = kv.getTimestamp();
+      if (timestamp < 0) {
+        flag |= FLAG_TIMESTAMP_SIGN;
+        timestamp = -timestamp;
+      }
+      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
+      flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+      commonPrefix = 0;
+      // put column family
+      byte familyLength = kv.getFamilyLength();
+      out.write(familyLength);
+      out.write(kv.getFamilyArray(), kv.getFamilyOffset(), familyLength);
+    } else {
+      // Finding common prefix
+      int preKeyLength = prevKv.getKeyLength();
+      commonPrefix = ByteBufferUtils.findCommonPrefix(curKvBuf, kv.getKeyOffset(), kLength
+          - KeyValue.TIMESTAMP_TYPE_SIZE, prevKv.getBuffer(), prevKv.getKeyOffset(), preKeyLength
+          - KeyValue.TIMESTAMP_TYPE_SIZE);
+      if (kLength == preKeyLength) {
+        flag |= FLAG_SAME_KEY_LENGTH;
+      }
+      if (vLength == prevKv.getValueLength()) {
+        flag |= FLAG_SAME_VALUE_LENGTH;
+      }
+      if (kv.getTypeByte() == prevKv.getTypeByte()) {
+        flag |= FLAG_SAME_TYPE;
+      }
+      // don't compress timestamp and type using prefix; encode timestamp
+      timestamp = kv.getTimestamp();
+      diffTimestamp = prevKv.getTimestamp() - timestamp;
+      boolean negativeTimestamp = timestamp < 0;
+      if (negativeTimestamp) {
+        timestamp = -timestamp;
+      }
+      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
+      boolean minusDiffTimestamp = diffTimestamp < 0;
+      if (minusDiffTimestamp) {
+        diffTimestamp = -diffTimestamp;
+      }
+      diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
+      if (diffTimestampFitsInBytes < timestampFitsInBytes) {
+        flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+        flag |= FLAG_TIMESTAMP_IS_DIFF;
+        if (minusDiffTimestamp) {
+          flag |= FLAG_TIMESTAMP_SIGN;
+        }
+      } else {
+        flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+        if (negativeTimestamp) {
+          flag |= FLAG_TIMESTAMP_SIGN;
+        }
+      }
+    }
+    out.write(flag);
+    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+      ByteBufferUtils.putCompressedInt(out, kLength);
+    }
+    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+      ByteBufferUtils.putCompressedInt(out, vLength);
+    }
+    ByteBufferUtils.putCompressedInt(out, commonPrefix);
+    if (prevKv == null || commonPrefix < kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE) {
+      int restRowLength = kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
+      out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, restRowLength);
+      out.write(curKvBuf, kv.getQualifierOffset(), kv.getQualifierLength());
+    } else {
+      out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, kLength - commonPrefix
+          - KeyValue.TIMESTAMP_TYPE_SIZE);
+    }
+    if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
+      ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
+    } else {
+      ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
+    }
+
+    if ((flag & FLAG_SAME_TYPE) == 0) {
+      out.write(kv.getTypeByte());
+    }
+    out.write(kv.getValueArray(), kv.getValueOffset(), vLength);
+    return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
+  }
 
   @Override
   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
index 9e0497e..ce7356c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hbase.io.encoding;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -36,13 +37,16 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.Compressor;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
  * Encapsulates a data block compressed using a particular encoding algorithm.
  * Useful for testing and benchmarking.
+ * This is used only in testing.
  */
 @InterfaceAudience.Private
+@VisibleForTesting
 public class EncodedDataBlock {
   private byte[] rawKVs;
   private ByteBuffer rawBuffer;
@@ -215,16 +219,53 @@ public class EncodedDataBlock {
    * @return encoded data block with header and checksum
    */
   public byte[] encodeData() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try {
-      this.dataBlockEncoder.encodeKeyValues(
-          getUncompressedBuffer(), encodingCtx);
+      baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
+      DataOutputStream out = new DataOutputStream(baos);
+      this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
+      ByteBuffer in = getUncompressedBuffer();
+      in.rewind();
+      int klength, vlength;
+      short tagsLength = 0;
+      long memstoreTS = 0L;
+      KeyValue kv = null;
+      while (in.hasRemaining()) {
+        int kvOffset = in.position();
+        klength = in.getInt();
+        vlength = in.getInt();
+        ByteBufferUtils.skip(in, klength + vlength);
+        if (this.meta.isIncludesTags()) {
+          tagsLength = in.getShort();
+          ByteBufferUtils.skip(in, tagsLength);
+        }
+        if (this.meta.isIncludesMvcc()) {
+          memstoreTS = ByteBufferUtils.readVLong(in);
+        }
+        kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
+            klength, vlength, tagsLength));
+        kv.setMvccVersion(memstoreTS);
+        this.dataBlockEncoder.encode(kv, encodingCtx, out);
+      }
+      BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
+      baos.writeTo(stream);
+      this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, stream.buf);
     } catch (IOException e) {
       throw new RuntimeException(String.format(
           "Bug in encoding part of algorithm %s. " +
           "Probably it requested more bytes than are available.",
           toString()), e);
     }
-    return encodingCtx.getUncompressedBytesWithHeader();
+    return baos.toByteArray();
+  }
+
+  private static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
+    private byte[] buf;
+
+    @Override
+    public void write(byte[] b, int off, int len) {
+      this.buf = b;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java
new file mode 100644
index 0000000..b16f099
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Keeps track of the encoding state.
+ */
+@InterfaceAudience.Private
+public class EncodingState {
+
+  /**
+   * The previous KeyValue the encoder encoded.
+   */
+  protected KeyValue prevKv = null;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
index 0346b20..4325f96 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.io.encoding;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -102,118 +101,14 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
 
   }
 
-  private void compressSingleKeyValue(
-        FastDiffCompressionState previousState,
-        FastDiffCompressionState currentState,
-        OutputStream out, ByteBuffer in) throws IOException {
-    currentState.prevOffset = in.position();
-    int keyLength = in.getInt();
-    int valueOffset =
-        currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET;
-    int valueLength = in.getInt();
-    byte flag = 0;
-
-    if (previousState.isFirst()) {
-      // copy the key, there is no common prefix with none
-      out.write(flag);
-      ByteBufferUtils.putCompressedInt(out, keyLength);
-      ByteBufferUtils.putCompressedInt(out, valueLength);
-      ByteBufferUtils.putCompressedInt(out, 0);
-
-      currentState.readKey(in, keyLength, valueLength);
-
-      ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
-    } else {
-      // find a common prefix and skip it
-      int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(),
-          previousState.prevOffset + KeyValue.ROW_OFFSET,
-          Math.min(keyLength, previousState.keyLength) -
-          KeyValue.TIMESTAMP_TYPE_SIZE);
-
-      currentState.readKey(in, keyLength, valueLength,
-          commonPrefix, previousState);
-
-      if (keyLength == previousState.keyLength) {
-        flag |= FLAG_SAME_KEY_LENGTH;
-      }
-      if (valueLength == previousState.valueLength) {
-        flag |= FLAG_SAME_VALUE_LENGTH;
-      }
-      if (currentState.type == previousState.type) {
-        flag |= FLAG_SAME_TYPE;
-      }
-
-      int commonTimestampPrefix = findCommonTimestampPrefix(
-          currentState, previousState);
-      flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
-
-      // Check if current and previous values are the same. Compare value
-      // length first as an optimization.
-      if (valueLength == previousState.valueLength) {
-        int previousValueOffset = previousState.prevOffset
-            + previousState.keyLength + KeyValue.ROW_OFFSET;
-        if (ByteBufferUtils.arePartsEqual(in,
-                previousValueOffset, previousState.valueLength,
-                valueOffset, valueLength)) {
-          flag |= FLAG_SAME_VALUE;
-        }
-      }
-
-      out.write(flag);
-      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
-        ByteBufferUtils.putCompressedInt(out, keyLength);
-      }
-      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
-        ByteBufferUtils.putCompressedInt(out, valueLength);
-      }
-      ByteBufferUtils.putCompressedInt(out, commonPrefix);
-
-      ByteBufferUtils.skip(in, commonPrefix);
-      if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
-        // Previous and current rows are different. Copy the differing part of
-        // the row, skip the column family, and copy the qualifier.
-        ByteBufferUtils.moveBufferToStream(out, in,
-            currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
-        ByteBufferUtils.skip(in, currentState.familyLength +
-            KeyValue.FAMILY_LENGTH_SIZE);
-        ByteBufferUtils.moveBufferToStream(out, in,
-            currentState.qualifierLength);
-      } else {
-        // The common part includes the whole row. As the column family is the
-        // same across the whole file, it will automatically be included in the
-        // common prefix, so we need not special-case it here.
-        int restKeyLength = keyLength - commonPrefix -
-            KeyValue.TIMESTAMP_TYPE_SIZE;
-        ByteBufferUtils.moveBufferToStream(out, in, restKeyLength);
-      }
-      ByteBufferUtils.skip(in, commonTimestampPrefix);
-      ByteBufferUtils.moveBufferToStream(out, in,
-          KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
-
-      // Write the type if it is not the same as before.
-      if ((flag & FLAG_SAME_TYPE) == 0) {
-        out.write(currentState.type);
-      }
-
-      // Write the value if it is not the same as before.
-      if ((flag & FLAG_SAME_VALUE) == 0) {
-        ByteBufferUtils.copyBufferToStream(out, in, valueOffset, valueLength);
-      }
-
-      // Skip key type and value in the input buffer.
-      ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength);
+  private int findCommonTimestampPrefix(byte[] curKvBuf, int curKvTsOff, byte[] preKvBuf,
+      int preKvTsOff) {
+    int commonPrefix = 0; // number of leading timestamp bytes equal in both buffers
+    while (commonPrefix < (KeyValue.TIMESTAMP_SIZE - 1)
+        && curKvBuf[curKvTsOff + commonPrefix] == preKvBuf[preKvTsOff + commonPrefix]) {
+      commonPrefix++;
     }
-  }
-
-  private int findCommonTimestampPrefix(FastDiffCompressionState left,
-      FastDiffCompressionState right) {
-    int prefixTimestamp = 0;
-    while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) &&
-        left.timestamp[prefixTimestamp]
-            == right.timestamp[prefixTimestamp]) {
-      prefixTimestamp++;
-    }
-    return prefixTimestamp; // has to be at most 7 bytes
+    return commonPrefix; // the loop stops at TIMESTAMP_SIZE - 1, so at most 7 bytes
   }
 
   private void uncompressSingleKeyValue(DataInputStream source,
@@ -342,22 +237,98 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
   }
 
   @Override
-  public void internalEncodeKeyValues(DataOutputStream out, ByteBuffer in,
-      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
-    in.rewind();
-    ByteBufferUtils.putInt(out, in.limit());
-    FastDiffCompressionState previousState = new FastDiffCompressionState();
-    FastDiffCompressionState currentState = new FastDiffCompressionState();
-    while (in.hasRemaining()) {
-      compressSingleKeyValue(previousState, currentState,
-          out, in);
-      afterEncodingKeyValue(in, out, encodingCtx);
-
-      // swap previousState <-> currentState
-      FastDiffCompressionState tmp = previousState;
-      previousState = currentState;
-      currentState = tmp;
+  public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
+      DataOutputStream out) throws IOException {
+    EncodingState encodingState = encodingContext.getEncodingState();
+    int kvSize = compressSingleKeyValue(out, kv, encodingState.prevKv);
+    int totalSize = kvSize + afterEncodingKeyValue(kv, out, encodingContext);
+    encodingState.prevKv = kv;
+    return totalSize;
+  }
+
+  /** Delta-encodes kv against prevKv (or writes it in full when prevKv is null) into out. */
+  private int compressSingleKeyValue(DataOutputStream out, KeyValue kv, KeyValue prevKv)
+      throws IOException {
+    byte flag = 0;
+    int kLength = kv.getKeyLength();
+    int vLength = kv.getValueLength();
+    byte[] curKvBuf = kv.getBuffer();
+
+    if (prevKv == null) {
+      // No previous KeyValue to diff against: write key and value in full, common prefix 0.
+      out.write(flag);
+      ByteBufferUtils.putCompressedInt(out, kLength);
+      ByteBufferUtils.putCompressedInt(out, vLength);
+      ByteBufferUtils.putCompressedInt(out, 0);
+      out.write(curKvBuf, kv.getKeyOffset(), kLength + vLength);
+    } else {
+      byte[] preKvBuf = prevKv.getBuffer();
+      int preKeyLength = prevKv.getKeyLength();
+      int preValLength = prevKv.getValueLength();
+      // Common prefix of the two keys, excluding the trailing timestamp and type bytes.
+      int commonPrefix = ByteBufferUtils.findCommonPrefix(curKvBuf, kv.getKeyOffset(), kLength
+          - KeyValue.TIMESTAMP_TYPE_SIZE, preKvBuf, prevKv.getKeyOffset(), preKeyLength
+          - KeyValue.TIMESTAMP_TYPE_SIZE);
+
+      if (kLength == preKeyLength) {
+        flag |= FLAG_SAME_KEY_LENGTH;
+      }
+      if (vLength == preValLength) {
+        flag |= FLAG_SAME_VALUE_LENGTH;
+      }
+      if (kv.getTypeByte() == prevKv.getTypeByte()) {
+        flag |= FLAG_SAME_TYPE;
+      }
+
+      int commonTimestampPrefix = findCommonTimestampPrefix(curKvBuf, kv.getKeyOffset() + kLength
+          - KeyValue.TIMESTAMP_TYPE_SIZE, preKvBuf, prevKv.getKeyOffset() + preKeyLength
+          - KeyValue.TIMESTAMP_TYPE_SIZE);
+      flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
+
+      // Check if current and previous values are the same. Compare value
+      // length first as an optimization.
+      if (vLength == preValLength
+          && Bytes.equals(kv.getValueArray(), kv.getValueOffset(), vLength,
+              prevKv.getValueArray(), prevKv.getValueOffset(), preValLength)) {
+        flag |= FLAG_SAME_VALUE;
+      }
+
+      out.write(flag);
+      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+        ByteBufferUtils.putCompressedInt(out, kLength);
+      }
+      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+        ByteBufferUtils.putCompressedInt(out, vLength);
+      }
+      ByteBufferUtils.putCompressedInt(out, commonPrefix);
+
+      if (commonPrefix < kv.getRowLength() + KeyValue.ROW_LENGTH_SIZE) {
+        // Previous and current rows are different. Copy the differing part of
+        // the row, skip the column family, and copy the qualifier.
+        out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, kv.getRowLength()
+            + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
+        out.write(curKvBuf, kv.getQualifierOffset(), kv.getQualifierLength());
+      } else {
+        // The common part includes the whole row. As the column family is the
+        // same across the whole file, it will automatically be included in the
+        // common prefix, so we need not special-case it here.
+        int restKeyLength = kLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
+        out.write(curKvBuf, kv.getKeyOffset() + commonPrefix, restKeyLength);
+      }
+      out.write(curKvBuf, kv.getKeyOffset() + kLength - KeyValue.TIMESTAMP_TYPE_SIZE
+          + commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
+
+      // Write the type if it is not the same as before.
+      if ((flag & FLAG_SAME_TYPE) == 0) {
+        out.write(kv.getTypeByte());
+      }
+
+      // Write the value if it is not the same as before.
+      if ((flag & FLAG_SAME_VALUE) == 0) {
+        out.write(kv.getValueArray(), kv.getValueOffset(), vLength);
+      }
     }
+    return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
index 8386377..0dc1c8a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultEncodingContext.java
@@ -49,13 +49,9 @@ import com.google.common.base.Preconditions;
 public class HFileBlockDefaultEncodingContext implements
     HFileBlockEncodingContext {
   private byte[] onDiskBytesWithHeader;
-  private byte[] uncompressedBytesWithHeader;
   private BlockType blockType;
   private final DataBlockEncoding encodingAlgo;
 
-  private ByteArrayOutputStream encodedStream = new ByteArrayOutputStream();
-  private DataOutputStream dataOut = new DataOutputStream(encodedStream);
-
   private byte[] dummyHeader;
 
   // Compression state
@@ -77,6 +73,8 @@ public class HFileBlockDefaultEncodingContext implements
   /** Initialization vector */
   private byte[] iv;
 
+  private EncodingState encoderState;
+
   /**
    * @param encoding encoding used
    * @param headerBytes dummy header bytes
@@ -113,52 +111,35 @@ public class HFileBlockDefaultEncodingContext implements
       "Please pass HConstants.HFILEBLOCK_DUMMY_HEADER instead of null for param headerBytes");
   }
 
-  @Override
-  public void setDummyHeader(byte[] headerBytes) {
-    dummyHeader = headerBytes;
-  }
-
   /**
    * prepare to start a new encoding.
    * @throws IOException
    */
-  public void prepareEncoding() throws IOException {
-    encodedStream.reset();
-    dataOut.write(dummyHeader);
-    if (encodingAlgo != null
-        && encodingAlgo != DataBlockEncoding.NONE) {
-      encodingAlgo.writeIdInBytes(dataOut);
+  public void prepareEncoding(DataOutputStream out) throws IOException {
+    if (encodingAlgo != null && encodingAlgo != DataBlockEncoding.NONE) {
+      encodingAlgo.writeIdInBytes(out);
     }
   }
 
   @Override
   public void postEncoding(BlockType blockType)
       throws IOException {
-    dataOut.flush();
-    compressAfterEncodingWithBlockType(encodedStream.toByteArray(), blockType);
     this.blockType = blockType;
   }
 
-  /**
-   * @param uncompressedBytesWithHeader
-   * @param blockType
-   * @throws IOException
-   */
-  public void compressAfterEncodingWithBlockType(byte[] uncompressedBytesWithHeader,
-      BlockType blockType) throws IOException {
-    compressAfterEncoding(uncompressedBytesWithHeader, blockType, dummyHeader);
+  @Override
+  public byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException {
+    compressAfterEncoding(uncompressedBytesWithHeader, dummyHeader);
+    return onDiskBytesWithHeader;
   }
 
   /**
    * @param uncompressedBytesWithHeader
-   * @param blockType
    * @param headerBytes
    * @throws IOException
    */
-  protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader,
-      BlockType blockType, byte[] headerBytes) throws IOException {
-    this.uncompressedBytesWithHeader = uncompressedBytesWithHeader;
-
+  protected void compressAfterEncoding(byte[] uncompressedBytesWithHeader, byte[] headerBytes)
+      throws IOException {
     Encryption.Context cryptoContext = fileContext.getEncryptionContext();
     if (cryptoContext != Encryption.Context.NONE) {
 
@@ -238,20 +219,7 @@ public class HFileBlockDefaultEncodingContext implements
       } else {
         onDiskBytesWithHeader = uncompressedBytesWithHeader;
       }
-
     }
-
-    this.blockType = blockType;
-  }
-
-  @Override
-  public byte[] getOnDiskBytesWithHeader() {
-    return onDiskBytesWithHeader;
-  }
-
-  @Override
-  public byte[] getUncompressedBytesWithHeader() {
-    return uncompressedBytesWithHeader;
   }
 
   @Override
@@ -271,10 +239,6 @@ public class HFileBlockDefaultEncodingContext implements
     }
   }
 
-  public DataOutputStream getOutputStreamForEncoder() {
-    return this.dataOut;
-  }
-
   @Override
   public DataBlockEncoding getDataBlockEncoding() {
     return this.encodingAlgo;
@@ -292,4 +256,14 @@ public class HFileBlockDefaultEncodingContext implements
   public void setTagCompressionContext(TagCompressionContext tagCompressionContext) {
     this.tagCompressionContext = tagCompressionContext;
   }
+
+  @Override
+  public EncodingState getEncodingState() {
+    return this.encoderState;
+  }
+
+  @Override
+  public void setEncodingState(EncodingState state) {
+    this.encoderState = state;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
index 2fa9112..7649021 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockEncodingContext.java
@@ -17,7 +17,6 @@
 package org.apache.hadoop.hbase.io.encoding;
 
 import java.io.IOException;
-import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
@@ -34,33 +33,11 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 public interface HFileBlockEncodingContext {
 
   /**
-   * @return OutputStream to which encoded data is written
-   */
-  OutputStream getOutputStreamForEncoder();
-
-  /**
-   * @return encoded and compressed bytes with header which are ready to write
-   *         out to disk
-   */
-  byte[] getOnDiskBytesWithHeader();
-
-  /**
-   * @return encoded but not heavily compressed bytes with header which can be
-   *         cached in block cache
-   */
-  byte[] getUncompressedBytesWithHeader();
-
-  /**
    * @return the block type after encoding
    */
   BlockType getBlockType();
 
   /**
-   * sets the dummy header bytes
-   */
-  void setDummyHeader(byte[] headerBytes);
-
-  /**
    * @return the {@link DataBlockEncoding} encoding used
    */
   DataBlockEncoding getDataBlockEncoding();
@@ -83,4 +60,22 @@ public interface HFileBlockEncodingContext {
    * @return HFile context information
    */
   HFileContext getHFileContext();
+
+  /**
+   * Sets the encoding state.
+   * @param state the encoding state to keep on this context
+   */
+  void setEncodingState(EncodingState state);
+
+  /**
+   * @return the encoding state
+   */
+  EncodingState getEncodingState();
+
+  /**
+   * @param uncompressedBytesWithHeader encoded bytes with header
+   * @return bytes with header that are ready to be written to disk, i.e. the input after the
+   *         configured compression and encryption have been applied
+   */
+  byte[] compressAndEncrypt(byte[] uncompressedBytesWithHeader) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
index f57ff4f..c699f6f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
@@ -44,50 +44,33 @@ import org.apache.hadoop.hbase.util.Bytes;
 @InterfaceAudience.Private
 public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
 
-  private int addKV(int prevKeyOffset, DataOutputStream out,
-      ByteBuffer in, int prevKeyLength) throws IOException {
-    int keyLength = in.getInt();
-    int valueLength = in.getInt();
-
-    if (prevKeyOffset == -1) {
+  @Override
+  public int internalEncode(KeyValue kv, HFileBlockDefaultEncodingContext encodingContext,
+      DataOutputStream out) throws IOException {
+    byte[] kvBuf = kv.getBuffer();
+    int klength = kv.getKeyLength();
+    int vlength = kv.getValueLength();
+    EncodingState state = encodingContext.getEncodingState();
+    if (state.prevKv == null) {
       // copy the key, there is no common prefix with none
-      ByteBufferUtils.putCompressedInt(out, keyLength);
-      ByteBufferUtils.putCompressedInt(out, valueLength);
+      ByteBufferUtils.putCompressedInt(out, klength);
+      ByteBufferUtils.putCompressedInt(out, vlength);
       ByteBufferUtils.putCompressedInt(out, 0);
-      ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
+      out.write(kvBuf, kv.getKeyOffset(), klength + vlength);
     } else {
       // find a common prefix and skip it
-      int common = ByteBufferUtils.findCommonPrefix(
-          in, prevKeyOffset + KeyValue.ROW_OFFSET,
-          in.position(),
-          Math.min(prevKeyLength, keyLength));
-
-      ByteBufferUtils.putCompressedInt(out, keyLength - common);
-      ByteBufferUtils.putCompressedInt(out, valueLength);
+      int common = ByteBufferUtils.findCommonPrefix(state.prevKv.getBuffer(),
+          state.prevKv.getKeyOffset(), state.prevKv.getKeyLength(), kvBuf, kv.getKeyOffset(),
+          klength);
+      ByteBufferUtils.putCompressedInt(out, klength - common);
+      ByteBufferUtils.putCompressedInt(out, vlength);
       ByteBufferUtils.putCompressedInt(out, common);
-
-      ByteBufferUtils.skip(in, common);
-      ByteBufferUtils.moveBufferToStream(out, in, keyLength - common
-          + valueLength);
-    }
-
-    return keyLength;
-  }
-
-  @Override
-  public void internalEncodeKeyValues(DataOutputStream writeHere, ByteBuffer in,
-      HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
-    in.rewind();
-    ByteBufferUtils.putInt(writeHere, in.limit());
-    int prevOffset = -1;
-    int offset = 0;
-    int keyLength = 0;
-    while (in.hasRemaining()) {
-      offset = in.position();
-      keyLength = addKV(prevOffset, writeHere, in, keyLength);
-      afterEncodingKeyValue(in, writeHere, encodingCtx);
-      prevOffset = offset;
+      out.write(kvBuf, kv.getKeyOffset() + common, klength - common + vlength);
     }
+    int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
+    size += afterEncodingKeyValue(kv, out, encodingContext);
+    state.prevKv = kv; // the next call searches for its common prefix against this KeyValue
+    return size;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java
----------------------------------------------------------------------
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java
index ef576eb..2a0c459 100644
--- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.codec.prefixtree.encode.PrefixTreeEncoder;
 import org.apache.hadoop.hbase.codec.prefixtree.scanner.CellSearcher;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.EncodingState;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * This class is created via reflection in DataBlockEncoding enum. Update the enum if class name or
@@ -63,50 +65,6 @@ public class PrefixTreeCodec implements DataBlockEncoder{
   public PrefixTreeCodec() {
   }
 
-  /**
-   * Copied from BufferedDataBlockEncoder. Almost definitely can be improved, but i'm not familiar
-   * enough with the concept of the HFileBlockEncodingContext.
-   */
-  @Override
-  public void encodeKeyValues(ByteBuffer in,
-      HFileBlockEncodingContext blkEncodingCtx) throws IOException {
-    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
-      throw new IOException(this.getClass().getName() + " only accepts "
-          + HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context.");
-    }
-
-    HFileBlockDefaultEncodingContext encodingCtx
-        = (HFileBlockDefaultEncodingContext) blkEncodingCtx;
-    encodingCtx.prepareEncoding();
-    DataOutputStream dataOut = encodingCtx.getOutputStreamForEncoder();
-    internalEncodeKeyValues(dataOut, in, encodingCtx.getHFileContext().isIncludesMvcc(),
-        encodingCtx.getHFileContext().isIncludesTags());
-
-    //do i need to check this, or will it always be DataBlockEncoding.PREFIX_TREE?
-    if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
-      encodingCtx.postEncoding(BlockType.ENCODED_DATA);
-    } else {
-      encodingCtx.postEncoding(BlockType.DATA);
-    }
-  }
-
-  private void internalEncodeKeyValues(DataOutputStream encodedOutputStream,
-      ByteBuffer rawKeyValues, boolean includesMvccVersion, boolean includesTag) throws IOException {
-    rawKeyValues.rewind();
-    PrefixTreeEncoder builder = EncoderFactory.checkOut(encodedOutputStream, includesMvccVersion);
-
-    try {
-      KeyValue kv;
-      while ((kv = KeyValueUtil.nextShallowCopy(rawKeyValues, includesMvccVersion, includesTag)) != null) {
-        builder.write(kv);
-      }
-      builder.flush();
-    } finally {
-      EncoderFactory.checkIn(builder);
-    }
-  }
-
-
   @Override
   public ByteBuffer decodeKeyValues(DataInputStream source, HFileBlockDecodingContext decodingCtx)
       throws IOException {
@@ -202,4 +160,54 @@ public class PrefixTreeCodec implements DataBlockEncoder{
     return new PrefixTreeSeeker(decodingCtx.getHFileContext().isIncludesMvcc());
   }
 
+  @Override
+  public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+      throws IOException {
+    // Hand the cell to the PrefixTreeEncoder that startBlockEncoding stored on the context.
+    ((PrefixTreeEncodingState) encodingCtx.getEncodingState()).builder.write(kv);
+    // The returned size is the KeyValue length, plus the mvcc vint size when mvcc is included.
+    int kvSize = kv.getLength();
+    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
+      kvSize += WritableUtils.getVIntSize(kv.getMvccVersion());
+    }
+    return kvSize;
+  }
+
+  private static class PrefixTreeEncodingState extends EncodingState {
+    PrefixTreeEncoder builder = null; // checked out in startBlockEncoding, in at endBlockEncoding
+  }
+
+  @Override
+  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
+      throws IOException {
+    // Only the exact default context class is accepted; subclasses are rejected too.
+    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
+      throw new IOException(this.getClass().getName() + " only accepts "
+          + HFileBlockDefaultEncodingContext.class.getName() + " as the " + "encoding context.");
+    }
+    HFileBlockDefaultEncodingContext defaultCtx =
+        (HFileBlockDefaultEncodingContext) blkEncodingCtx;
+    defaultCtx.prepareEncoding(out);
+
+    // Check out a pooled encoder writing to 'out' and keep it on the context for encode().
+    boolean includesMvcc = defaultCtx.getHFileContext().isIncludesMvcc();
+    PrefixTreeEncodingState blockState = new PrefixTreeEncodingState();
+    blockState.builder = EncoderFactory.checkOut(out, includesMvcc);
+    blkEncodingCtx.setEncodingState(blockState);
+  }
+
+  @Override
+  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+      byte[] uncompressedBytesWithHeader) throws IOException {
+    PrefixTreeEncodingState state = (PrefixTreeEncodingState) encodingCtx.getEncodingState();
+    try {
+      state.builder.flush();
+    } finally {
+      // Always hand the encoder back to the pool, even when flush() throws.
+      EncoderFactory.checkIn(state.builder);
+    }
+    // Report ENCODED_DATA unless the context says no encoding (DataBlockEncoding.NONE) is used.
+    boolean encoded = encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE;
+    encodingCtx.postEncoding(encoded ? BlockType.ENCODED_DATA : BlockType.DATA);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderFactory.java
----------------------------------------------------------------------
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderFactory.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderFactory.java
index ba5340d..e26f151 100644
--- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderFactory.java
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderFactory.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public class EncoderFactory {
 
-  private static final EncoderPool POOL = new ThreadLocalEncoderPool();
+  private static final EncoderPool POOL = new EncoderPoolImpl();
 
 
   public static PrefixTreeEncoder checkOut(OutputStream outputStream, boolean includeMvccVersion) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderPoolImpl.java
----------------------------------------------------------------------
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderPoolImpl.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderPoolImpl.java
new file mode 100644
index 0000000..510404d
--- /dev/null
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/encode/EncoderPoolImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec.prefixtree.encode;
+
+import java.io.OutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class EncoderPoolImpl implements EncoderPool {
+  // Encoders that were checked in and are waiting to be reused by a later checkOut().
+  private final BlockingQueue<PrefixTreeEncoder> unusedEncoders =
+      new LinkedBlockingQueue<PrefixTreeEncoder>();
+
+  @Override
+  public PrefixTreeEncoder checkOut(OutputStream outputStream, boolean includeMvccVersion) {
+    PrefixTreeEncoder encoder = unusedEncoders.poll(); // null when no idle encoder is queued
+    if (encoder == null) {
+      encoder = new PrefixTreeEncoder(outputStream, includeMvccVersion);
+    } else {
+      encoder.reset(outputStream, includeMvccVersion);
+    }
+    return encoder;
+  }
+
+  @Override
+  public void checkIn(PrefixTreeEncoder encoder) {
+    this.unusedEncoders.add(encoder);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 89702c5..b39eec2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -641,6 +642,10 @@ public class HFileBlock implements Cacheable {
      */
     private DataOutputStream userDataStream;
 
+    // Size of actual data being written. Not considering the block encoding/compression. This
+    // includes the header size also.
+    private int unencodedDataSizeWritten;
+
     /**
      * Bytes to be written to the file system, including the header. Compressed
      * if compression is turned on. It also includes the checksum data that 
@@ -731,10 +736,25 @@ public class HFileBlock implements Cacheable {
 
       // We will compress it later in finishBlock()
       userDataStream = new DataOutputStream(baosInMemory);
+      if (newBlockType == BlockType.DATA) {
+        this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
+      }
+      this.unencodedDataSizeWritten = 0;
       return userDataStream;
     }
 
     /**
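+     * Writes the KeyValue to this block through the data block encoder.
+     * @param kv the KeyValue to append; the writer must be in the WRITING state
+     * @throws IOException if encoding the KeyValue or writing it to the stream fails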
+     */
+    public void write(KeyValue kv) throws IOException{
+      expectState(State.WRITING);
+      this.unencodedDataSizeWritten += this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx,
+          this.userDataStream);
+    }
+
+    /**
      * Returns the stream for the user to write to. The block writer takes care
      * of handling compression and buffering for caching on write. Can only be
      * called in the "writing" state.
@@ -750,7 +770,7 @@ public class HFileBlock implements Cacheable {
      * Transitions the block writer from the "writing" state to the "block
      * ready" state.  Does nothing if a block is already finished.
      */
-    private void ensureBlockReady() throws IOException {
+    void ensureBlockReady() throws IOException {
       Preconditions.checkState(state != State.INIT,
           "Unexpected state: " + state);
 
@@ -768,6 +788,14 @@ public class HFileBlock implements Cacheable {
      * write state to "block ready".
      */
     private void finishBlock() throws IOException {
+      if (blockType == BlockType.DATA) {
+        BufferGrabbingByteArrayOutputStream baosInMemoryCopy = 
+            new BufferGrabbingByteArrayOutputStream();
+        baosInMemory.writeTo(baosInMemoryCopy);
+        this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
+            baosInMemoryCopy.buf, blockType);
+        blockType = dataBlockEncodingCtx.getBlockType();
+      }
       userDataStream.flush();
       // This does an array copy, so it is safe to cache this byte array.
       uncompressedBytesWithHeader = baosInMemory.toByteArray();
@@ -777,15 +805,13 @@ public class HFileBlock implements Cacheable {
       // cache-on-write. In a way, the block is ready, but not yet encoded or
       // compressed.
       state = State.BLOCK_READY;
-      if (blockType == BlockType.DATA) {
-        encodeDataBlockForDisk();
+      if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
+        onDiskBytesWithHeader = dataBlockEncodingCtx
+            .compressAndEncrypt(uncompressedBytesWithHeader);
       } else {
-        defaultBlockEncodingCtx.compressAfterEncodingWithBlockType(
-            uncompressedBytesWithHeader, blockType);
-        onDiskBytesWithHeader =
-          defaultBlockEncodingCtx.getOnDiskBytesWithHeader();
+        onDiskBytesWithHeader = defaultBlockEncodingCtx
+            .compressAndEncrypt(uncompressedBytesWithHeader);
       }
-
       int numBytes = (int) ChecksumUtil.numBytes(
           onDiskBytesWithHeader.length,
           fileContext.getBytesPerChecksum());
@@ -805,24 +831,17 @@ public class HFileBlock implements Cacheable {
           onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
     }
 
-    /**
-     * Encodes this block if it is a data block and encoding is turned on in
-     * {@link #dataBlockEncoder}.
-     */
-    private void encodeDataBlockForDisk() throws IOException {
-      // do data block encoding, if data block encoder is set
-      ByteBuffer rawKeyValues =
-          ByteBuffer.wrap(uncompressedBytesWithHeader, HConstants.HFILEBLOCK_HEADER_SIZE,
-              uncompressedBytesWithHeader.length - HConstants.HFILEBLOCK_HEADER_SIZE).slice();
-
-      // do the encoding
-      dataBlockEncoder.beforeWriteToDisk(rawKeyValues, dataBlockEncodingCtx, blockType);
-
-      uncompressedBytesWithHeader =
-          dataBlockEncodingCtx.getUncompressedBytesWithHeader();
-      onDiskBytesWithHeader =
-          dataBlockEncodingCtx.getOnDiskBytesWithHeader();
-      blockType = dataBlockEncodingCtx.getBlockType();
+    public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
+      private byte[] buf;
+
+      @Override
+      public void write(byte[] b, int off, int len) {
+        this.buf = b;
+      }
+
+      public byte[] getBuffer() {
+        return this.buf;
+      }
     }
 
     /**
@@ -873,7 +892,7 @@ public class HFileBlock implements Cacheable {
      * @param out the output stream to write the
      * @throws IOException
      */
-    private void finishBlockAndWriteHeaderAndData(DataOutputStream out)
+    protected void finishBlockAndWriteHeaderAndData(DataOutputStream out)
       throws IOException {
       ensureBlockReady();
       out.write(onDiskBytesWithHeader);
@@ -972,9 +991,8 @@ public class HFileBlock implements Cacheable {
      * @return the number of bytes written
      */
     public int blockSizeWritten() {
-      if (state != State.WRITING)
-        return 0;
-      return userDataStream.size();
+      if (state != State.WRITING) return 0;
+      return this.unencodedDataSizeWritten;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
index 6493f07..7049e4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
@@ -16,10 +16,11 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
@@ -36,18 +37,38 @@ public interface HFileDataBlockEncoder {
   byte[] DATA_BLOCK_ENCODING = Bytes.toBytes("DATA_BLOCK_ENCODING");
 
   /**
-   * Should be called before an encoded or unencoded data block is written to
-   * disk.
-   * @param in KeyValues next to each other
-   * @param encodingResult the encoded result
-   * @param blockType block type
+   * Starts encoding for a block of KeyValues. Call
+   * {@link #endBlockEncoding(HFileBlockEncodingContext, DataOutputStream, byte[], BlockType)}
+   * to finish encoding of a block.
+   * @param encodingCtx encoding context of the block being written
+   * @param out stream the encoded block data is written to
    * @throws IOException
    */
-  void beforeWriteToDisk(
-    ByteBuffer in,
-    HFileBlockEncodingContext encodingResult,
-    BlockType blockType
-  ) throws IOException;
+  void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+      throws IOException;
+
+  /**
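+   * Encodes a single KeyValue and writes it to the given stream.
+   * @param kv the KeyValue to encode
+   * @param encodingCtx encoding context of the block being written
+   * @param out stream the encoded KeyValue is written to
+   * @return size in bytes of the KeyValue in its unencoded form
+   * @throws IOException if writing to {@code out} fails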
+   */
+  int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+      throws IOException;
+
+  /**
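+   * Ends encoding for a block of KeyValues. Gives the encoder a chance to finalize the encoded
+   * block. It must be called at the end of block encoding.
+   * @param encodingCtx encoding context of the block being finished
+   * @param out stream the block data was written to
+   * @param uncompressedBytesWithHeader the block's uncompressed bytes, including the header
+   * @param blockType type of the block being finished
+   * @throws IOException if finishing the block fails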
+   */
+  void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+      byte[] uncompressedBytesWithHeader, BlockType blockType) throws IOException;
 
   /**
    * Decides whether we should use a scanner over encoded blocks.

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
index 4094450..edf4cc6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
@@ -16,10 +16,11 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
@@ -89,24 +90,11 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
     }
     return encoding;
   }
-  /**
-   * Precondition: a non-encoded buffer. Postcondition: on-disk encoding.
-   *
-   * The encoded results can be stored in {@link HFileBlockEncodingContext}.
-   *
-   * @throws IOException
-   */
+
   @Override
-  public void beforeWriteToDisk(ByteBuffer in,
-      HFileBlockEncodingContext encodeCtx,
-      BlockType blockType) throws IOException {
-    if (encoding == DataBlockEncoding.NONE) {
-      // there is no need to encode the block before writing it to disk
-      ((HFileBlockDefaultEncodingContext) encodeCtx).compressAfterEncodingWithBlockType(
-          in.array(), blockType);
-      return;
-    }
-    encodeBufferToHFileBlockBuffer(in, encoding, encodeCtx);
+  public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+      throws IOException {
+    return this.encoding.getEncoder().encode(kv, encodingCtx, out);
   }
 
   @Override
@@ -114,26 +102,6 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
     return encoding != DataBlockEncoding.NONE;
   }
 
-  /**
-   * Encode a block of key value pairs.
-   *
-   * @param in input data to encode
-   * @param algo encoding algorithm
-   * @param encodeCtx where will the output data be stored
-   */
-  private void encodeBufferToHFileBlockBuffer(ByteBuffer in, DataBlockEncoding algo,
-      HFileBlockEncodingContext encodeCtx) {
-    DataBlockEncoder encoder = algo.getEncoder();
-    try {
-      encoder.encodeKeyValues(in, encodeCtx);
-    } catch (IOException e) {
-      throw new RuntimeException(String.format(
-          "Bug in data block encoder "
-              + "'%s', it probably requested too much data, " +
-              "exception message: %s.",
-              algo.toString(), e.getMessage()), e);
-    }
-  }
 
   @Override
   public String toString() {
@@ -158,4 +126,18 @@ public class HFileDataBlockEncoderImpl implements HFileDataBlockEncoder {
     }
     return new HFileBlockDefaultDecodingContext(fileContext);
   }
+
+  @Override
+  public void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+      throws IOException {
+    if (this.encoding != null && this.encoding != DataBlockEncoding.NONE) {
+      this.encoding.getEncoder().startBlockEncoding(encodingCtx, out);
+    }
+  }
+
+  @Override
+  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+      byte[] uncompressedBytesWithHeader, BlockType blockType) throws IOException {
+    this.encoding.getEncoder().endBlockEncoding(encodingCtx, out, uncompressedBytesWithHeader);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
index 2c14306..8c1c60e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Writes HFile format version 2.
@@ -251,41 +250,12 @@ public class HFileWriterV2 extends AbstractHFileWriter {
    */
   @Override
   public void append(final KeyValue kv) throws IOException {
-    append(kv.getMvccVersion(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
-        kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
-    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
-  }
-
-  /**
-   * Add key/value to file. Keys must be added in an order that agrees with the
-   * Comparator passed on construction.
-   *
-   * @param key
-   *          Key to add. Cannot be empty nor null.
-   * @param value
-   *          Value to add. Cannot be empty nor null.
-   * @throws IOException
-   */
-  @Override
-  public void append(final byte[] key, final byte[] value) throws IOException {
-    append(0, key, 0, key.length, value, 0, value.length);
-  }
-
-  /**
-   * Add key/value to file. Keys must be added in an order that agrees with the
-   * Comparator passed on construction.
-   *
-   * @param key
-   * @param koffset
-   * @param klength
-   * @param value
-   * @param voffset
-   * @param vlength
-   * @throws IOException
-   */
-  protected void append(final long memstoreTS, final byte[] key, final int koffset,
-      final int klength, final byte[] value, final int voffset, final int vlength)
-      throws IOException {
+    byte[] key = kv.getBuffer();
+    int koffset = kv.getKeyOffset();
+    int klength = kv.getKeyLength();
+    byte[] value = kv.getValueArray();
+    int voffset = kv.getValueOffset();
+    int vlength = kv.getValueLength();
     boolean dupKey = checkKey(key, koffset, klength);
     checkValue(value, voffset, vlength);
     if (!dupKey) {
@@ -295,20 +265,10 @@ public class HFileWriterV2 extends AbstractHFileWriter {
     if (!fsBlockWriter.isWriting())
       newBlock();
 
-    // Write length of key and value and then actual key and value bytes.
-    // Additionally, we may also write down the memstoreTS.
-    {
-      DataOutputStream out = fsBlockWriter.getUserDataStream();
-      out.writeInt(klength);
-      totalKeyLength += klength;
-      out.writeInt(vlength);
-      totalValueLength += vlength;
-      out.write(key, koffset, klength);
-      out.write(value, voffset, vlength);
-      if (this.hFileContext.isIncludesMvcc()) {
-        WritableUtils.writeVLong(out, memstoreTS);
-      }
-    }
+    fsBlockWriter.write(kv);
+
+    totalKeyLength += klength;
+    totalValueLength += vlength;
 
     // Are we the first key in this block?
     if (firstKeyInBlock == null) {
@@ -321,6 +281,29 @@ public class HFileWriterV2 extends AbstractHFileWriter {
     lastKeyOffset = koffset;
     lastKeyLength = klength;
     entryCount++;
+    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
+  }
+
+  /**
+   * Add key/value to file. Keys must be added in an order that agrees with the
+   * Comparator passed on construction.
+   *
+   * @param key
+   *          Key to add. Cannot be empty nor null.
+   * @param value
+   *          Value to add. Cannot be empty nor null.
+   * @throws IOException
+   */
+  @Override
+  public void append(final byte[] key, final byte[] value) throws IOException {
+    int kvlen = (int) KeyValue.getKeyValueDataStructureSize(key.length, value.length, 0);
+    byte[] b = new byte[kvlen];
+    int pos = 0;
+    pos = Bytes.putInt(b, pos, key.length);
+    pos = Bytes.putInt(b, pos, value.length);
+    pos = Bytes.putBytes(b, pos, key, 0, key.length);
+    Bytes.putBytes(b, pos, value, 0, value.length);
+    append(new KeyValue(b, 0, kvlen));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
index d1edea3..8c8cb2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV3.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
@@ -37,7 +36,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.WritableUtils;
 
 /**
  * {@link HFile} writer for version 3.
@@ -86,10 +84,11 @@ public class HFileWriterV3 extends HFileWriterV2 {
   @Override
   public void append(final KeyValue kv) throws IOException {
     // Currently get the complete arrays
-    append(kv.getMvccVersion(), kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
-        kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(), kv.getTagsArray(),
-        kv.getTagsOffset(), kv.getTagsLength());
-    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, kv.getMvccVersion());
+    super.append(kv);
+    short tagsLength = kv.getTagsLength();
+    if (tagsLength > this.maxTagsLength) {
+      this.maxTagsLength = tagsLength;
+    }
   }
   
   /**
@@ -119,73 +118,20 @@ public class HFileWriterV3 extends HFileWriterV2 {
    */
   @Override
   public void append(final byte[] key, final byte[] value, byte[] tag) throws IOException {
-    append(0, key, 0, key.length, value, 0, value.length, tag, 0, tag.length);
-  }
-
-  /**
-   * Add key/value to file. Keys must be added in an order that agrees with the
-   * Comparator passed on construction.
-   * @param key
-   * @param koffset
-   * @param klength
-   * @param value
-   * @param voffset
-   * @param vlength
-   * @param tag
-   * @param tagsOffset
-   * @param tagLength
-   * @throws IOException
-   */
-  private void append(final long memstoreTS, final byte[] key, final int koffset,
-      final int klength, final byte[] value, final int voffset, final int vlength,
-      final byte[] tag, final int tagsOffset, final int tagsLength) throws IOException {
-    boolean dupKey = checkKey(key, koffset, klength);
-    checkValue(value, voffset, vlength);
-    if (!dupKey) {
-      checkBlockBoundary();
+    int kvlen = (int) KeyValue.getKeyValueDataStructureSize(key.length, value.length, tag.length);
+    byte[] b = new byte[kvlen];
+    int pos = 0;
+    pos = Bytes.putInt(b, pos, key.length);
+    pos = Bytes.putInt(b, pos, value.length);
+    pos = Bytes.putBytes(b, pos, key, 0, key.length);
+    pos = Bytes.putBytes(b, pos, value, 0, value.length);
+    if (tag.length > 0) {
+      pos = Bytes.putShort(b, pos, (short) tag.length);
+      Bytes.putBytes(b, pos, tag, 0, tag.length);
     }
-
-    if (!fsBlockWriter.isWriting())
-      newBlock();
-
-    // Write length of key and value and then actual key and value bytes.
-    // Additionally, we may also write down the memstoreTS.
-    {
-      DataOutputStream out = fsBlockWriter.getUserDataStream();
-      out.writeInt(klength);
-      totalKeyLength += klength;
-      out.writeInt(vlength);
-      totalValueLength += vlength;
-      out.write(key, koffset, klength);
-      out.write(value, voffset, vlength);
-      // Write the additional tag into the stream
-      if (hFileContext.isIncludesTags()) {
-        out.writeShort((short) tagsLength);
-        if (tagsLength > 0) {
-          out.write(tag, tagsOffset, tagsLength);
-          if (tagsLength > maxTagsLength) {
-            maxTagsLength = tagsLength;
-          }
-        }
-      }
-      if (this.hFileContext.isIncludesMvcc()) {
-        WritableUtils.writeVLong(out, memstoreTS);
-      }
-    }
-
-    // Are we the first key in this block?
-    if (firstKeyInBlock == null) {
-      // Copy the key.
-      firstKeyInBlock = new byte[klength];
-      System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
-    }
-
-    lastKeyBuffer = key;
-    lastKeyOffset = koffset;
-    lastKeyLength = klength;
-    entryCount++;
+    append(new KeyValue(b, 0, kvlen));
   }
-  
+
   protected void finishFileInfo() throws IOException {
     super.finishFileInfo();
     if (hFileContext.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/53513dcb/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
index 3d34943..f5b61f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
@@ -16,15 +16,17 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Does not perform any kind of encoding/decoding.
@@ -40,18 +42,30 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
   }
 
   @Override
-  public void beforeWriteToDisk(ByteBuffer in,
-      HFileBlockEncodingContext encodeCtx, BlockType blockType)
+  public int encode(KeyValue kv, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
       throws IOException {
-    if (!(encodeCtx.getClass().getName().equals(
-        HFileBlockDefaultEncodingContext.class.getName()))) {
-      throw new IOException (this.getClass().getName() + " only accepts " +
-          HFileBlockDefaultEncodingContext.class.getName() + ".");
-    }
+    int klength = kv.getKeyLength();
+    int vlength = kv.getValueLength();
 
-    HFileBlockDefaultEncodingContext defaultContext =
-        (HFileBlockDefaultEncodingContext) encodeCtx;
-    defaultContext.compressAfterEncodingWithBlockType(in.array(), blockType);
+    out.writeInt(klength);
+    out.writeInt(vlength);
+    out.write(kv.getBuffer(), kv.getKeyOffset(), klength);
+    out.write(kv.getValueArray(), kv.getValueOffset(), vlength);
+    int encodedKvSize = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
+    // Write the additional tag into the stream
+    if (encodingCtx.getHFileContext().isIncludesTags()) {
+      short tagsLength = kv.getTagsLength();
+      out.writeShort(tagsLength);
+      if (tagsLength > 0) {
+        out.write(kv.getTagsArray(), kv.getTagsOffset(), tagsLength);
+      }
+      encodedKvSize += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
+    }
+    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
+      WritableUtils.writeVLong(out, kv.getMvccVersion());
+      encodedKvSize += WritableUtils.getVIntSize(kv.getMvccVersion());
+    }
+    return encodedKvSize;
   }
 
   @Override
@@ -88,4 +102,15 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder {
   public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
     return new HFileBlockDefaultDecodingContext(meta);
   }
+
+  @Override
+  public void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
+      throws IOException {
+  }
+
+  @Override
+  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
+      byte[] uncompressedBytesWithHeader, BlockType blockType) throws IOException {
+    encodingCtx.postEncoding(BlockType.DATA);
+  }
 }