Posted to commits@hbase.apache.org by mb...@apache.org on 2012/10/23 23:58:44 UTC

svn commit: r1401499 [1/2] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/encoding/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hado...

Author: mbautin
Date: Tue Oct 23 21:58:43 2012
New Revision: 1401499

URL: http://svn.apache.org/viewvc?rev=1401499&view=rev
Log:
[jira] [HBASE-6597] [89-fb] Incremental Block Encoding

Author: mbautin

Summary:
Created an incremental encoding writer. HFileBlock.Writer now includes methods to support incremental encoding of data; direct access to this class's data stream has been removed, and key/value pairs should instead be added through a DataBlockEncoder.EncodedWriter.

Changed the enum value DataBlockEncoding.NONE to use CopyKeyDataBlockEncoder. This avoids code duplication.

Test Plan:
Added a new test file, io/hfile/TestIncrementalEncoding.java.

Reviewers: kannan

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D554523

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java
      - copied, changed from r1401280, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestIncrementalEncoding.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java Tue Oct 23 21:58:43 2012
@@ -2177,4 +2177,15 @@ public class KeyValue implements Writabl
     out.writeInt(this.length);
     out.write(this.bytes, this.offset, this.length);
   }
-}
\ No newline at end of file
+
+  /**
+   * Returns the size of a key/value pair in bytes
+   * @param keyLength length of the key in bytes
+   * @param valueLength length of the value in bytes
+   * @return key/value pair size in bytes
+   */
+  public static int getKVSize(final int keyLength,
+      final int valueLength) {
+    return ROW_OFFSET + keyLength + valueLength;
+  }
+}
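
For a quick worked example (assuming ROW_OFFSET covers the 2 * Bytes.SIZEOF_INT = 8 bytes of key-length and value-length ints that precede every serialized key/value in this branch):

    // Hypothetical usage: a 20-byte key with a 5-byte value serializes to
    // 4 (key length int) + 4 (value length int) + 20 + 5 = 33 bytes.
    int size = KeyValue.getKVSize(20, 5);  // == 33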

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java Tue Oct 23 21:58:43 2012
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -35,12 +36,154 @@ abstract class BufferedDataBlockEncoder 
 
   private static int INITIAL_KEY_BUFFER_SIZE = 512;
 
+  protected static int getCommonPrefixLength(byte[] a, int aOffset, int aLength,
+                                             byte[] b, int bOffset, int bLength) {
+    if (a == null || b == null) {
+      return 0;
+    }
+
+    int common = 0;
+    int minLength = Math.min(aLength, bLength);
+    while (common < minLength && a[aOffset + common] == b[bOffset + common]) {
+      common++;
+    }
+    return common;
+  }
+
   @Override
-  public ByteBuffer decodeKeyValues(DataInputStream source,
-      boolean includesMemstoreTS) throws IOException {
-    return decodeKeyValues(source, 0, 0, includesMemstoreTS);
+  public void encodeKeyValues(DataOutputStream out,
+      ByteBuffer in, boolean includesMemstoreTS) throws IOException {
+    in.rewind();
+    if (shouldWriteUnencodedLength()) {
+      out.writeInt(in.remaining());
+    }
+    BufferedEncodedWriter writer = createWriter(out, includesMemstoreTS);
+    byte[] inputBytes = in.array();
+    int inputOffset = in.arrayOffset();
+
+    while (in.hasRemaining()) {
+      int keyLength = in.getInt();
+      int valueLength = in.getInt();
+      int inputPos = in.position();  // This is the buffer position after key/value length.
+      int keyOffset = inputOffset + inputPos;
+      int valueOffset = keyOffset + keyLength;
+
+      writer.updateInitial(inputBytes, keyOffset, keyLength, inputBytes, valueOffset, valueLength);
+      in.position(inputPos + keyLength + valueLength);
+
+      long memstoreTS = 0;
+      if (includesMemstoreTS) {
+        memstoreTS = ByteBufferUtils.readVLong(in);
+      }
+      writer.finishAddingKeyValue(memstoreTS, inputBytes, keyOffset, keyLength, inputBytes,
+          valueOffset, valueLength);
+    }
+  }
+
+  abstract static class BufferedEncodedWriter<STATE extends EncodingState>
+      implements EncodedWriter {
+
+    protected int unencodedLength;
+    protected final DataOutputStream out;
+    protected final boolean includesMemstoreTS;
+
+    protected STATE currentState;
+    protected STATE prevState;
+
+    /**
+     * Starts updating the writer with a new key/value pair. Unlike {@link #update(long, byte[],
+     * int, int, byte[], int, int)}, does not take a memstore timestamp.
+     */
+    protected abstract void updateInitial(final byte[] key,
+        final int keyOffset, final int keyLength, final byte[] value,
+        final int valueOffset, final int valueLength) throws IOException;
+
+    @Override
+    public void update(final long memstoreTS, final byte[] key,
+        final int keyOffset, final int keyLength, final byte[] value,
+        final int valueOffset, final int valueLength) throws IOException {
+      updateInitial(key, keyOffset, keyLength, value, valueOffset, valueLength);
+      finishAddingKeyValue(memstoreTS, key, keyOffset, keyLength, value, valueOffset, valueLength);
+    }
+
+    protected void finishAddingKeyValue(long memstoreTS, byte[] key, int keyOffset, int keyLength,
+        byte[] value, int valueOffset, int valueLength) throws IOException {
+      if (this.includesMemstoreTS) {
+        WritableUtils.writeVLong(this.out, memstoreTS);
+        unencodedLength += WritableUtils.getVIntSize(memstoreTS);
+      }
+      this.unencodedLength += KeyValue.getKVSize(keyLength, valueLength);
+
+      // Encoder state management.
+      // state may be null for the no-op encoder.
+      if (currentState != null) {
+        if (prevState == null) {
+          // Initially prevState is null, let us initialize it.
+          prevState = currentState;
+          currentState = createState();
+        } else {
+          // Both previous and current states exist. Swap them.
+          STATE tmp = currentState;
+          currentState = prevState;
+          prevState = tmp;
+        }
+
+        prevState.key = key;
+        prevState.value = value;
+        prevState.keyOffset = keyOffset;
+        prevState.valueOffset = valueOffset;
+        prevState.keyLength = keyLength;
+        prevState.valueLength = valueLength;
+      }
+    }
+
+    /**
+     * Creates a writer that sends all encoded data to a single stream. All
+     * key/value pairs must agree on whether they include the memstore timestamp.
+     */
+    public BufferedEncodedWriter(DataOutputStream out, boolean includesMemstoreTS)
+        throws IOException {
+      Preconditions.checkNotNull(out);
+      this.unencodedLength = 0;
+      this.out = out;
+      this.includesMemstoreTS = includesMemstoreTS;
+      currentState = createState();
+      if (currentState == null &&
+          (Class) getClass() != CopyKeyDataBlockEncoder.UnencodedWriter.class) {
+        throw new NullPointerException("Encoder state is null for " + getClass().getName());
+      }
+    }
+
+    /**
+     * Reserves space for necessary metadata, such as unencoded size, in the encoded block. Called
+     * before we start writing encoded data.
+     */
+    @Override
+    public void reserveMetadataSpace() throws IOException {
+      out.writeInt(0);
+    }
+
+    /**
+     * @param data a byte array containing encoded data
+     * @param offset offset of encoded data in the given array
+     * @param length length of encoded data in the given array
+     * @return true if the resulting block is stored in the "encoded block" format
+     */
+    @Override
+    public boolean finishEncoding(byte[] data, final int offset, final int length)
+        throws IOException {
+      // Add unencoded length to front of array. This only happens for encoded blocks.
+      ByteBufferUtils.putInt(data, offset, length, this.unencodedLength);
+      return true;
+    }
+
+    abstract STATE createState();
   }
 
+  @Override
+  public abstract BufferedEncodedWriter createWriter(DataOutputStream out,
+      boolean includesMemstoreTS) throws IOException;
+
   protected static class SeekerState {
     protected int valueOffset = -1;
     protected int keyLength;
@@ -299,4 +442,14 @@ abstract class BufferedDataBlockEncoder 
     }
   }
 
+  /** Whether the unencoded data length should be stored at the beginning of an encoded block */
+  boolean shouldWriteUnencodedLength() {
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+
 }
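
To see the reserve-then-backpatch pattern above in isolation: reserveMetadataSpace() writes a placeholder int before any encoded data, and finishEncoding() later overwrites it with the accumulated unencoded length. A minimal sketch using plain JDK streams (illustrative only; it does not use the HBase utilities):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class BackpatchSketch {
      // Writes a 4-byte placeholder, appends the payload, then patches the
      // placeholder with the unencoded length (big-endian, like writeInt()).
      static byte[] writeBlock(byte[] encodedPayload, int unencodedLength)
          throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(0);            // reserveMetadataSpace(): the placeholder
        out.write(encodedPayload);  // the encoded key/values
        out.flush();
        byte[] block = bytes.toByteArray();
        block[0] = (byte) (unencodedLength >>> 24);  // finishEncoding():
        block[1] = (byte) (unencodedLength >>> 16);  // overwrite the placeholder
        block[2] = (byte) (unencodedLength >>> 8);   // in place, big-endian
        block[3] = (byte) unencodedLength;
        return block;
      }
    }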

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java Tue Oct 23 21:58:43 2012
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.encoding;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
-
-/**
- * Stores the state of data block encoder at the beginning of new key.
- */
-class CompressionState {
-  int keyLength;
-  int valueLength;
-
-  short rowLength;
-  int prevOffset = FIRST_KEY;
-  byte familyLength;
-  int qualifierLength;
-  byte type;
-
-  private final static int FIRST_KEY = -1;
-
-  boolean isFirst() {
-    return prevOffset == FIRST_KEY;
-  }
-
-  /**
-   * Analyze the key and fill the state.
-   * Uses mark() and reset() in ByteBuffer.
-   * @param in Buffer at the position where key starts
-   * @param keyLength Length of key in bytes
-   * @param valueLength Length of values in bytes
-   */
-  void readKey(ByteBuffer in, int keyLength, int valueLength) {
-    readKey(in, keyLength, valueLength, 0, null);
-  }
-
-  /**
-   * Analyze the key and fill the state assuming we know previous state.
-   * Uses mark() and reset() in ByteBuffer to avoid moving the position.
-   * <p>
-   * This method overrides all the fields of this instance, except
-   * {@link #prevOffset}, which is usually manipulated directly by encoders
-   * and decoders.
-   * @param in Buffer at the position where key starts
-   * @param keyLength Length of key in bytes
-   * @param valueLength Length of values in bytes
-   * @param commonPrefix how many first bytes are common with previous KeyValue
-   * @param previousState State from previous KeyValue
-   */
-  void readKey(ByteBuffer in, int keyLength, int valueLength,
-      int commonPrefix, CompressionState previousState) {
-    this.keyLength = keyLength;
-    this.valueLength = valueLength;
-
-    // fill the state
-    in.mark(); // mark beginning of key
-
-    if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
-      rowLength = in.getShort();
-      ByteBufferUtils.skip(in, rowLength);
-
-      familyLength = in.get();
-
-      qualifierLength = keyLength - rowLength - familyLength -
-          KeyValue.KEY_INFRASTRUCTURE_SIZE;
-      ByteBufferUtils.skip(in, familyLength + qualifierLength);
-    } else {
-      rowLength = previousState.rowLength;
-      familyLength = previousState.familyLength;
-      qualifierLength = previousState.qualifierLength +
-          keyLength - previousState.keyLength;
-      ByteBufferUtils.skip(in, (KeyValue.ROW_LENGTH_SIZE +
-          KeyValue.FAMILY_LENGTH_SIZE) +
-          rowLength + familyLength + qualifierLength);
-    }
-
-    readTimestamp(in);
-
-    type = in.get();
-
-    in.reset();
-  }
-
-  protected void readTimestamp(ByteBuffer in) {
-    // used in subclasses to add timestamp to state
-    ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_SIZE);
-  }
-
-  void copyFrom(CompressionState state) {
-    keyLength = state.keyLength;
-    valueLength = state.valueLength;
-
-    rowLength = state.rowLength;
-    prevOffset = state.prevOffset;
-    familyLength = state.familyLength;
-    qualifierLength = state.qualifierLength;
-    type = state.type;
-  }
-}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java Tue Oct 23 21:58:43 2012
@@ -26,45 +26,84 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.io.RawComparator;
 
 /**
- * Just copy data, do not do any kind of compression. Use for comparison and
- * benchmarking.
+ * This "encoder" implementation is used for the case when encoding is turned
+ * off. Unencoded block headers are exactly the same as they were before the
+ * data block encoding feature was introduced (no special fields for unencoded
+ * size or encoding type).
  */
 public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
   @Override
   public void encodeKeyValues(DataOutputStream out,
       ByteBuffer in, boolean includesMemstoreTS) throws IOException {
     in.rewind();
-    ByteBufferUtils.putInt(out, in.limit());
-    ByteBufferUtils.moveBufferToStream(out, in, in.limit());
+    ByteBufferUtils.moveBufferToStream(out, in, in.remaining());
   }
 
   @Override
   public ByteBuffer decodeKeyValues(DataInputStream source,
-      int preserveHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+      int preserveHeaderLength, boolean includesMemstoreTS, int totalEncodedSize)
       throws IOException {
-    int decompressedSize = source.readInt();
-    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
+    // Encoded size and decoded size are the same here
+    ByteBuffer buffer = ByteBuffer.allocate(totalEncodedSize +
         preserveHeaderLength);
     buffer.position(preserveHeaderLength);
-    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, decompressedSize);
+    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, totalEncodedSize);
 
     return buffer;
   }
 
   @Override
   public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
-    int keyLength = block.getInt(Bytes.SIZEOF_INT);
+    int keyLength = block.getInt(0);
     return ByteBuffer.wrap(block.array(),
-        block.arrayOffset() + 3 * Bytes.SIZEOF_INT, keyLength).slice();
+        block.arrayOffset() + 2 * Bytes.SIZEOF_INT, keyLength).slice();
   }
 
-
   @Override
   public String toString() {
     return CopyKeyDataBlockEncoder.class.getSimpleName();
   }
 
   @Override
+  public UnencodedWriter createWriter(DataOutputStream out,
+      boolean includesMemstoreTS) throws IOException {
+    return new UnencodedWriter(out, includesMemstoreTS);
+  }
+
+  static class UnencodedWriter extends BufferedEncodedWriter<EncodingState> {
+    public UnencodedWriter(DataOutputStream out, boolean includesMemstoreTS)
+        throws IOException {
+      super(out, includesMemstoreTS);
+    }
+
+    @Override
+    public void updateInitial(final byte[] key,
+        final int keyOffset, final int keyLength, final byte[] value,
+        final int valueOffset, final int valueLength) throws IOException {
+      // Write out the unencoded data
+      out.writeInt(keyLength);
+      out.writeInt(valueLength);
+      this.out.write(key, keyOffset, keyLength);
+      this.out.write(value, valueOffset, valueLength);
+    }
+
+    @Override
+    public void reserveMetadataSpace() {
+      // We are not storing unencoded block length for unencoded blocks.
+    }
+
+    @Override
+    public boolean finishEncoding(byte[] data, int offset, int length) throws IOException {
+      return false;  // no encoding
+    }
+
+    @Override
+    EncodingState createState() {
+      return null;
+    }
+  }
+
+  @Override
   public EncodedSeeker createSeeker(RawComparator<byte[]> comparator,
       final boolean includesMemstoreTS) {
     return new BufferedEncodedSeeker<SeekerState>(comparator) {
@@ -86,10 +125,15 @@ public class CopyKeyDataBlockEncoder ext
 
       @Override
       protected void decodeFirst() {
-        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
         current.lastCommonPrefix = 0;
         decodeNext();
       }
     };
   }
+
+  @Override
+  boolean shouldWriteUnencodedLength() {
+    // We don't store unencoded length for unencoded blocks.
+    return false;
+  }
 }
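
With the leading unencoded-size int gone, getFirstKeyInBlock() above reads the key length at offset 0 and the first key at 2 * Bytes.SIZEOF_INT. Each record in an unencoded block is simply (a sketch of the layout written by UnencodedWriter.updateInitial()):

    [ key length: int ][ value length: int ][ key bytes ][ value bytes ]   (repeated per key/value)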

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java Tue Oct 23 21:58:43 2012
@@ -48,27 +48,16 @@ public interface DataBlockEncoder {
 
   /**
    * Decode.
-   * @param source Encoded stream of KeyValues.
-   * @param includesMemstoreTS true if including memstore timestamp after every
-   *          key-value pair
-   * @return decoded block of KeyValues.
-   * @throws IOException If there is an error in source.
-   */
-  public ByteBuffer decodeKeyValues(DataInputStream source,
-      boolean includesMemstoreTS) throws IOException;
-
-  /**
-   * Decode.
    * @param source encoded stream of KeyValues.
    * @param allocateHeaderLength allocate this many bytes for the header.
-   * @param skipLastBytes Do not copy n last bytes.
    * @param includesMemstoreTS true if including memstore timestamp after every
    *          key-value pair
+   * @param totalEncodedSize the total size of the encoded data to read
    * @return decoded block of KeyValues.
    * @throws IOException If there is an error in source.
    */
   public ByteBuffer decodeKeyValues(DataInputStream source,
-      int allocateHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+      int allocateHeaderLength, boolean includesMemstoreTS, int totalEncodedSize)
       throws IOException;
 
   /**
@@ -92,6 +81,54 @@ public interface DataBlockEncoder {
       boolean includesMemstoreTS);
 
   /**
+   * Creates an incremental writer.
+   * @param out Where to write encoded data
+   * @param includesMemstoreTS True if including memstore timestamp after every
+   *          key-value pair
+   * @return a writer that performs incremental encoding into the given stream
+   */
+  public EncodedWriter createWriter(DataOutputStream out,
+      boolean includesMemstoreTS) throws IOException;
+
+  /**
+   * An interface for performing incremental encoding
+   */
+  public static interface EncodedWriter {
+    /**
+     * Adds the next key/value pair to the writer and updates the encoded data
+     * in the stream as necessary.
+     * @param memstoreTS Current memstore timestamp
+     * @param key Source of key bytes
+     * @param keyOffset Offset of initial byte in key array
+     * @param keyLength Length of key bytes in key array
+     * @param value Source of value bytes
+     * @param valueOffset Offset of initial byte in value array
+     * @param valueLength Length of value bytes in value array
+     * @throws IOException
+     */
+    public void update(final long memstoreTS, final byte[] key,
+        final int keyOffset, final int keyLength, final byte[] value,
+        final int valueOffset, final int valueLength) throws IOException;
+
+    /**
+     * Completes the encoding process on the given byte array
+     * @param data The encoded stream as a byte array
+     * @param offset Offset of initial byte in data array
+     * @param length Length of the portion of the array containing encoded data
+     * @return True if encoding was performed
+     * @throws IOException
+     */
+    public boolean finishEncoding(byte[] data, final int offset,
+        final int length) throws IOException;
+
+    /**
+     * Called before writing anything to the stream to reserve space for the necessary metadata
+     * such as unencoded length.
+     */
+    void reserveMetadataSpace() throws IOException;
+  }
+
+  /**
    * An interface that enables seeking while the underlying data is encoded.
    *
    * It works on one HFileBlock, but it is reusable. See
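
For illustration, a minimal sketch of how the new EncodedWriter is driven end to end (assuming the CopyKeyDataBlockEncoder and an in-memory stream; the real call sites are in HFileBlock.Writer):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hbase.io.encoding.CopyKeyDataBlockEncoder;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;

    public class EncodedWriterSketch {
      public static void main(String[] args) throws IOException {
        DataBlockEncoder encoder = new CopyKeyDataBlockEncoder();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);

        DataBlockEncoder.EncodedWriter writer = encoder.createWriter(out, false);
        writer.reserveMetadataSpace();  // a no-op for the unencoded writer
        byte[] key = {'k'};             // illustrative bytes, not a full KeyValue key
        byte[] value = {'v'};
        writer.update(0L, key, 0, key.length, value, 0, value.length);

        out.flush();
        byte[] data = bytes.toByteArray();
        // Returns false here: NONE blocks are not in the "encoded block" format.
        boolean encoded = writer.finishEncoding(data, 0, data.length);
      }
    }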

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java Tue Oct 23 21:58:43 2012
@@ -34,7 +34,18 @@ import org.apache.hadoop.hbase.util.Byte
 public enum DataBlockEncoding {
 
   /** Disable data block encoding. */
-  NONE(0, null),
+  NONE(0, new CopyKeyDataBlockEncoder()) {
+    @Override
+    public int inBlockMetadataSize() {
+      return 0; 
+    }
+
+    @Override
+    public int encodingIdSize() {
+      return 0;
+    }
+  },
+
   // id 1 is reserved for the BITSET algorithm to be added later
   PREFIX(2, new PrefixKeyDeltaEncoder()),
   DIFF(3, new DiffKeyDeltaEncoder()),
@@ -177,4 +188,16 @@ public enum DataBlockEncoding {
     return idToEncoding.get(dataBlockEncodingId);
   }
 
+  /**
+   * Size of metadata inside the encoded block. This metadata is not considered part of the block
+   * header, but is part of the block payload. It does not include the encoding id.
+   */
+  public int inBlockMetadataSize() {
+    return Bytes.SIZEOF_INT;  // unencoded size
+  }
+
+  public int encodingIdSize() {
+    return ID_SIZE;
+  }
+
 }
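
For reference, the in-block layouts implied by these two methods (a sketch based on this patch; the regular block header is unchanged and omitted):

    NONE:                   [ plain key/values ... ]                  (id size and metadata size are both 0)
    PREFIX/DIFF/FAST_DIFF:  [ encoding id: ID_SIZE bytes ][ unencoded size: Bytes.SIZEOF_INT ][ encoded key/values ... ]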

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java Tue Oct 23 21:58:43 2012
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -28,9 +29,9 @@ import org.apache.hadoop.io.RawComparato
 
 /**
  * Compress using:
- * - store size of common prefix
+ * - store the size of the common prefix
 * - save the column family once, as it is the same within an HFile
- * - use integer compression for key, value and prefix (7-bit encoding)
+ * - use integer compression for key, value and prefix lengths (7-bit encoding)
 * - use flag bits to avoid duplicating the key length, value length
 *   and type when they are the same as in the previous key/value
 * - store the length of the timestamp field in 3 bits
@@ -56,7 +57,7 @@ public class DiffKeyDeltaEncoder extends
   static final int SHIFT_TIMESTAMP_LENGTH = 4;
   static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
 
-  protected static class DiffCompressionState extends CompressionState {
+  protected static class DiffEncodingState extends EncodingState {
     long timestamp;
     byte[] familyNameWithSize;
 
@@ -66,141 +67,16 @@ public class DiffKeyDeltaEncoder extends
     }
 
     @Override
-    void copyFrom(CompressionState state) {
+    void copyFrom(EncodingState state) {
       super.copyFrom(state);
-      DiffCompressionState state2 = (DiffCompressionState) state;
+      DiffEncodingState state2 = (DiffEncodingState) state;
       timestamp = state2.timestamp;
     }
   }
 
-  private void compressSingleKeyValue(DiffCompressionState previousState,
-      DiffCompressionState currentState, DataOutputStream out,
-      ByteBuffer in) throws IOException {
-    byte flag = 0;
-    int kvPos = in.position();
-    int keyLength = in.getInt();
-    int valueLength = in.getInt();
-
-    long timestamp;
-    long diffTimestamp = 0;
-    int diffTimestampFitsInBytes = 0;
-
-    int commonPrefix;
-
-    int timestampFitsInBytes;
-
-    if (previousState.isFirst()) {
-      currentState.readKey(in, keyLength, valueLength);
-      currentState.prevOffset = kvPos;
-      timestamp = currentState.timestamp;
-      if (timestamp < 0) {
-        flag |= FLAG_TIMESTAMP_SIGN;
-        timestamp = -timestamp;
-      }
-      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
-
-      flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
-      commonPrefix = 0;
-
-      // put column family
-      in.mark();
-      ByteBufferUtils.skip(in, currentState.rowLength
-          + KeyValue.ROW_LENGTH_SIZE);
-      ByteBufferUtils.moveBufferToStream(out, in, currentState.familyLength
-          + KeyValue.FAMILY_LENGTH_SIZE);
-      in.reset();
-    } else {
-      // find a common prefix and skip it
-      commonPrefix =
-          ByteBufferUtils.findCommonPrefix(in, in.position(),
-              previousState.prevOffset + KeyValue.ROW_OFFSET, keyLength
-                  - KeyValue.TIMESTAMP_TYPE_SIZE);
-      // don't compress timestamp and type using prefix
-
-      currentState.readKey(in, keyLength, valueLength,
-          commonPrefix, previousState);
-      currentState.prevOffset = kvPos;
-      timestamp = currentState.timestamp;
-      boolean negativeTimestamp = timestamp < 0;
-      if (negativeTimestamp) {
-        timestamp = -timestamp;
-      }
-      timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
-
-      if (keyLength == previousState.keyLength) {
-        flag |= FLAG_SAME_KEY_LENGTH;
-      }
-      if (valueLength == previousState.valueLength) {
-        flag |= FLAG_SAME_VALUE_LENGTH;
-      }
-      if (currentState.type == previousState.type) {
-        flag |= FLAG_SAME_TYPE;
-      }
-
-      // encode timestamp
-      diffTimestamp = previousState.timestamp - currentState.timestamp;
-      boolean minusDiffTimestamp = diffTimestamp < 0;
-      if (minusDiffTimestamp) {
-        diffTimestamp = -diffTimestamp;
-      }
-      diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
-      if (diffTimestampFitsInBytes < timestampFitsInBytes) {
-        flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
-        flag |= FLAG_TIMESTAMP_IS_DIFF;
-        if (minusDiffTimestamp) {
-          flag |= FLAG_TIMESTAMP_SIGN;
-        }
-      } else {
-        flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
-        if (negativeTimestamp) {
-          flag |= FLAG_TIMESTAMP_SIGN;
-        }
-      }
-    }
-
-    out.write(flag);
-
-    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
-      ByteBufferUtils.putCompressedInt(out, keyLength);
-    }
-    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
-      ByteBufferUtils.putCompressedInt(out, valueLength);
-    }
-
-    ByteBufferUtils.putCompressedInt(out, commonPrefix);
-    ByteBufferUtils.skip(in, commonPrefix);
-
-    if (previousState.isFirst() ||
-        commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
-      int restRowLength =
-          currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
-      ByteBufferUtils.moveBufferToStream(out, in, restRowLength);
-      ByteBufferUtils.skip(in, currentState.familyLength +
-          KeyValue.FAMILY_LENGTH_SIZE);
-      ByteBufferUtils.moveBufferToStream(out, in, currentState.qualifierLength);
-    } else {
-      ByteBufferUtils.moveBufferToStream(out, in,
-          keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE);
-    }
-
-    if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
-      ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
-    } else {
-      ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
-    }
-
-    if ((flag & FLAG_SAME_TYPE) == 0) {
-      out.write(currentState.type);
-    }
-    ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_TYPE_SIZE);
-
-    ByteBufferUtils.moveBufferToStream(out, in, valueLength);
-  }
-
   private void uncompressSingleKeyValue(DataInputStream source,
-      ByteBuffer buffer,
-      DiffCompressionState state)
-          throws IOException, EncoderBufferTooSmallException {
+      ByteBuffer buffer, DiffEncodingState state)
+      throws IOException, EncoderBufferTooSmallException {
     // read the column family at the beginning
     if (state.isFirst()) {
       state.familyLength = source.readByte();
@@ -229,16 +105,14 @@ public class DiffKeyDeltaEncoder extends
     }
     int commonPrefix = ByteBufferUtils.readCompressedInt(source);
 
-    // create KeyValue buffer and fill it prefix
     int keyOffset = buffer.position();
-    ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength
-        + KeyValue.ROW_OFFSET);
+    ByteBufferUtils.ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
     buffer.putInt(keyLength);
     buffer.putInt(valueLength);
 
     // copy common from previous key
     if (commonPrefix > 0) {
-      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
+      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.keyOffset
           + KeyValue.ROW_OFFSET, commonPrefix);
     }
 
@@ -305,49 +179,31 @@ public class DiffKeyDeltaEncoder extends
     // copy value part
     ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);
 
-    state.keyLength = keyLength;
-    state.valueLength = valueLength;
-    state.prevOffset = keyOffset;
+    state.keyOffset = keyOffset;
     state.timestamp = timestamp;
     state.type = type;
-    // state.qualifier is unused
-  }
-
-  @Override
-  public void encodeKeyValues(DataOutputStream out,
-      ByteBuffer in, boolean includesMemstoreTS) throws IOException {
-    in.rewind();
-    ByteBufferUtils.putInt(out, in.limit());
-    DiffCompressionState previousState = new DiffCompressionState();
-    DiffCompressionState currentState = new DiffCompressionState();
-    while (in.hasRemaining()) {
-      compressSingleKeyValue(previousState, currentState,
-          out, in);
-      afterEncodingKeyValue(in, out, includesMemstoreTS);
-
-      // swap previousState <-> currentState
-      DiffCompressionState tmp = previousState;
-      previousState = currentState;
-      currentState = tmp;
-    }
+    state.keyLength = keyLength;
+    state.valueLength = valueLength;
   }
 
   @Override
   public ByteBuffer decodeKeyValues(DataInputStream source,
-      int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+      int allocHeaderLength, boolean includesMemstoreTS, int totalEncodedSize)
       throws IOException {
+    int skipLastBytes = source.available() - totalEncodedSize;
+    Preconditions.checkState(skipLastBytes >= 0, "Requested to skip a negative number of bytes");
     int decompressedSize = source.readInt();
     ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
         allocHeaderLength);
     buffer.position(allocHeaderLength);
-    DiffCompressionState state = new DiffCompressionState();
+    DiffEncodingState state = new DiffEncodingState();
     while (source.available() > skipLastBytes) {
       uncompressSingleKeyValue(source, buffer, state);
       afterDecodingKeyValue(source, buffer, includesMemstoreTS);
     }
 
     if (source.available() != skipLastBytes) {
-      throw new IllegalStateException("Read too much bytes.");
+      throw new IOException("Read too many bytes");
     }
 
     return buffer;
@@ -402,8 +258,143 @@ public class DiffKeyDeltaEncoder extends
   }
 
   @Override
-  public String toString() {
-    return DiffKeyDeltaEncoder.class.getSimpleName();
+  public DiffKeyDeltaEncoderWriter createWriter(DataOutputStream out,
+      boolean includesMemstoreTS) throws IOException {
+    return new DiffKeyDeltaEncoderWriter(out, includesMemstoreTS);
+  }
+
+  /**
+   * A writer that incrementally performs Diff Key Delta Encoding
+   */
+  private static class DiffKeyDeltaEncoderWriter
+      extends BufferedEncodedWriter<DiffEncodingState> {
+
+    public DiffKeyDeltaEncoderWriter(DataOutputStream out,
+        boolean includesMemstoreTS) throws IOException {
+      super(out, includesMemstoreTS);
+    }
+
+    @Override
+    DiffEncodingState createState() {
+      return new DiffEncodingState();
+    }
+
+    @Override
+    public void updateInitial(final byte[] key,
+        final int keyOffset, final int keyLength, final byte[] value,
+        final int valueOffset, final int valueLength) throws IOException {
+      ByteBuffer keyBuffer = ByteBuffer.wrap(key, keyOffset, keyLength);
+      long timestamp;
+      long diffTimestamp = 0;
+      int diffTimestampFitsInBytes = 0;
+      byte flag = 0;
+      int commonPrefix;
+      int timestampFitsInBytes;
+
+      if (this.prevState == null) {
+        currentState.readKey(keyBuffer, keyLength, valueLength);
+        timestamp = currentState.timestamp;
+        if (timestamp < 0) {
+          flag |= FLAG_TIMESTAMP_SIGN;
+          timestamp = -timestamp;
+        }
+        timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
+
+        flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+        commonPrefix = 0;
+
+        // put column family
+        this.out.write(key, keyOffset + currentState.rowLength
+            + KeyValue.ROW_LENGTH_SIZE, currentState.familyLength
+            + KeyValue.FAMILY_LENGTH_SIZE);
+      } else {
+        // find a common prefix and skip it
+        // don't compress timestamp and type using this prefix
+        commonPrefix = getCommonPrefixLength(key, keyOffset, keyLength -
+            KeyValue.TIMESTAMP_TYPE_SIZE, this.prevState.key,
+            this.prevState.keyOffset, this.prevState.keyLength -
+            KeyValue.TIMESTAMP_TYPE_SIZE);
+
+        currentState.readKey(keyBuffer, keyLength, valueLength,
+            commonPrefix, this.prevState);
+        timestamp = currentState.timestamp;
+        boolean negativeTimestamp = timestamp < 0;
+        if (negativeTimestamp) {
+          timestamp = -timestamp;
+        }
+        timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
+
+        if (keyLength == this.prevState.keyLength) {
+          flag |= FLAG_SAME_KEY_LENGTH;
+        }
+        if (valueLength == this.prevState.valueLength) {
+          flag |= FLAG_SAME_VALUE_LENGTH;
+        }
+        if (currentState.type == this.prevState.type) {
+          flag |= FLAG_SAME_TYPE;
+        }
+
+        // encode timestamp
+        diffTimestamp = this.prevState.timestamp - currentState.timestamp;
+        boolean negativeDiffTimestamp = diffTimestamp < 0;
+        if (negativeDiffTimestamp) {
+          diffTimestamp = -diffTimestamp;
+        }
+        diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
+        if (diffTimestampFitsInBytes < timestampFitsInBytes) {
+          flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+          flag |= FLAG_TIMESTAMP_IS_DIFF;
+          if (negativeDiffTimestamp) {
+            flag |= FLAG_TIMESTAMP_SIGN;
+          }
+        } else {
+          flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
+          if (negativeTimestamp) {
+            flag |= FLAG_TIMESTAMP_SIGN;
+          }
+        }
+      }
+
+      this.out.write(flag);
+
+      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+        ByteBufferUtils.putCompressedInt(this.out, keyLength);
+      }
+      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+        ByteBufferUtils.putCompressedInt(this.out, valueLength);
+      }
+
+      ByteBufferUtils.putCompressedInt(this.out, commonPrefix);
+
+      if ((this.prevState == null) ||
+          commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+        int restRowLength =
+            currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
+        this.out.write(key, keyOffset + commonPrefix, restRowLength);
+        this.out.write(key, keyOffset + commonPrefix + restRowLength +
+            currentState.familyLength + KeyValue.FAMILY_LENGTH_SIZE,
+            currentState.qualifierLength);
+      } else {
+        this.out.write(key, keyOffset + commonPrefix, keyLength -
+            commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE);
+      }
+
+      if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
+        ByteBufferUtils.putLong(this.out, timestamp, timestampFitsInBytes);
+      } else {
+        ByteBufferUtils.putLong(this.out, diffTimestamp,
+            diffTimestampFitsInBytes);
+      }
+
+      if ((flag & FLAG_SAME_TYPE) == 0) {
+        this.out.write(currentState.type);
+      }
+
+      this.out.write(value, valueOffset, valueLength);
+
+      this.currentState.key = key;
+      this.currentState.keyOffset = keyOffset;
+    }
   }
 
   protected static class DiffSeekerState extends SeekerState {
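
As a decoding aid, the flag byte written above packs the timestamp metadata into its high bits (SHIFT_TIMESTAMP_LENGTH == 4 and FLAG_TIMESTAMP_SIGN == 1 << 7, per the constants earlier in this file). A hedged sketch, with the low-bit flags left symbolic since their values are not shown in this hunk:

    // Illustrative decode of a Diff encoder flag byte.
    int flag = 0xA3;                                 // hypothetical flag value
    int timestampBytes = ((flag >>> 4) & 0x07) + 1;  // bits 4-6: stored length - 1
    boolean negative = (flag & (1 << 7)) != 0;       // bit 7: timestamp (or diff) sign
    // The remaining low bits carry FLAG_SAME_KEY_LENGTH, FLAG_SAME_VALUE_LENGTH,
    // FLAG_SAME_TYPE and FLAG_TIMESTAMP_IS_DIFF, defined elsewhere in the class.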

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java Tue Oct 23 21:58:43 2012
@@ -88,7 +88,7 @@ public class EncodedDataBlock {
         if (decompressedData == null) {
           try {
             decompressedData = dataBlockEncoder.decodeKeyValues(
-                dis, includesMemstoreTS);
+                dis, 0, includesMemstoreTS, dis.available()); 
           } catch (IOException e) {
             throw new RuntimeException("Problem with data block encoder, " +
                 "most likely it requested more bytes than are available.", e);

Copied: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java (from r1401280, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java)
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java?p2=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java&p1=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java&r1=1401280&r2=1401499&rev=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/CompressionState.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodingState.java Tue Oct 23 21:58:43 2012
@@ -24,12 +24,16 @@ import org.apache.hadoop.hbase.util.Byte
 /**
  * Stores the state of the data block encoder at the beginning of a new key.
  */
-class CompressionState {
+class EncodingState {
+  byte[] key;
+  byte[] value;
+
   int keyLength;
   int valueLength;
+  int valueOffset = 0;
 
   short rowLength;
-  int prevOffset = FIRST_KEY;
+  int keyOffset = FIRST_KEY;
   byte familyLength;
   int qualifierLength;
   byte type;
@@ -37,7 +41,7 @@ class CompressionState {
   private final static int FIRST_KEY = -1;
 
   boolean isFirst() {
-    return prevOffset == FIRST_KEY;
+    return keyOffset == FIRST_KEY;
   }
 
   /**
@@ -51,12 +55,12 @@ class CompressionState {
     readKey(in, keyLength, valueLength, 0, null);
   }
 
-  /**
+  /** 
    * Analyze the key and fill the state assuming we know previous state.
    * Uses mark() and reset() in ByteBuffer to avoid moving the position.
    * <p>
    * This method overrides all the fields of this instance, except
-   * {@link #prevOffset}, which is usually manipulated directly by encoders
+   * {@link #keyOffset}, which is usually manipulated directly by encoders
    * and decoders.
    * @param in Buffer at the position where key starts
    * @param keyLength Length of key in bytes
@@ -65,7 +69,7 @@ class CompressionState {
    * @param previousState State from previous KeyValue
    */
   void readKey(ByteBuffer in, int keyLength, int valueLength,
-      int commonPrefix, CompressionState previousState) {
+      int commonPrefix, EncodingState previousState) {
     this.keyLength = keyLength;
     this.valueLength = valueLength;
 
@@ -103,12 +107,12 @@ class CompressionState {
     ByteBufferUtils.skip(in, KeyValue.TIMESTAMP_SIZE);
   }
 
-  void copyFrom(CompressionState state) {
+  void copyFrom(EncodingState state) {
     keyLength = state.keyLength;
     valueLength = state.valueLength;
 
     rowLength = state.rowLength;
-    prevOffset = state.prevOffset;
+    keyOffset = state.keyOffset;
     familyLength = state.familyLength;
     qualifierLength = state.qualifierLength;
     type = state.type;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java Tue Oct 23 21:58:43 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.io.encod
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.KeyValue;
@@ -33,7 +32,7 @@ import org.apache.hadoop.io.RawComparato
  * Compress using:
 * - store the size of the common prefix
 * - save the column family once, in the first KeyValue
- * - use integer compression for key, value and prefix (7-bit encoding)
+ * - use integer compression for key, value and prefix lengths (7-bit encoding)
 * - use flag bits to avoid duplicating the key length, value length
 *   and type when they are the same as in the previous key/value
 * - store the length of the common timestamp prefix in 3 bits
@@ -53,14 +52,14 @@ import org.apache.hadoop.io.RawComparato
  *
  */
 public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
-  final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
-  final int SHIFT_TIMESTAMP_LENGTH = 0;
-  final int FLAG_SAME_KEY_LENGTH = 1 << 3;
-  final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
-  final int FLAG_SAME_TYPE = 1 << 5;
-  final int FLAG_SAME_VALUE = 1 << 6;
+  final static int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
+  final static int SHIFT_TIMESTAMP_LENGTH = 0;
+  final static int FLAG_SAME_KEY_LENGTH = 1 << 3;
+  final static int FLAG_SAME_VALUE_LENGTH = 1 << 4;
+  final static int FLAG_SAME_TYPE = 1 << 5;
+  final static int FLAG_SAME_VALUE = 1 << 6;
 
-  private static class FastDiffCompressionState extends CompressionState {
+  private static class FastDiffEncodingState extends EncodingState {
     byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
     int prevTimestampOffset;
 
@@ -70,9 +69,9 @@ public class FastDiffDeltaEncoder extend
     }
 
     @Override
-    void copyFrom(CompressionState state) {
+    void copyFrom(EncodingState state) {
       super.copyFrom(state);
-      FastDiffCompressionState state2 = (FastDiffCompressionState) state;
+      FastDiffEncodingState state2 = (FastDiffEncodingState) state;
       System.arraycopy(state2.timestamp, 0, timestamp, 0,
           KeyValue.TIMESTAMP_SIZE);
       prevTimestampOffset = state2.prevTimestampOffset;
@@ -82,7 +81,7 @@ public class FastDiffDeltaEncoder extend
      * Copies the first key/value from the given stream, and initializes
      * decompression state based on it. Assumes that we have already read key
      * and value lengths. Does not set {@link #qualifierLength} (not used by
-     * decompression) or {@link #prevOffset} (set by the calle afterwards).
+     * decompression) or {@link #keyOffset} (set by the caller afterwards).
      */
     private void decompressFirstKV(ByteBuffer out, DataInputStream in)
         throws IOException {
@@ -100,111 +99,8 @@ public class FastDiffDeltaEncoder extend
 
   }
 
-  private void compressSingleKeyValue(
-        FastDiffCompressionState previousState,
-        FastDiffCompressionState currentState,
-        OutputStream out, ByteBuffer in) throws IOException {
-    currentState.prevOffset = in.position();
-    int keyLength = in.getInt();
-    int valueOffset =
-        currentState.prevOffset + keyLength + KeyValue.ROW_OFFSET;
-    int valueLength = in.getInt();
-    byte flag = 0;
-
-    if (previousState.isFirst()) {
-      // copy the key, there is no common prefix with none
-      out.write(flag);
-      ByteBufferUtils.putCompressedInt(out, keyLength);
-      ByteBufferUtils.putCompressedInt(out, valueLength);
-      ByteBufferUtils.putCompressedInt(out, 0);
-
-      currentState.readKey(in, keyLength, valueLength);
-
-      ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
-    } else {
-      // find a common prefix and skip it
-      int commonPrefix = ByteBufferUtils.findCommonPrefix(in, in.position(),
-          previousState.prevOffset + KeyValue.ROW_OFFSET,
-          Math.min(keyLength, previousState.keyLength) -
-          KeyValue.TIMESTAMP_TYPE_SIZE);
-
-      currentState.readKey(in, keyLength, valueLength,
-          commonPrefix, previousState);
-
-      if (keyLength == previousState.keyLength) {
-        flag |= FLAG_SAME_KEY_LENGTH;
-      }
-      if (valueLength == previousState.valueLength) {
-        flag |= FLAG_SAME_VALUE_LENGTH;
-      }
-      if (currentState.type == previousState.type) {
-        flag |= FLAG_SAME_TYPE;
-      }
-
-      int commonTimestampPrefix = findCommonTimestampPrefix(
-          currentState, previousState);
-      flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
-
-      // Check if current and previous values are the same. Compare value
-      // length first as an optimization.
-      if (valueLength == previousState.valueLength) {
-        int previousValueOffset = previousState.prevOffset
-            + previousState.keyLength + KeyValue.ROW_OFFSET;
-        if (ByteBufferUtils.arePartsEqual(in,
-                previousValueOffset, previousState.valueLength,
-                valueOffset, valueLength)) {
-          flag |= FLAG_SAME_VALUE;
-        }
-      }
-
-      out.write(flag);
-      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
-        ByteBufferUtils.putCompressedInt(out, keyLength);
-      }
-      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
-        ByteBufferUtils.putCompressedInt(out, valueLength);
-      }
-      ByteBufferUtils.putCompressedInt(out, commonPrefix);
-
-      ByteBufferUtils.skip(in, commonPrefix);
-      if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
-        // Previous and current rows are different. Copy the differing part of
-        // the row, skip the column family, and copy the qualifier.
-        ByteBufferUtils.moveBufferToStream(out, in,
-            currentState.rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix);
-        ByteBufferUtils.skip(in, currentState.familyLength +
-            KeyValue.FAMILY_LENGTH_SIZE);
-        ByteBufferUtils.moveBufferToStream(out, in,
-            currentState.qualifierLength);
-      } else {
-        // The common part includes the whole row. As the column family is the
-        // same across the whole file, it will automatically be included in the
-        // common prefix, so we need not special-case it here.
-        int restKeyLength = keyLength - commonPrefix -
-            KeyValue.TIMESTAMP_TYPE_SIZE;
-        ByteBufferUtils.moveBufferToStream(out, in, restKeyLength);
-      }
-      ByteBufferUtils.skip(in, commonTimestampPrefix);
-      ByteBufferUtils.moveBufferToStream(out, in,
-          KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
-
-      // Write the type if it is not the same as before.
-      if ((flag & FLAG_SAME_TYPE) == 0) {
-        out.write(currentState.type);
-      }
-
-      // Write the value if it is not the same as before.
-      if ((flag & FLAG_SAME_VALUE) == 0) {
-        ByteBufferUtils.copyBufferToStream(out, in, valueOffset, valueLength);
-      }
-
-      // Skip key type and value in the input buffer.
-      ByteBufferUtils.skip(in, KeyValue.TYPE_SIZE + currentState.valueLength);
-    }
-  }
-
-  private int findCommonTimestampPrefix(FastDiffCompressionState left,
-      FastDiffCompressionState right) {
+  private static int findCommonTimestampPrefix(FastDiffEncodingState left,
+      FastDiffEncodingState right) {
     int prefixTimestamp = 0;
     while (prefixTimestamp < (KeyValue.TIMESTAMP_SIZE - 1) &&
         left.timestamp[prefixTimestamp]
@@ -215,7 +111,7 @@ public class FastDiffDeltaEncoder extend
   }
 
   private void uncompressSingleKeyValue(DataInputStream source,
-      ByteBuffer out, FastDiffCompressionState state)
+      ByteBuffer out, FastDiffEncodingState state)
           throws IOException, EncoderBufferTooSmallException {
     byte flag = source.readByte();
     int prevKeyLength = state.keyLength;
@@ -241,15 +137,15 @@ public class FastDiffDeltaEncoder extend
       if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
         out.putInt(state.keyLength);
         out.putInt(state.valueLength);
-        prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
+        prevOffset = state.keyOffset + KeyValue.ROW_OFFSET;
         common = commonLength;
       } else {
         if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
-          prevOffset = state.prevOffset;
+          prevOffset = state.keyOffset;
           common = commonLength + KeyValue.ROW_OFFSET;
         } else {
           out.putInt(state.keyLength);
-          prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
+          prevOffset = state.keyOffset + KeyValue.KEY_LENGTH_SIZE;
           common = commonLength + KeyValue.KEY_LENGTH_SIZE;
         }
       }
@@ -284,7 +180,7 @@ public class FastDiffDeltaEncoder extend
 
         // copy the column family
         ByteBufferUtils.copyFromBufferToBuffer(out, out,
-            state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
+            state.keyOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE
                 + state.rowLength, state.familyLength
                 + KeyValue.FAMILY_LENGTH_SIZE);
         state.rowLength = (short) (rowWithSizeLength -
@@ -314,7 +210,7 @@ public class FastDiffDeltaEncoder extend
       if ((flag & FLAG_SAME_TYPE) != 0) {
         out.put(state.type);
         if ((flag & FLAG_SAME_VALUE) != 0) {
-          ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
+          ByteBufferUtils.copyFromBufferToBuffer(out, out, state.keyOffset +
               KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
         } else {
           ByteBufferUtils.copyFromStreamToBuffer(out, source,
@@ -324,7 +220,7 @@ public class FastDiffDeltaEncoder extend
         if ((flag & FLAG_SAME_VALUE) != 0) {
           ByteBufferUtils.copyFromStreamToBuffer(out, source,
               KeyValue.TYPE_SIZE);
-          ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevOffset +
+          ByteBufferUtils.copyFromBufferToBuffer(out, out, state.keyOffset +
               KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
         } else {
           ByteBufferUtils.copyFromStreamToBuffer(out, source,
@@ -337,44 +233,25 @@ public class FastDiffDeltaEncoder extend
       state.decompressFirstKV(out, source);
     }
 
-    state.prevOffset = kvPos;
-  }
-
-  @Override
-  public void encodeKeyValues(DataOutputStream out,
-      ByteBuffer in, boolean includesMemstoreTS) throws IOException {
-    in.rewind();
-    ByteBufferUtils.putInt(out, in.limit());
-    FastDiffCompressionState previousState = new FastDiffCompressionState();
-    FastDiffCompressionState currentState = new FastDiffCompressionState();
-    while (in.hasRemaining()) {
-      compressSingleKeyValue(previousState, currentState,
-          out, in);
-      afterEncodingKeyValue(in, out, includesMemstoreTS);
-
-      // swap previousState <-> currentState
-      FastDiffCompressionState tmp = previousState;
-      previousState = currentState;
-      currentState = tmp;
-    }
+    state.keyOffset = kvPos;
   }
 
   @Override
   public ByteBuffer decodeKeyValues(DataInputStream source,
-      int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
+      int allocHeaderLength, boolean includesMemstoreTS, int totalEncodedSize)
           throws IOException {
+    int skipLastBytes = source.available() - totalEncodedSize;
     int decompressedSize = source.readInt();
-    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
-        allocHeaderLength);
+    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocHeaderLength);
     buffer.position(allocHeaderLength);
-    FastDiffCompressionState state = new FastDiffCompressionState();
+    FastDiffEncodingState state = new FastDiffEncodingState();
     while (source.available() > skipLastBytes) {
       uncompressSingleKeyValue(source, buffer, state);
       afterDecodingKeyValue(source, buffer, includesMemstoreTS);
     }
 
     if (source.available() != skipLastBytes) {
-      throw new IllegalStateException("Read too much bytes.");
+      throw new IOException("Read too many bytes");
     }
 
     return buffer;
@@ -393,8 +270,116 @@ public class FastDiffDeltaEncoder extend
   }
 
   @Override
-  public String toString() {
-    return FastDiffDeltaEncoder.class.getSimpleName();
+  public FastDiffDeltaEncoderWriter createWriter(DataOutputStream out,
+      boolean includesMemstoreTS) throws IOException {
+    return new FastDiffDeltaEncoderWriter(out, includesMemstoreTS);
+  }
+
+  /**
+   * A writer that incrementally performs Fast Diff Delta Encoding.
+   */
+  private static class FastDiffDeltaEncoderWriter
+      extends BufferedEncodedWriter<FastDiffEncodingState> {
+    public FastDiffDeltaEncoderWriter(DataOutputStream out,
+        boolean includesMemstoreTS) throws IOException {
+      super(out, includesMemstoreTS);
+    }
+
+    @Override
+    FastDiffEncodingState createState() {
+      return new FastDiffEncodingState();
+    }
+
+    @Override
+    protected void updateInitial(byte[] key, int keyOffset, int keyLength,
+        byte[] value, int valueOffset, int valueLength) throws IOException {
+      byte flag = 0;
+      ByteBuffer keyBuffer = ByteBuffer.wrap(key, keyOffset, keyLength);
+
+      if (this.prevState == null) {
+        // The first element in the stream
+        out.write(flag);
+        ByteBufferUtils.putCompressedInt(out, keyLength);
+        ByteBufferUtils.putCompressedInt(out, valueLength);
+        ByteBufferUtils.putCompressedInt(out, 0);
+        out.write(key, keyOffset, keyLength);
+        out.write(value, valueOffset, valueLength);
+
+        // Initialize the compression state
+        currentState.readKey(keyBuffer, keyLength, valueLength);
+      } else {
+        // Find the common prefix to skip
+        int commonPrefix = getCommonPrefixLength(key, keyOffset, keyLength -
+            KeyValue.TIMESTAMP_TYPE_SIZE, this.prevState.key,
+            this.prevState.keyOffset, this.prevState.keyLength -
+            KeyValue.TIMESTAMP_TYPE_SIZE);
+
+        currentState.readKey(keyBuffer, keyLength, valueLength, commonPrefix,
+            this.prevState);
+
+        if (keyLength == this.prevState.keyLength) {
+          flag |= FLAG_SAME_KEY_LENGTH;
+        }
+        if (valueLength == this.prevState.valueLength) {
+          flag |= FLAG_SAME_VALUE_LENGTH;
+        }
+        if (currentState.type == this.prevState.type) {
+          flag |= FLAG_SAME_TYPE;
+        }
+
+        int commonTimestampPrefix = findCommonTimestampPrefix(currentState, this.prevState);
+        flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
+
+        // Check if current and previous values are the same. Compare value
+        // length first as an optimization.
+        if (valueLength == this.prevState.valueLength) {
+          if (valueLength == getCommonPrefixLength(value, valueOffset, valueLength,
+              this.prevState.value, this.prevState.valueOffset,
+              this.prevState.valueLength)) {
+            // if common prefix consists of whole value length
+            flag |= FLAG_SAME_VALUE;
+          }
+        }
+
+        out.write(flag);
+        if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
+          ByteBufferUtils.putCompressedInt(out, keyLength);
+        }
+        if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
+          ByteBufferUtils.putCompressedInt(out, valueLength);
+        }
+        ByteBufferUtils.putCompressedInt(out, commonPrefix);
+
+        if (commonPrefix < currentState.rowLength + KeyValue.ROW_LENGTH_SIZE) {
+          // Previous and current rows are different. Copy the differing part
+          // of the row, skip the column family, and copy the qualifier.
+          out.write(key, keyOffset + commonPrefix, currentState.rowLength +
+              KeyValue.ROW_LENGTH_SIZE - commonPrefix);
+          out.write(key, keyOffset + currentState.familyLength +
+              KeyValue.FAMILY_LENGTH_SIZE + currentState.rowLength +
+              KeyValue.ROW_LENGTH_SIZE, currentState.qualifierLength);
+        } else {
+          // The common part includes the whole row. As the column family is
+          // the same across the whole file, it will automatically be included
+          // in the common prefix, so we need not special-case it here.
+          out.write(key, keyOffset + commonPrefix, keyLength - commonPrefix -
+              KeyValue.TIMESTAMP_TYPE_SIZE);
+        }
+        out.write(key, keyOffset + keyLength - (KeyValue.TIMESTAMP_TYPE_SIZE -
+            commonTimestampPrefix), KeyValue.TIMESTAMP_SIZE -
+            commonTimestampPrefix);
+
+        // Write the type if it is not the same as before.
+        if ((flag & FLAG_SAME_TYPE) == 0) {
+          out.write(currentState.type);
+        }
+
+        // Write the value if it is not the same as before.
+        if ((flag & FLAG_SAME_VALUE) == 0) {
+          out.write(value, valueOffset, valueLength);
+        }
+      }
+    }
   }
 
   protected static class FastDiffSeekerState extends SeekerState {

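For illustration, here is a minimal, self-contained sketch of the
common-prefix helper the incremental writers above rely on. The argument
order (array, offset, length for each side) mirrors the updateInitial() call
sites; this is an illustrative re-implementation, not the actual
BufferedEncodedWriter helper.

    private static int getCommonPrefixLength(byte[] left, int leftOffset,
        int leftLength, byte[] right, int rightOffset, int rightLength) {
      // Walk both byte ranges until they diverge or the shorter one ends.
      int limit = Math.min(leftLength, rightLength);
      int common = 0;
      while (common < limit
          && left[leftOffset + common] == right[rightOffset + common]) {
        common++;
      }
      return common;
    }
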
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java Tue Oct 23 21:58:43 2012
@@ -16,14 +16,10 @@
  */
 package org.apache.hadoop.hbase.io.encoding;
 
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.FilterOutputStream;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -46,59 +42,12 @@ import org.apache.hadoop.io.RawComparato
  */
 public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
 
-  private int addKV(int prevKeyOffset, DataOutputStream out,
-      ByteBuffer in, int prevKeyLength) throws IOException {
-    int keyLength = in.getInt();
-    int valueLength = in.getInt();
-
-    if (prevKeyOffset == -1) {
-      // copy the key, there is no common prefix with none
-      ByteBufferUtils.putCompressedInt(out, keyLength);
-      ByteBufferUtils.putCompressedInt(out, valueLength);
-      ByteBufferUtils.putCompressedInt(out, 0);
-      ByteBufferUtils.moveBufferToStream(out, in, keyLength + valueLength);
-    } else {
-      // find a common prefix and skip it
-      int common = ByteBufferUtils.findCommonPrefix(
-          in, prevKeyOffset + KeyValue.ROW_OFFSET,
-          in.position(),
-          Math.min(prevKeyLength, keyLength));
-
-      ByteBufferUtils.putCompressedInt(out, keyLength - common);
-      ByteBufferUtils.putCompressedInt(out, valueLength);
-      ByteBufferUtils.putCompressedInt(out, common);
-
-      ByteBufferUtils.skip(in, common);
-      ByteBufferUtils.moveBufferToStream(out, in, keyLength - common
-          + valueLength);
-    }
-
-    return keyLength;
-  }
-
-  @Override
-  public void encodeKeyValues(DataOutputStream writeHere,
-      ByteBuffer in, boolean includesMemstoreTS) throws IOException {
-    in.rewind();
-    ByteBufferUtils.putInt(writeHere, in.limit());
-    int prevOffset = -1;
-    int offset = 0;
-    int keyLength = 0;
-    while (in.hasRemaining()) {
-      offset = in.position();
-      keyLength = addKV(prevOffset, writeHere, in, keyLength);
-      afterEncodingKeyValue(in, writeHere, includesMemstoreTS);
-      prevOffset = offset;
-    }
-  }
-
   @Override
-  public ByteBuffer decodeKeyValues(DataInputStream source,
-      int allocHeaderLength, int skipLastBytes, boolean includesMemstoreTS)
-          throws IOException {
+  public ByteBuffer decodeKeyValues(DataInputStream source, int allocHeaderLength,
+      boolean includesMemstoreTS, int totalEncodedSize) throws IOException {
+    int skipLastBytes = source.available() - totalEncodedSize;
     int decompressedSize = source.readInt();
-    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
-        allocHeaderLength);
+    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocHeaderLength);
     buffer.position(allocHeaderLength);
     int prevKeyOffset = 0;
 
@@ -108,16 +57,15 @@ public class PrefixKeyDeltaEncoder exten
     }
 
     if (source.available() != skipLastBytes) {
-      throw new IllegalStateException("Read too many bytes.");
+      throw new IOException("Read too many bytes");
     }
 
     buffer.limit(buffer.position());
     return buffer;
   }
 
-  private int decodeKeyValue(DataInputStream source, ByteBuffer buffer,
-      int prevKeyOffset)
-          throws IOException, EncoderBufferTooSmallException {
+  private int decodeKeyValue(DataInputStream source, ByteBuffer buffer, int prevKeyOffset)
+      throws IOException, EncoderBufferTooSmallException {
     int keyLength = ByteBufferUtils.readCompressedInt(source);
     int valueLength = ByteBufferUtils.readCompressedInt(source);
     int commonLength = ByteBufferUtils.readCompressedInt(source);
@@ -162,8 +110,38 @@ public class PrefixKeyDeltaEncoder exten
   }
 
   @Override
-  public String toString() {
-    return PrefixKeyDeltaEncoder.class.getSimpleName();
+  public PrefixKeyDeltaEncoderWriter createWriter(DataOutputStream out,
+      boolean includesMemstoreTS) throws IOException {
+    return new PrefixKeyDeltaEncoderWriter(out, includesMemstoreTS);
+  }
+
+  /**
+   * A writer that incrementally performs Prefix Key Delta Encoding.
+   */
+  private static class PrefixKeyDeltaEncoderWriter extends BufferedEncodedWriter<EncodingState> {
+    public PrefixKeyDeltaEncoderWriter(DataOutputStream out,
+        boolean includesMemstoreTS) throws IOException {
+      super(out, includesMemstoreTS);
+    }
+
+    @Override
+    EncodingState createState() {
+      return new EncodingState();
+    }
+
+    @Override
+    protected void updateInitial(byte[] key, int keyOffset, int keyLength, byte[] value,
+        int valueOffset, int valueLength) throws IOException {
+      int common = prevState == null ? 0 : getCommonPrefixLength(key, keyOffset, keyLength,
+          this.prevState.key, this.prevState.keyOffset, this.prevState.keyLength);
+
+      ByteBufferUtils.putCompressedInt(this.out, keyLength - common);
+      ByteBufferUtils.putCompressedInt(this.out, valueLength);
+      ByteBufferUtils.putCompressedInt(this.out, common);
+
+      this.out.write(key, keyOffset + common, keyLength - common);
+      this.out.write(value, valueOffset, valueLength);
+    }
   }
 
   @Override

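A toy illustration of the record layout PrefixKeyDeltaEncoderWriter's
updateInitial() produces: key-suffix length, value length, and common-prefix
length, followed by the key suffix and the value. Plain writeInt stands in
for ByteBufferUtils.putCompressedInt, so the exact byte layout differs from
HBase's; the helper name is hypothetical.

    import java.io.DataOutputStream;
    import java.io.IOException;

    private static void writePrefixDeltaRecord(DataOutputStream out,
        byte[] prevKey, byte[] key, byte[] value) throws IOException {
      // Length of the prefix shared with the previous key (0 for the first KV).
      int common = 0;
      if (prevKey != null) {
        int limit = Math.min(prevKey.length, key.length);
        while (common < limit && prevKey[common] == key[common]) {
          common++;
        }
      }
      out.writeInt(key.length - common);   // key suffix length
      out.writeInt(value.length);          // full value length
      out.writeInt(common);                // shared prefix length
      out.write(key, common, key.length - common);
      out.write(value);
    }
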
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Tue Oct 23 21:58:43 2012
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
@@ -120,7 +120,7 @@ public class HFileBlock extends SchemaCo
 
   /**
    * The on-disk size of the next block, including the header, obtained by
-   * peeking into the first {@link HEADER_SIZE} bytes of the next block's
+   * peeking into the first {@link #HEADER_SIZE} bytes of the next block's
    * header, or -1 if unknown.
    */
   private int nextBlockOnDiskSizeWithHeader = -1;
@@ -227,7 +227,7 @@ public class HFileBlock extends SchemaCo
   }
 
   /**
-   * Writes header fields into the first {@link HEADER_SIZE} bytes of the
+   * Writes header fields into the first {@link #HEADER_SIZE} bytes of the
    * buffer. Resets the buffer position to the end of header as side effect.
    */
   private void overwriteHeader() {
@@ -479,7 +479,7 @@ public class HFileBlock extends SchemaCo
    * <li>Call {@link Writer#startWriting(BlockType)} and get a data stream to
    * write to
    * <li>Write your data into the stream
-   * <li>Call {@link Writer#writeHeaderAndData()} as many times as you need to
+   * <li>Call {@link Writer#writeHeaderAndData} as many times as you need to
    * store the serialized block into an external stream, or call
    * {@link Writer#getHeaderAndData()} to get it as a byte array.
    * <li>Repeat to write more blocks
@@ -503,6 +503,8 @@ public class HFileBlock extends SchemaCo
     /** Data block encoder used for data blocks */
     private final HFileDataBlockEncoder dataBlockEncoder;
 
+    private DataBlockEncoder.EncodedWriter encodedWriter;
+
     /**
      * The stream we use to accumulate data in uncompressed format for each
      * block. We reset this stream at the end of each block and reuse it. The
@@ -559,15 +561,26 @@ public class HFileBlock extends SchemaCo
     /** Whether we are including memstore timestamp after every key/value */
     private boolean includesMemstoreTS;
 
+    public void appendEncodedKV(final long memstoreTS, final byte[] key,
+        final int keyOffset, final int keyLength, final byte[] value,
+        final int valueOffset, final int valueLength) throws IOException {
+      if (encodedWriter == null) {
+        throw new IOException("Must initialize encoded writer");
+      }
+      encodedWriter.update(memstoreTS, key, keyOffset, keyLength, value,
+          valueOffset, valueLength);
+    }
+
     /**
      * @param compressionAlgorithm compression algorithm to use
-     * @param dataBlockEncoderAlgo data block encoding algorithm to use
+     * @param dataBlockEncoder data block encoder to use (null means no encoding)
      */
     public Writer(Compression.Algorithm compressionAlgorithm,
           HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS) {
       compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm;
       this.dataBlockEncoder = dataBlockEncoder != null
           ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
+      this.encodedWriter = null;
 
       baosInMemory = new ByteArrayOutputStream();
       if (compressAlgo != NONE) {
@@ -614,18 +627,25 @@ public class HFileBlock extends SchemaCo
 
       // We will compress it later in finishBlock()
       userDataStream = new DataOutputStream(baosInMemory);
-      return userDataStream;
+
+      // We only encode data blocks.
+      if (this.blockType == BlockType.DATA) {
+        this.encodedWriter = this.dataBlockEncoder.getEncodedWriter(this.userDataStream,
+            this.includesMemstoreTS);
+        this.encodedWriter.reserveMetadataSpace();
+        return null;  // The caller should use appendEncodedKV
+      } else {
+        this.encodedWriter = null;
+        return userDataStream;
+      }
     }
 
     /**
-     * Returns the stream for the user to write to. The block writer takes care
-     * of handling compression and buffering for caching on write. Can only be
-     * called in the "writing" state.
-     *
-     * @return the data output stream for the user to write to
+     * Returns the stream the user writes data block contents into. This is unsafe and
+     * should only be used for testing. Key/value pairs should be appended using
+     * {@link #appendEncodedKV(long, byte[], int, int, byte[], int, int)}.
      */
-    DataOutputStream getUserDataStream() {
-      expectState(State.WRITING);
+    DataOutputStream getUserDataStreamUnsafe() {
       return userDataStream;
     }
 
@@ -706,31 +726,16 @@ public class HFileBlock extends SchemaCo
         return; // skip any non-data block
       }
 
-      // do data block encoding, if data block encoder is set
-      ByteBuffer rawKeyValues = ByteBuffer.wrap(uncompressedBytesWithHeader,
+      if (this.encodedWriter == null) {
+        throw new IOException("All data blocks must use an encoded writer");
+      }
+
+      if (this.dataBlockEncoder.finishEncoding(uncompressedBytesWithHeader,
           HEADER_SIZE, uncompressedBytesWithHeader.length -
-          HEADER_SIZE).slice();
-      Pair<ByteBuffer, BlockType> encodingResult =
-          dataBlockEncoder.beforeWriteToDisk(rawKeyValues,
-              includesMemstoreTS);
-
-      BlockType encodedBlockType = encodingResult.getSecond();
-      if (encodedBlockType == BlockType.ENCODED_DATA) {
-        uncompressedBytesWithHeader = encodingResult.getFirst().array();
-        blockType = BlockType.ENCODED_DATA;
-      } else {
-        // There is no encoding configured. Do some extra sanity-checking.
-        if (encodedBlockType != BlockType.DATA) {
-          throw new IOException("Unexpected block type coming out of data " +
-              "block encoder: " + encodedBlockType);
-        }
-        if (userDataStream.size() !=
-            uncompressedBytesWithHeader.length - HEADER_SIZE) {
-          throw new IOException("Uncompressed size mismatch: "
-              + userDataStream.size() + " vs. "
-              + (uncompressedBytesWithHeader.length - HEADER_SIZE));
-        }
+          HEADER_SIZE, this.encodedWriter)) {
+        this.blockType = BlockType.ENCODED_DATA;
       }
+      this.encodedWriter = null;
     }
 
     /**
@@ -776,7 +781,7 @@ public class HFileBlock extends SchemaCo
     * @param out the output stream to write the block to
      * @throws IOException
      */
-    private void writeHeaderAndData(DataOutputStream out) throws IOException {
+    void writeHeaderAndData(DataOutputStream out) throws IOException {
       ensureBlockReady();
       out.write(onDiskBytesWithHeader);
     }
@@ -1073,18 +1078,26 @@ public class HFileBlock extends SchemaCo
 
         t0 = EnvironmentEdgeManager.currentTimeMillis();
         numPositionalRead.incrementAndGet();
-        int ret = istream.read(fileOffset, dest, destOffset, size + extraSize);
-        if (pData != null) {
-          t1 = EnvironmentEdgeManager.currentTimeMillis();
-          timeToRead = t1 - t0;
-          pData.addToHist(ProfilingData.HFILE_BLOCK_P_READ_TIME_MS, timeToRead);
-        }
-        if (ret < size) {
-          throw new IOException("Positional read of " + size + " bytes " +
-              "failed at offset " + fileOffset + " (returned " + ret + ")");
+        int sizeToRead = size + extraSize;
+        int sizeRead = 0;
+        if (sizeToRead > 0) {
+          sizeRead = istream.read(fileOffset, dest, destOffset, sizeToRead);
+          if (pData != null) {
+            t1 = EnvironmentEdgeManager.currentTimeMillis();
+            timeToRead = t1 - t0;
+            pData.addToHist(ProfilingData.HFILE_BLOCK_P_READ_TIME_MS, timeToRead);
+          }
+          if (size == 0 && sizeRead == -1) {
+            // A degenerate case: a zero-size block with no next block header.
+            sizeRead = 0;
+          }
+          if (sizeRead < size) {
+            throw new IOException("Positional read of " + sizeToRead + " bytes " +
+                "failed at offset " + fileOffset + " (returned " + sizeRead + ")");
+          }
         }
 
-        if (ret == size || ret < size + extraSize) {
+        if (sizeRead == size || sizeRead < sizeToRead) {
           // Could not read the next block's header, or did not try.
           return -1;
         }
@@ -1146,23 +1159,14 @@ public class HFileBlock extends SchemaCo
      * Decompresses data from the given stream using the configured compression
      * algorithm.
      *
-     * @param boundedStream
-     *          a stream to read compressed data from, bounded to the exact
-     *          amount of compressed data
-     * @param compressedSize
-     *          compressed data size, header not included
      * @param uncompressedSize
      *          uncompressed data size, header not included
-     * @param header
-     *          the header to include before the decompressed data, or null.
-     *          Only the first {@link HFileBlock#HEADER_SIZE} bytes of the
-     *          buffer are included.
     * @param dest
     *          destination buffer that receives the decompressed data
      * @throws IOException
      */
     protected void decompress(byte[] dest, int destOffset,
-        InputStream bufferedBoundedStream, int compressedSize,
+        InputStream bufferedBoundedStream,
         int uncompressedSize) throws IOException {
       Decompressor decompressor = null;
       try {
@@ -1281,7 +1285,7 @@ public class HFileBlock extends SchemaCo
         InputStream bufferedBoundedStream = createBufferedBoundedStream(
             offset, onDiskSize, pread);
         decompress(buf.array(), buf.arrayOffset() + HEADER_DELTA,
-            bufferedBoundedStream, onDiskSize, uncompressedSizeWithMagic);
+            bufferedBoundedStream, uncompressedSizeWithMagic);
 
         // We don't really have a good way to exclude the "magic record" size
         // from the compressed block's size, since it is compressed as well.
@@ -1448,7 +1452,7 @@ public class HFileBlock extends SchemaCo
           b.allocateBuffer(b.nextBlockOnDiskSizeWithHeader > 0);
 
           decompress(b.buf.array(), b.buf.arrayOffset() + HEADER_SIZE, dis,
-              onDiskSizeWithoutHeader, b.uncompressedSizeWithoutHeader);
+              b.uncompressedSizeWithoutHeader);
 
           // Copy next block's header bytes into the new block if we have them.
           if (nextBlockOnDiskSize > 0) {
@@ -1511,7 +1515,7 @@ public class HFileBlock extends SchemaCo
               compressedBytes, HEADER_SIZE, b.onDiskSizeWithoutHeader));
 
           decompress(b.buf.array(), b.buf.arrayOffset() + HEADER_SIZE, dis,
-              b.onDiskSizeWithoutHeader, b.uncompressedSizeWithoutHeader);
+              b.uncompressedSizeWithoutHeader);
 
           if (b.nextBlockOnDiskSizeWithHeader > 0) {
             // Copy the next block's header into the new block.

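A hedged sketch of the resulting write path for a data block: startWriting()
now returns null for DATA blocks, and callers append key/values through
appendEncodedKV() instead of writing raw bytes to the stream. Method names
come from this diff; the surrounding setup (the output stream "out" and the
sample key/value bytes) is illustrative only.

    HFileBlock.Writer blockWriter = new HFileBlock.Writer(
        Compression.Algorithm.NONE, NoOpDataBlockEncoder.INSTANCE,
        false /* includesMemstoreTS */);
    blockWriter.startWriting(BlockType.DATA);  // returns null for data blocks
    byte[] key = Bytes.toBytes("row1");        // schematic key/value bytes
    byte[] value = Bytes.toBytes("v1");
    blockWriter.appendEncodedKV(0L, key, 0, key.length,
        value, 0, value.length);
    blockWriter.writeHeaderAndData(out);       // finish, compress, and flush
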
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoder.java Tue Oct 23 21:58:43 2012
@@ -16,9 +16,11 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Pair;
@@ -82,4 +84,23 @@ public interface HFileDataBlockEncoder {
    */
   public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction);
 
+  /**
+   * @return an incremental encoding writer that uses the given output stream
+   */
+  public DataBlockEncoder.EncodedWriter getEncodedWriter(DataOutputStream out,
+      boolean includesMemstoreTS) throws IOException;
+
+  /**
+   * Completes the encoding process.
+   *
+   * @param data the buffered block contents containing the encoded bytes
+   * @param offset location of the encoded bytes in data
+   * @param length length of the encoded bytes in data
+   * @param writer the EncodedWriter that produced the encoded data
+   * @return true if the data was stored in an encoded format, false if unencoded
+   * @throws IOException
+   */
+  public boolean finishEncoding(byte[] data, final int offset, final int length,
+      DataBlockEncoder.EncodedWriter writer) throws IOException;
 }

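The calling sequence these two methods imply, sketched with a hypothetical
wrapper (EncodedWriter.update() and reserveMetadataSpace() appear in the
HFileBlock changes above; the relationship between the stream and blockBytes
is schematic):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.List;

    static boolean encodeBlock(HFileDataBlockEncoder encoder,
        DataOutputStream userDataStream, byte[] blockBytes, int headerSize,
        List<KeyValue> kvs) throws IOException {
      DataBlockEncoder.EncodedWriter writer =
          encoder.getEncodedWriter(userDataStream, false /* memstoreTS */);
      // Presumably reserves room for the unencoded-size prefix that
      // decodeKeyValues() reads back.
      writer.reserveMetadataSpace();
      for (KeyValue kv : kvs) {
        writer.update(0L, kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
            kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
      }
      // Patch metadata over the buffered bytes; true means the block should
      // be relabeled ENCODED_DATA.
      return encoder.finishEncoding(blockBytes, headerSize,
          blockBytes.length - headerSize, writer);
    }
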
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileDataBlockEncoderImpl.java Tue Oct 23 21:58:43 2012
@@ -209,4 +209,20 @@ public class HFileDataBlockEncoderImpl i
         inCache + ")";
   }
 
+  @Override
+  public DataBlockEncoder.EncodedWriter getEncodedWriter(DataOutputStream out,
+      boolean includesMemstoreTS) throws IOException {
+    if (onDisk != DataBlockEncoding.NONE) {
+      this.onDisk.writeIdInBytes(out);
+    }
+
+    return this.onDisk.getEncoder().createWriter(out, includesMemstoreTS);
+  }
+
+  @Override
+  public boolean finishEncoding(byte[] data, final int offset, final int length,
+      DataBlockEncoder.EncodedWriter writer) throws IOException {
+    int bytesToSkip = onDisk.encodingIdSize();
+    return writer.finishEncoding(data, offset + bytesToSkip, length - bytesToSkip);
+  }
 }

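The offset arithmetic above pairs with getEncodedWriter(): for any encoding
other than NONE, the encoding's ID bytes are written to the stream first, so
the buffered bytes handed to finishEncoding() begin with that ID and must be
skipped. Schematically (a zero ID size for NONE is an assumption implied by
the unconditional skip):

    [ encoding id: encodingIdSize() bytes | encoded key/values ... ]
    ^ offset                              ^ offset + bytesToSkip
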
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Tue Oct 23 21:58:43 2012
@@ -23,7 +23,6 @@ import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -37,7 +36,6 @@ import org.apache.hadoop.hbase.io.encodi
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
-import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
 import org.apache.hadoop.hbase.ipc.ProfilingData;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Tue Oct 23 21:58:43 2012
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Writes HFile format version 2.
@@ -282,9 +281,9 @@ public class HFileWriterV2 extends Abstr
    * @param vlength
    * @throws IOException
    */
-  private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength,
-      final byte[] value, final int voffset, final int vlength)
-      throws IOException {
+  private void append(final long memstoreTS, final byte[] key,
+      final int koffset, final int klength, final byte[] value,
+      final int voffset, final int vlength) throws IOException {
     boolean dupKey = checkKey(key, koffset, klength);
     checkValue(value, voffset, vlength);
     if (!dupKey) {
@@ -296,18 +295,10 @@ public class HFileWriterV2 extends Abstr
 
     // Write length of key and value and then actual key and value bytes.
     // Additionally, we may also write down the memstoreTS.
-    {
-      DataOutputStream out = fsBlockWriter.getUserDataStream();
-      out.writeInt(klength);
-      totalKeyLength += klength;
-      out.writeInt(vlength);
-      totalValueLength += vlength;
-      out.write(key, koffset, klength);
-      out.write(value, voffset, vlength);
-      if (this.includeMemstoreTS) {
-        WritableUtils.writeVLong(out, memstoreTS);
-      }
-    }
+    this.fsBlockWriter.appendEncodedKV(memstoreTS, key, koffset, klength, value,
+        voffset, vlength);
+    totalKeyLength += klength;
+    totalValueLength += vlength;
 
     // Are we the first key in this block?
     if (firstKeyInBlock == null) {

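The inline block removed above wrote the record sketched below; after this
change, the NONE encoding's writer (CopyKeyDataBlockEncoder, per the commit
message) is expected to keep producing the same unencoded layout. The helper
name is hypothetical; the identifiers match the removed code.

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableUtils;

    private static void writeRawKeyValue(DataOutputStream out, long memstoreTS,
        byte[] key, int koffset, int klength, byte[] value, int voffset,
        int vlength, boolean includeMemstoreTS) throws IOException {
      out.writeInt(klength);              // key length
      out.writeInt(vlength);              // value length
      out.write(key, koffset, klength);   // key bytes
      out.write(value, voffset, vlength); // value bytes
      if (includeMemstoreTS) {
        WritableUtils.writeVLong(out, memstoreTS);  // optional memstore TS
      }
    }
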
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java?rev=1401499&r1=1401498&r2=1401499&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java Tue Oct 23 21:58:43 2012
@@ -16,8 +16,11 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Pair;
@@ -77,4 +80,16 @@ public class NoOpDataBlockEncoder implem
     return getClass().getSimpleName();
   }
 
+  @Override
+  public DataBlockEncoder.EncodedWriter getEncodedWriter(DataOutputStream out,
+      boolean includesMemstoreTS) throws IOException {
+    return DataBlockEncoding.NONE.getEncoder().createWriter(out,
+        includesMemstoreTS);
+  }
+
+  @Override
+  public boolean finishEncoding(byte[] data, final int offset, final int length,
+      DataBlockEncoder.EncodedWriter writer) throws IOException {
+    return writer.finishEncoding(data, offset, length);
+  }
 }