Posted to commits@hbase.apache.org by ra...@apache.org on 2015/07/17 09:59:25 UTC

[4/4] hbase git commit: HBASE-12213 HFileBlock backed by Array of ByteBuffers (Ram)

HBASE-12213 HFileBlock backed by Array of ByteBuffers (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/834f87b2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/834f87b2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/834f87b2

Branch: refs/heads/master
Commit: 834f87b23de533783ba5f5b858327a6164f17f55
Parents: a249989
Author: ramkrishna <ra...@gmail.com>
Authored: Fri Jul 17 13:27:29 2015 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Fri Jul 17 13:27:29 2015 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/KeyValue.java  |   10 +-
 .../hadoop/hbase/io/ByteBuffInputStream.java    |  100 ++
 .../hadoop/hbase/io/ByteBufferInputStream.java  |  101 --
 .../io/encoding/BufferedDataBlockEncoder.java   |    4 +-
 .../io/encoding/CopyKeyDataBlockEncoder.java    |   12 +-
 .../hbase/io/encoding/DataBlockEncoder.java     |    3 +-
 .../hbase/io/encoding/DiffKeyDeltaEncoder.java  |   15 +-
 .../hbase/io/encoding/FastDiffDeltaEncoder.java |   18 +-
 .../io/encoding/HFileBlockDecodingContext.java  |   23 +-
 .../HFileBlockDefaultDecodingContext.java       |    8 +-
 .../io/encoding/PrefixKeyDeltaEncoder.java      |   18 +-
 .../apache/hadoop/hbase/io/hfile/BlockType.java |    5 +-
 .../org/apache/hadoop/hbase/nio/ByteBuff.java   |  438 +++++++
 .../apache/hadoop/hbase/nio/MultiByteBuff.java  | 1100 ++++++++++++++++++
 .../hadoop/hbase/nio/MultiByteBuffer.java       | 1047 -----------------
 .../apache/hadoop/hbase/nio/SingleByteBuff.java |  312 +++++
 .../hadoop/hbase/util/ByteBufferArray.java      |   64 +
 .../hadoop/hbase/util/ByteBufferUtils.java      |  118 +-
 .../java/org/apache/hadoop/hbase/util/Hash.java |    2 +
 .../apache/hadoop/hbase/util/UnsafeAccess.java  |   68 ++
 .../hbase/io/TestByteBufferInputStream.java     |   82 --
 .../hbase/io/TestMultiByteBuffInputStream.java  |   83 ++
 .../hadoop/hbase/nio/TestMultiByteBuff.java     |  324 ++++++
 .../hadoop/hbase/nio/TestMultiByteBuffer.java   |  316 -----
 .../hbase/codec/prefixtree/PrefixTreeCodec.java |   10 +-
 .../codec/prefixtree/PrefixTreeSeeker.java      |    1 +
 .../hbase/io/hfile/CacheableDeserializer.java   |    6 +-
 .../hbase/io/hfile/CompoundBloomFilter.java     |    8 +-
 .../org/apache/hadoop/hbase/io/hfile/HFile.java |    4 +-
 .../hadoop/hbase/io/hfile/HFileBlock.java       |  106 +-
 .../hadoop/hbase/io/hfile/HFileBlockIndex.java  |   41 +-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java  |  122 +-
 .../hbase/io/hfile/MemcachedBlockCache.java     |    5 +-
 .../hbase/io/hfile/bucket/BucketCache.java      |   11 +-
 .../io/hfile/bucket/ByteBufferIOEngine.java     |   14 +
 .../hbase/io/hfile/bucket/FileIOEngine.java     |   18 +
 .../hadoop/hbase/io/hfile/bucket/IOEngine.java  |   21 +
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |    2 -
 .../hadoop/hbase/regionserver/StoreFile.java    |    4 +-
 .../hadoop/hbase/regionserver/StoreScanner.java |    1 -
 .../apache/hadoop/hbase/util/BloomFilter.java   |    6 +-
 .../hadoop/hbase/util/BloomFilterChunk.java     |   31 -
 .../hadoop/hbase/util/BloomFilterUtil.java      |   16 +-
 .../hbase/util/hbck/TableLockChecker.java       |    1 -
 .../hadoop/hbase/client/TestFromClientSide.java |    2 +
 .../io/encoding/TestDataBlockEncoders.java      |    4 +-
 .../hadoop/hbase/io/hfile/CacheTestUtils.java   |   13 +-
 .../hadoop/hbase/io/hfile/TestCacheConfig.java  |    6 +-
 .../hadoop/hbase/io/hfile/TestChecksum.java     |    6 +-
 .../apache/hadoop/hbase/io/hfile/TestHFile.java |   11 +-
 .../hadoop/hbase/io/hfile/TestHFileBlock.java   |   33 +-
 .../io/hfile/TestHFileBlockCompatibility.java   |    7 +-
 .../hbase/io/hfile/TestHFileBlockIndex.java     |   12 +-
 .../hbase/io/hfile/TestHFileWriterV2.java       |    6 +-
 .../hbase/io/hfile/TestHFileWriterV3.java       |    6 +-
 .../io/hfile/bucket/TestByteBufferIOEngine.java |   45 +
 .../hadoop/hbase/util/TestBloomFilterChunk.java |   68 +-
 .../hadoop/hbase/util/TestByteBuffUtils.java    |   78 ++
 .../hadoop/hbase/util/TestByteBufferUtils.java  |    2 +-
 59 files changed, 3104 insertions(+), 1894 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 8c73984..368bf41 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -2613,6 +2613,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
    * Hence create a Keyvalue(aka Cell) that would help in comparing as two cells
    */
   public static class KeyOnlyKeyValue extends KeyValue {
+    private short rowLen = -1;
     public KeyOnlyKeyValue() {
 
     }
@@ -2624,6 +2625,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
       this.bytes = b;
       this.length = length;
       this.offset = offset;
+      this.rowLen = Bytes.toShort(this.bytes, this.offset);
     }
 
     @Override
@@ -2642,6 +2644,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
       this.bytes = key;
       this.offset = offset;
       this.length = length;
+      this.rowLen = Bytes.toShort(this.bytes, this.offset);
     }
 
     @Override
@@ -2699,7 +2702,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
 
     @Override
     public short getRowLength() {
-      return Bytes.toShort(this.bytes, getKeyOffset());
+      return rowLen;
     }
 
     @Override
@@ -2769,5 +2772,10 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
     public boolean equals(Object other) {
       return super.equals(other);
     }
+
+    @Override
+    public long heapSize() {
+      return super.heapSize() + Bytes.SIZEOF_SHORT;
+    }
   }
 }
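
The rowLen caching above matters because getRowLength() is invoked repeatedly while comparing keys in the read path; with this change the short is parsed once when the key bytes are set. A minimal, hedged sketch of that usage (the row/family/qualifier values are illustrative and it assumes the existing KeyValue#getKey() helper that returns the serialized key bytes):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class KeyOnlyKeyValueExample {
      public static void main(String[] args) {
        KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), 1L, KeyValue.Type.Put);
        byte[] key = kv.getKey(); // serialized key: rowlen, row, famlen, family, qualifier, ts, type
        KeyValue.KeyOnlyKeyValue keyOnly = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
        // With this patch, every call below returns the cached rowLen instead of
        // re-reading the leading short from the byte[].
        for (int i = 0; i < 3; i++) {
          System.out.println(keyOnly.getRowLength()); // 4
        }
      }
    }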

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java
new file mode 100644
index 0000000..4f6b3c2
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+
+/**
+ * Not thread safe!
+ * <p>
+ * Please note that the reads will cause position movement on wrapped ByteBuff.
+ */
+@InterfaceAudience.Private
+public class ByteBuffInputStream extends InputStream {
+
+  private ByteBuff buf;
+
+  public ByteBuffInputStream(ByteBuff buf) {
+      this.buf = buf;
+  }
+
+  /**
+   * Reads the next byte of data from this input stream. The value byte is returned as an
+   * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available
+   * because the end of the stream has been reached, the value <code>-1</code> is returned.
+   * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
+   */
+  public int read() {
+    if (this.buf.hasRemaining()) {
+      return (this.buf.get() & 0xff);
+    }
+    return -1;
+  }
+
+  /**
+   * Reads up to the next <code>len</code> bytes of data from the buffer into the passed array
+   * (starting from the given offset).
+   * @param b the array into which the data is read.
+   * @param off the start offset in the destination array <code>b</code>
+   * @param len the maximum number of bytes to read.
+   * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even
+   *         1 byte can be read because the end of the stream has been reached.
+   */
+  public int read(byte b[], int off, int len) {
+    int avail = available();
+    if (avail <= 0) {
+      return -1;
+    }
+    if (len <= 0) {
+      return 0;
+    }
+
+    if (len > avail) {
+      len = avail;
+    }
+    this.buf.get(b, off, len);
+    return len;
+  }
+
+  /**
+   * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the
+   * end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is
+   * equal to the smaller of <code>n</code> and remaining bytes in the stream.
+   * @param n the number of bytes to be skipped.
+   * @return the actual number of bytes skipped.
+   */
+  public long skip(long n) {
+    long k = Math.min(n, available());
+    if (k <= 0) {
+      return 0;
+    }
+    this.buf.skip((int) k);
+    return k;
+  }
+
+  /**
+   * @return  the number of remaining bytes that can be read (or skipped
+   *          over) from this input stream.
+   */
+  public int available() {
+    return this.buf.remaining();
+  }
+}
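
The javadoc above describes plain stream semantics over a ByteBuff; for context, a minimal, hedged usage sketch (the sample bytes are illustrative) showing the intended pattern of wrapping the stream in a DataInputStream, as HFileBlockDefaultDecodingContext does further down in this patch:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.io.ByteBuffInputStream;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.MultiByteBuff;

    public class ByteBuffInputStreamExample {
      public static void main(String[] args) throws IOException {
        // A ByteBuff backed by two ByteBuffers; the stream hides the split.
        ByteBuff buf = new MultiByteBuff(ByteBuffer.wrap(new byte[] { 1, 2, 3 }),
            ByteBuffer.wrap(new byte[] { 4, 5 }));
        DataInputStream in = new DataInputStream(new ByteBuffInputStream(buf));
        int first = in.read();                  // 1; advances the wrapped ByteBuff's position
        byte[] rest = new byte[in.available()]; // 4 bytes remain
        in.read(rest, 0, rest.length);
        System.out.println(first + " then " + rest.length + " more bytes");
      }
    }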

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
deleted file mode 100644
index 1530ccd..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Not thread safe!
- * <p>
- * Please note that the reads will cause position movement on wrapped ByteBuffer.
- */
-@InterfaceAudience.Private
-public class ByteBufferInputStream extends InputStream {
-
-  private ByteBuffer buf;
-
-  public ByteBufferInputStream(ByteBuffer buf) {
-      this.buf = buf;
-  }
-
-  /**
-   * Reads the next byte of data from this input stream. The value byte is returned as an
-   * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available
-   * because the end of the stream has been reached, the value <code>-1</code> is returned.
-   * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
-   */
-  public int read() {
-    if (this.buf.hasRemaining()) {
-      return (this.buf.get() & 0xff);
-    }
-    return -1;
-  }
-
-  /**
-   * Reads up to next <code>len</code> bytes of data from buffer into passed array(starting from
-   * given offset).
-   * @param b the array into which the data is read.
-   * @param off the start offset in the destination array <code>b</code>
-   * @param len the maximum number of bytes to read.
-   * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even
-   *         1 byte can be read because the end of the stream has been reached.
-   */
-  public int read(byte b[], int off, int len) {
-    int avail = available();
-    if (avail <= 0) {
-      return -1;
-    }
-
-    if (len > avail) {
-      len = avail;
-    }
-    if (len <= 0) {
-      return 0;
-    }
-
-    this.buf.get(b, off, len);
-    return len;
-  }
-
-  /**
-   * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the
-   * end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is
-   * equal to the smaller of <code>n</code> and remaining bytes in the stream.
-   * @param n the number of bytes to be skipped.
-   * @return the actual number of bytes skipped.
-   */
-  public long skip(long n) {
-    long k = Math.min(n, available());
-    if (k < 0) {
-      k = 0;
-    }
-    this.buf.position((int) (this.buf.position() + k));
-    return k;
-  }
-
-  /**
-   * @return  the number of remaining bytes that can be read (or skipped
-   *          over) from this input stream.
-   */
-  public int available() {
-    return this.buf.remaining();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 03875dc..966c59b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -601,7 +601,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
       kvBuffer.putInt(current.keyLength);
       kvBuffer.putInt(current.valueLength);
       kvBuffer.put(current.keyBuffer, 0, current.keyLength);
-      ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.valueOffset,
+      ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.valueOffset,
           current.valueLength);
       if (current.tagsLength > 0) {
         // Put short as unsigned
@@ -610,7 +610,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
         if (current.tagsOffset != -1) {
           // the offset of the tags bytes in the underlying buffer is marked. So the temp
           // buffer,tagsBuffer was not been used.
-          ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.tagsOffset,
+          ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.tagsOffset,
               current.tagsLength);
         } else {
           // When tagsOffset is marked as -1, tag compression was present and so the tags were

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
index 4eea272..662be29 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
@@ -66,13 +67,12 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
   }
 
   @Override
-  public Cell getFirstKeyCellInBlock(ByteBuffer block) {
-    int keyLength = block.getInt(Bytes.SIZEOF_INT);
-    ByteBuffer dup = block.duplicate();
+  public Cell getFirstKeyCellInBlock(ByteBuff block) {
+    int keyLength = block.getIntStrictlyForward(Bytes.SIZEOF_INT);
     int pos = 3 * Bytes.SIZEOF_INT;
-    dup.position(pos);
-    dup.limit(pos + keyLength);
-    return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength);
+    ByteBuffer key = block.asSubByteBuffer(pos + keyLength).duplicate();
+    // TODO : to be changed here for BBCell
+    return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + pos, keyLength);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
index b0467b8..ce71308 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 
 /**
  * Encoding of KeyValue. It aims to be fast and efficient using assumptions:
@@ -90,7 +91,7 @@ public interface DataBlockEncoder {
    * @param block encoded block we want index, the position will not change
    * @return First key in block as a cell.
    */
-  Cell getFirstKeyCellInBlock(ByteBuffer block);
+  Cell getFirstKeyCellInBlock(ByteBuff block);
 
   /**
    * Create a HFileBlock seeker which find KeyValues within a block.

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
index f2d4751..90b8e6e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -305,15 +306,16 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
   }
 
   @Override
-  public Cell getFirstKeyCellInBlock(ByteBuffer block) {
+  public Cell getFirstKeyCellInBlock(ByteBuff block) {
     block.mark();
     block.position(Bytes.SIZEOF_INT);
     byte familyLength = block.get();
-    ByteBufferUtils.skip(block, familyLength);
+    block.skip(familyLength);
     byte flag = block.get();
-    int keyLength = ByteBufferUtils.readCompressedInt(block);
-    ByteBufferUtils.readCompressedInt(block); // valueLength
-    ByteBufferUtils.readCompressedInt(block); // commonLength
+    int keyLength  = ByteBuff.readCompressedInt(block);
+    // TODO : See if we can avoid these reads as the read values are not getting used
+    ByteBuff.readCompressedInt(block); // valueLength
+    ByteBuff.readCompressedInt(block); // commonLength
     ByteBuffer result = ByteBuffer.allocate(keyLength);
 
     // copy row
@@ -341,7 +343,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
     // copy the timestamp and type
     int timestampFitInBytes =
         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
-    long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes);
+    long timestamp = ByteBuff.readLong(block, timestampFitInBytes);
     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
       timestamp = -timestamp;
     }
@@ -350,6 +352,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
     block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
 
     block.reset();
+    // The result is already a BB. So we will always create a KeyOnlyKv.
     return new KeyValue.KeyOnlyKeyValue(result.array(), 0, keyLength);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
index f750e09..fa4adbd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -354,18 +355,17 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
   }
 
   @Override
-  public Cell getFirstKeyCellInBlock(ByteBuffer block) {
+  public Cell getFirstKeyCellInBlock(ByteBuff block) {
     block.mark();
     block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
-    int keyLength = ByteBufferUtils.readCompressedInt(block);
-    ByteBufferUtils.readCompressedInt(block); // valueLength
-    ByteBufferUtils.readCompressedInt(block); // commonLength
-    int pos = block.position();
+    int keyLength = ByteBuff.readCompressedInt(block);
+    // TODO : See if we can avoid these reads as the read values are not getting used
+    ByteBuff.readCompressedInt(block); // valueLength
+    ByteBuff.readCompressedInt(block); // commonLength
+    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
     block.reset();
-    ByteBuffer dup = block.duplicate();
-    dup.position(pos);
-    dup.limit(pos + keyLength);
-    return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength);
+    // TODO : Change to BBCell.
+    return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength);
   }
 
   @Override
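
Each encoder above now follows the same pattern: mark the ByteBuff, skip or read the leading header ints, lift the key out via asSubByteBuffer(length) (duplicated so the caller cannot disturb the wrapped buffer), then reset. A self-contained, hedged sketch of that pattern on dummy bytes (not a real encoded block):

    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.MultiByteBuff;

    public class AsSubByteBufferExample {
      public static void main(String[] args) {
        // 8 bytes split across two buffers; pretend bytes 2..5 are a "key".
        ByteBuff block = new MultiByteBuff(ByteBuffer.wrap(new byte[] { 0, 1, 2, 3 }),
            ByteBuffer.wrap(new byte[] { 4, 5, 6, 7 }));
        block.mark();
        block.position(2);
        // Returns the backing ByteBuffer when the requested bytes sit in one item,
        // otherwise a freshly copied one; duplicate() so later position/limit changes
        // cannot leak back into the block.
        ByteBuffer key = block.asSubByteBuffer(4).duplicate();
        block.reset(); // block is back at the marked position
        System.out.println("first key byte: " + key.get(key.position())); // 2
      }
    }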

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
index 37001cc..ffdb694 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
@@ -17,10 +17,10 @@
 package org.apache.hadoop.hbase.io.encoding;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 
 /**
  * A decoding context that is created by a reader's encoder, and is shared
@@ -32,22 +32,27 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 public interface HFileBlockDecodingContext {
 
   /**
-   * Perform all actions that need to be done before the encoder's real decoding process.
-   * Decompression needs to be done if {@link HFileContext#getCompression()} returns a valid compression
+   * Perform all actions that need to be done before the encoder's real decoding
+   * process. Decompression needs to be done if
+   * {@link HFileContext#getCompression()} returns a valid compression
    * algorithm.
    *
-   * @param onDiskSizeWithoutHeader numBytes after block and encoding headers
-   * @param uncompressedSizeWithoutHeader numBytes without header required to store the block after
+   * @param onDiskSizeWithoutHeader
+   *          numBytes after block and encoding headers
+   * @param uncompressedSizeWithoutHeader
+   *          numBytes without header required to store the block after
    *          decompressing (not decoding)
-   * @param blockBufferWithoutHeader ByteBuffer pointed after the header but before the data
-   * @param onDiskBlock on disk data to be decoded
+   * @param blockBufferWithoutHeader
+   *          ByteBuffer pointed after the header but before the data
+   * @param onDiskBlock
+   *          on disk data to be decoded
    * @throws IOException
    */
   void prepareDecoding(
     int onDiskSizeWithoutHeader,
     int uncompressedSizeWithoutHeader,
-    ByteBuffer blockBufferWithoutHeader,
-    ByteBuffer onDiskBlock
+    ByteBuff blockBufferWithoutHeader,
+    ByteBuff onDiskBlock
   ) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
index 78bb0d6..30382d9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
@@ -19,17 +19,17 @@ package org.apache.hadoop.hbase.io.encoding;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Cipher;
 import org.apache.hadoop.hbase.io.crypto.Decryptor;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -51,8 +51,8 @@ public class HFileBlockDefaultDecodingContext implements
 
   @Override
   public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
-      ByteBuffer blockBufferWithoutHeader, ByteBuffer onDiskBlock) throws IOException {
-    InputStream in = new DataInputStream(new ByteBufferInputStream(onDiskBlock));
+      ByteBuff blockBufferWithoutHeader, ByteBuff onDiskBlock) throws IOException {
+    InputStream in = new DataInputStream(new ByteBuffInputStream(onDiskBlock));
 
     Encryption.Context cryptoContext = fileContext.getEncryptionContext();
     if (cryptoContext != Encryption.Context.NONE) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
index 15608cc..6e89de4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -172,22 +173,21 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
   }
 
   @Override
-  public Cell getFirstKeyCellInBlock(ByteBuffer block) {
+  public Cell getFirstKeyCellInBlock(ByteBuff block) {
     block.mark();
     block.position(Bytes.SIZEOF_INT);
-    int keyLength = ByteBufferUtils.readCompressedInt(block);
-    ByteBufferUtils.readCompressedInt(block);
-    int commonLength = ByteBufferUtils.readCompressedInt(block);
+    int keyLength = ByteBuff.readCompressedInt(block);
+    // TODO : See if we can avoid these reads as the read values are not getting used
+    ByteBuff.readCompressedInt(block);
+    int commonLength = ByteBuff.readCompressedInt(block);
     if (commonLength != 0) {
       throw new AssertionError("Nonzero common length in the first key in "
           + "block: " + commonLength);
     }
-    int pos = block.position();
+    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
     block.reset();
-    ByteBuffer dup = block.duplicate();
-    dup.position(pos);
-    dup.limit(pos + keyLength);
-    return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength);
+    // TODO : Change to BBCell
+    return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
index 0db584e..4228f57 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -131,7 +132,7 @@ public enum BlockType {
     out.write(magic);
   }
 
-  public void write(ByteBuffer buf) {
+  public void write(ByteBuff buf) {
     buf.put(magic);
   }
 
@@ -161,7 +162,7 @@ public enum BlockType {
     return parse(buf, 0, buf.length);
   }
 
-  public static BlockType read(ByteBuffer buf) throws IOException {
+  public static BlockType read(ByteBuff buf) throws IOException {
     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), MAGIC_LENGTH)];
     buf.get(magicBuf);
     BlockType blockType = parse(magicBuf, 0, magicBuf.length);
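
The read(ByteBuff) variant above lets the magic be assembled even when it straddles two backing buffers. A hedged sketch (it assumes the usual 8-byte DATA block magic "DATABLK*"; the split is artificial):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Arrays;

    import org.apache.hadoop.hbase.io.hfile.BlockType;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.MultiByteBuff;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BlockTypeReadExample {
      public static void main(String[] args) throws IOException {
        byte[] magic = Bytes.toBytes("DATABLK*"); // assumed DATA block magic
        // Magic deliberately split across two buffers; get(byte[]) inside read() copies across them.
        ByteBuff buf = new MultiByteBuff(ByteBuffer.wrap(Arrays.copyOfRange(magic, 0, 3)),
            ByteBuffer.wrap(Arrays.copyOfRange(magic, 3, 8)));
        System.out.println(BlockType.read(buf)); // DATA
      }
    }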

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
new file mode 100644
index 0000000..14e77a7
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An abstract class that abstracts out how the underlying byte buffers are used,
+ * either a single one or multiple. We need this abstraction because java's ByteBuffers
+ * cannot be sub-classed. This class provides APIs similar to the ones provided
+ * by java's nio ByteBuffers and allows you to do positional and relative
+ * reads/writes on the underlying BB. In addition, it has some extra APIs that
+ * help us in the read path.
+ */
+@InterfaceAudience.Private
+public abstract class ByteBuff {
+  /**
+   * @return this ByteBuff's current position
+   */
+  public abstract int position();
+
+  /**
+   * Sets this ByteBuff's position to the given value.
+   * @param position
+   * @return this object
+   */
+  public abstract ByteBuff position(int position);
+
+  /**
+   * Jumps the current position of this ByteBuff by specified length.
+   * @param len the length to be skipped
+   */
+  public abstract ByteBuff skip(int len);
+
+  /**
+   * Jumps back the current position of this ByteBuff by specified length.
+   * @param len the length to move back
+   */
+  public abstract ByteBuff moveBack(int len);
+
+  /**
+   * @return the total capacity of this ByteBuff.
+   */
+  public abstract int capacity();
+
+  /**
+   * Returns the limit of this ByteBuff
+   * @return limit of the ByteBuff
+   */
+  public abstract int limit();
+
+  /**
+   * Sets the limit of this ByteBuff.
+   * @param limit
+   * @return This ByteBuff
+   */
+  public abstract ByteBuff limit(int limit);
+
+  /**
+   * Rewinds this ByteBuff and the position is set to 0
+   * @return this object
+   */
+  public abstract ByteBuff rewind();
+
+  /**
+   * Marks the current position of the ByteBuff
+   * @return this object
+   */
+  public abstract ByteBuff mark();
+
+  /**
+   * Returns bytes from current position till length specified, as a single ByteBuffer. When all
+   * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item
+   * as such will be returned. So users are warned not to change the position or limit of this
+   * returned ByteBuffer. The position of the returned byte buffer is at the beginning of the
+   * required bytes. When the required bytes happen to span across multiple ByteBuffers, this API
+   * will copy the bytes to a newly created ByteBuffer of required size and return that.
+   *
+   * @param length number of bytes required.
+   * @return bytes from current position till length specified, as a single ByteBuffer.
+   */
+  public abstract ByteBuffer asSubByteBuffer(int length);
+
+  /**
+   * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these
+   * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as
+   * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are
+   * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
+   * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
+   * ByteBuffer of required size and return that.
+   *
+   * @param offset the offset in this ByteBuff from where the subBuffer should be created
+   * @param length the length of the subBuffer
+   * @param pair a pair that will have the bytes from the given offset till length specified,
+   *        as a single ByteBuffer and the offset in that Buffer where the bytes start.
+   *        Since this API gets called in a loop we are passing a pair to it which could be created
+   *        outside the loop and the method would set the values on the pair that is passed in by
+   *        the caller. Thus it avoids more object creations that would happen if the pair that is
+   *        returned is created by this method every time.
+   */
+  public abstract void asSubByteBuffer(int offset, int length, Pair<ByteBuffer, Integer> pair);
+
+  /**
+   * Returns the number of elements between the current position and the
+   * limit.
+   * @return the remaining elements in this ByteBuff
+   */
+  public abstract int remaining();
+
+  /**
+   * Returns true if there are elements between the current position and the limit
+   * @return true if there are elements, false otherwise
+   */
+  public abstract boolean hasRemaining();
+
+  /**
+   * Similar to {@link ByteBuffer}.reset(), ensures that this ByteBuff
+   * is reset back to last marked position.
+   * @return This ByteBuff
+   */
+  public abstract ByteBuff reset();
+
+  /**
+   * Returns a ByteBuff which is a sliced version of this ByteBuff. The position, limit and mark
+   * of the new ByteBuff will be independent of those of the original ByteBuff.
+   * The content of the new ByteBuff will start at this ByteBuff's current position.
+   * @return a sliced ByteBuff
+   */
+  public abstract ByteBuff slice();
+
+  /**
+   * Returns a ByteBuff which is a duplicate version of this ByteBuff. The
+   * position, limit and mark of the new ByteBuff will be independent of those
+   * of the original ByteBuff. The content of the new ByteBuff will start at
+   * this ByteBuff's current position. The position, limit and mark of the new
+   * ByteBuff would be identical to this ByteBuff in terms of values.
+   *
+   * @return a duplicated ByteBuff
+   */
+  public abstract ByteBuff duplicate();
+
+  /**
+   * A relative method that returns byte at the current position.  Increments the
+   * current position by the size of a byte.
+   * @return the byte at the current position
+   */
+  public abstract byte get();
+
+  /**
+   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers
+   * @param index
+   * @return the byte at the given index
+   */
+  public abstract byte get(int index);
+
+  /**
+   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers.
+   * The difference from {@link #get(int)} is that the index specified should not be before
+   * the current position; if it is, an IndexOutOfBoundsException is thrown.
+   * @param index
+   * @return the byte value at the given index.
+   */
+  public abstract byte getByteStrictlyForward(int index);
+
+  /**
+   * Writes a byte to this ByteBuff at the current position and increments the position
+   * @param b
+   * @return this object
+   */
+  public abstract ByteBuff put(byte b);
+
+  /**
+   * Writes a byte to this ByteBuff at the given index
+   * @param index
+   * @param b
+   * @return this object
+   */
+  public abstract ByteBuff put(int index, byte b);
+
+  /**
+   * Copies the specified number of bytes from this ByteBuff's current position to
+   * the byte[]'s offset. Also advances the position of the ByteBuff by the given length.
+   * @param dst
+   * @param offset within the current array
+   * @param length the number of bytes to be copied
+   */
+  public abstract void get(byte[] dst, int offset, int length);
+
+  /**
+   * Copies the content from this ByteBuff's current position to the byte array and fills it. Also
+   * advances the position of the ByteBuff by the length of the byte[].
+   * @param dst
+   */
+  public abstract void get(byte[] dst);
+
+  /**
+   * Copies from the given byte[] to this ByteBuff
+   * @param src
+   * @param offset the position in the byte array from which the copy should be done
+   * @param length the length up to which the copy should happen
+   * @return this ByteBuff
+   */
+  public abstract ByteBuff put(byte[] src, int offset, int length);
+
+  /**
+   * Copies from the given byte[] to this ByteBuff
+   * @param src
+   * @return this ByteBuff
+   */
+  public abstract ByteBuff put(byte[] src);
+
+  /**
+   * @return true if the underlying BB supports hasArray, false otherwise
+   */
+  public abstract boolean hasArray();
+
+  /**
+   * @return the byte[] if the underlying BB is a single BB and hasArray is true
+   */
+  public abstract byte[] array();
+
+  /**
+   * @return the arrayOffset of the byte[] in case of a single BB backed ByteBuff
+   */
+  public abstract int arrayOffset();
+
+  /**
+   * Returns the short value at the current position. Also advances the position by the size
+   * of short
+   *
+   * @return the short value at the current position
+   */
+  public abstract short getShort();
+
+  /**
+   * Fetches the short value at the given index. Does not change position of the
+   * underlying ByteBuffers. The caller is sure that the index will be after
+   * the current position of this ByteBuff. So even if the current short does not fit in the
+   * current item we can safely move to the next item and fetch the remaining bytes forming
+   * the short
+   *
+   * @param index
+   * @return the short value at the given index
+   */
+  public abstract short getShort(int index);
+
+  /**
+   * Fetches the short at the given index. Does not change position of the underlying ByteBuffers.
+   * The difference from {@link #getShort(int)} is that the index specified should not be
+   * before the current position; if it is, an IndexOutOfBoundsException is thrown.
+   * @param index
+   * @return the short value at the given index.
+   */
+  public abstract short getShortStrictlyForward(int index);
+
+  /**
+   * Returns the int value at the current position. Also advances the position by the size of int
+   *
+   * @return the int value at the current position
+   */
+  public abstract int getInt();
+
+  /**
+   * Writes an int to this ByteBuff at its current position. Also advances the position
+   * by size of int
+   * @param value Int value to write
+   * @return this object
+   */
+  public abstract ByteBuff putInt(int value);
+
+  /**
+   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers.
+   * Even if the current int does not fit in the
+   * current item we can safely move to the next item and fetch the remaining bytes forming
+   * the int
+   *
+   * @param index
+   * @return the int value at the given index
+   */
+  public abstract int getInt(int index);
+
+  /**
+   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers.
+   * The difference from {@link #getInt(int)} is that the index specified should not be before
+   * the current position; if it is, an IndexOutOfBoundsException is thrown.
+   * @param index
+   * @return the int value at the given index.
+   */
+  // TODO: any better name here?? getIntFromSubsequentPosition? or getIntAfterCurrentPosition?
+  // TODO : Make this relative wrt current position? Follow on JIRA
+  public abstract int getIntStrictlyForward(int index);
+  /**
+   * Returns the long value at the current position. Also advances the position by the size of long
+   *
+   * @return the long value at the current position
+   */
+  public abstract long getLong();
+
+  /**
+   * Writes a long to this ByteBuff at its current position.
+   * Also advances the position by size of long
+   * @param value Long value to write
+   * @return this object
+   */
+  public abstract ByteBuff putLong(long value);
+
+  /**
+   * Fetches the long at the given index. Does not change position of the
+   * underlying ByteBuffers. The caller is sure that the index will be after
+   * the current position of this ByteBuff. So even if the current long does not fit in the
+   * current item we can safely move to the next item and fetch the remaining bytes forming
+   * the long
+   *
+   * @param index
+   * @return the long value at the given index
+   */
+  public abstract long getLong(int index);
+
+  /**
+   * Fetches the long at the given index. Does not change position of the underlying ByteBuffers.
+   * The difference from {@link #getLong(int)} is that the index specified should not be before
+   * the current position; if it is, an IndexOutOfBoundsException is thrown.
+   * @param index
+   * @return the long value at the given index.
+   */
+  public abstract long getLongStrictlyForward(int index);
+
+  /**
+   * Copy the content from this ByteBuff to a byte[] based on the given offset and
+   * length
+   *
+   * @param offset
+   *          the position from where the copy should start
+   * @param length
+   *          the length up to which the copy has to be done
+   * @return byte[] with the copied contents from this ByteBuff.
+   */
+  public abstract byte[] toBytes(int offset, int length);
+
+  /**
+   * Copies the content from this ByteBuff to a ByteBuffer
+   * Note : This will advance the position marker of {@code out} but not change the position marker
+   * for this ByteBuff
+   * @param out the ByteBuffer to which the copy has to happen
+   * @param sourceOffset the offset in the ByteBuff from which the elements have
+   * to be copied
+   * @param length the length in this ByteBuff up to which the elements have to be copied
+   */
+  public abstract void get(ByteBuffer out, int sourceOffset, int length);
+
+  /**
+   * Copies the contents from the src ByteBuff to this ByteBuff. This will be
+   * absolute positional copying and
+   * won't affect the position of any of the buffers.
+   * @param offset the position in this ByteBuff to which the copy should happen
+   * @param src the src ByteBuff
+   * @param srcOffset the offset in the src ByteBuff from where the elements should be read
+   * @param length the length up to which the copy should happen
+   */
+  public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length);
+
+  // static helper methods
+  /**
+   * Read integer from ByteBuff coded in 7 bits and increment position.
+   * @return Read integer.
+   */
+  public static int readCompressedInt(ByteBuff buf) {
+    byte b = buf.get();
+    if ((b & ByteBufferUtils.NEXT_BIT_MASK) != 0) {
+      return (b & ByteBufferUtils.VALUE_MASK)
+          + (readCompressedInt(buf) << ByteBufferUtils.NEXT_BIT_SHIFT);
+    }
+    return b & ByteBufferUtils.VALUE_MASK;
+  }
+
+  /**
+   * Compares two ByteBuffs
+   *
+   * @param buf1 the first ByteBuff
+   * @param o1 the offset in the first ByteBuff from where the compare has to happen
+   * @param len1 the length in the first ByteBuff up to which the compare has to happen
+   * @param buf2 the second ByteBuff
+   * @param o2 the offset in the second ByteBuff from where the compare has to happen
+   * @param len2 the length in the second ByteBuff up to which the compare has to happen
+   * @return Positive if buf1 is bigger than buf2, 0 if they are equal, and negative if buf1 is
+   *         smaller than buf2.
+   */
+  public static int compareTo(ByteBuff buf1, int o1, int len1, ByteBuff buf2,
+      int o2, int len2) {
+    if (buf1.hasArray() && buf2.hasArray()) {
+      return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(),
+          buf2.arrayOffset() + o2, len2);
+    }
+    int end1 = o1 + len1;
+    int end2 = o2 + len2;
+    for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
+      int a = buf1.get(i) & 0xFF;
+      int b = buf2.get(j) & 0xFF;
+      if (a != b) {
+        return a - b;
+      }
+    }
+    return len1 - len2;
+  }
+
+  /**
+   * Read long which was written to fitInBytes bytes and increment position.
+   * @param fitInBytes In how many bytes given long is stored.
+   * @return The value of parsed long.
+   */
+  public static long readLong(ByteBuff in, final int fitInBytes) {
+    long tmpLength = 0;
+    for (int i = 0; i < fitInBytes; ++i) {
+      tmpLength |= (in.get() & 0xffl) << (8l * i);
+    }
+    return tmpLength;
+  }
+}
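
The asSubByteBuffer(offset, length, pair) javadoc above calls out that the Pair is passed in so tight read-path loops can avoid allocating a holder per call. A hedged sketch of that reuse pattern (the data and loop are illustrative only):

    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.MultiByteBuff;
    import org.apache.hadoop.hbase.util.Pair;

    public class ByteBuffPairReuseExample {
      public static void main(String[] args) {
        ByteBuff buf = new MultiByteBuff(ByteBuffer.wrap(new byte[] { 10, 20, 30 }),
            ByteBuffer.wrap(new byte[] { 40, 50, 60 }));
        // One Pair reused across iterations, as the javadoc suggests.
        Pair<ByteBuffer, Integer> pair = new Pair<ByteBuffer, Integer>();
        for (int offset = 0; offset < buf.limit(); offset += 2) {
          buf.asSubByteBuffer(offset, 2, pair);
          ByteBuffer bb = pair.getFirst();   // backing BB, or a copy when the bytes span items
          int start = pair.getSecond();      // where the requested bytes begin in 'bb'
          System.out.println(bb.get(start) + "," + bb.get(start + 1));
        }
      }
    }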

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
new file mode 100644
index 0000000..984ade5
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -0,0 +1,1100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.InvalidMarkException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Provides a unified view of all the underlying ByteBuffers so that they look like a single
+ * bigger sequential buffer. This class provides APIs similar to those in {@link ByteBuffer} to
+ * put/get int, short, long etc and to do operations like mark, reset, slice etc. This has to be
+ * used when data is split across multiple byte buffers and we don't want to copy them to a single
+ * buffer for reading from it.
+ */
+@InterfaceAudience.Private
+public class MultiByteBuff extends ByteBuff {
+
+  private final ByteBuffer[] items;
+  // Pointer to the current item in the MBB
+  private ByteBuffer curItem = null;
+  // Index of the current item in the MBB
+  private int curItemIndex = 0;
+
+  private int limit = 0;
+  private int limitedItemIndex;
+  private int markedItemIndex = -1;
+  private final int[] itemBeginPos;
+
+  public MultiByteBuff(ByteBuffer... items) {
+    assert items != null;
+    assert items.length > 0;
+    this.items = items;
+    this.curItem = this.items[this.curItemIndex];
+    // See below optimization in getInt(int) where we check whether the given index lands in current
+    // item. For this we need to check whether the passed index is less than the next item begin
+    // offset. To handle this effectively for the last item buffer, we add an extra item into this
+    // array.
+    itemBeginPos = new int[items.length + 1];
+    int offset = 0;
+    for (int i = 0; i < items.length; i++) {
+      ByteBuffer item = items[i];
+      item.rewind();
+      itemBeginPos[i] = offset;
+      int l = item.limit() - item.position();
+      offset += l;
+    }
+    this.limit = offset;
+    this.itemBeginPos[items.length] = offset + 1;
+    this.limitedItemIndex = this.items.length - 1;
+  }
+
+  private MultiByteBuff(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex,
+      int curItemIndex, int markedIndex) {
+    this.items = items;
+    this.curItemIndex = curItemIndex;
+    this.curItem = this.items[this.curItemIndex];
+    this.itemBeginPos = itemBeginPos;
+    this.limit = limit;
+    this.limitedItemIndex = limitedIndex;
+    this.markedItemIndex = markedIndex;
+  }
+
+  /**
+   * @throws UnsupportedOperationException MBB does not support
+   * array based operations
+   */
+  @Override
+  public byte[] array() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @throws UnsupportedOperationException MBB does not
+   * support array based operations
+   */
+  @Override
+  public int arrayOffset() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return false. MBB does not support array based operations
+   */
+  @Override
+  public boolean hasArray() {
+    return false;
+  }
+
+  /**
+   * @return the total capacity of this MultiByteBuffer.
+   */
+  @Override
+  public int capacity() {
+    int c = 0;
+    for (ByteBuffer item : this.items) {
+      c += item.capacity();
+    }
+    return c;
+  }
+
+  /**
+   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers
+   * @param index
+   * @return the byte at the given index
+   */
+  @Override
+  public byte get(int index) {
+    int itemIndex = getItemIndex(index);
+    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
+  }
+
+  @Override
+  public byte getByteStrictlyForward(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    if(index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) {
+      throw new IndexOutOfBoundsException("The index " + index
+          + " should not be less than current position " + this.position());
+    }
+    int itemIndex = getItemIndexFromCurItemIndex(index);
+    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
+  }
+
+  /*
+   * Returns in which sub ByteBuffer, the given element index will be available.
+   */
+  private int getItemIndex(int elemIndex) {
+    int index = 1;
+    while (elemIndex >= this.itemBeginPos[index]) {
+      index++;
+      if (index == this.itemBeginPos.length) {
+        throw new IndexOutOfBoundsException();
+      }
+    }
+    return index - 1;
+  }
+
+  /*
+   * Returns in which sub ByteBuffer, the given element index will be available. In this case we are
+   * sure that the item will be after MBB's current position
+   */
+  private int getItemIndexFromCurItemIndex(int elemIndex) {
+    int index = this.curItemIndex;
+    while (elemIndex >= this.itemBeginPos[index]) {
+      index++;
+      if (index == this.itemBeginPos.length) {
+        throw new IndexOutOfBoundsException();
+      }
+    }
+    return index - 1;
+  }
+
+  /**
+   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers
+   * @param index
+   * @return the int value at the given index
+   */
+  public int getInt(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    int itemIndex;
+    if (this.itemBeginPos[this.curItemIndex] <= index
+        && this.itemBeginPos[this.curItemIndex + 1] > index) {
+      itemIndex = this.curItemIndex;
+    } else {
+      itemIndex = getItemIndex(index);
+    }
+    return getInt(index, itemIndex);
+  }
+
+  @Override
+  public int getIntStrictlyForward(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    if (index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) {
+      throw new IndexOutOfBoundsException("The index " + index
+          + " should not be less than current position " + this.position());
+    }
+    int itemIndex;
+    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
+      itemIndex = this.curItemIndex;
+    } else {
+      itemIndex = getItemIndexFromCurItemIndex(index);
+    }
+    return getInt(index, itemIndex);
+  }
+
+  /**
+   * Fetches the short at the given index. Does not change the position of the underlying
+   * ByteBuffers.
+   * @param index the index at which the short has to be fetched
+   * @return the short value at the given index
+   */
+  public short getShort(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    int itemIndex;
+    if (this.itemBeginPos[this.curItemIndex] <= index
+        && this.itemBeginPos[this.curItemIndex + 1] > index) {
+      itemIndex = this.curItemIndex;
+    } else {
+      itemIndex = getItemIndex(index);
+    }
+    ByteBuffer item = items[itemIndex];
+    int offsetInItem = index - this.itemBeginPos[itemIndex];
+    if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) {
+      return ByteBufferUtils.toShort(item, offsetInItem);
+    }
+    if (items.length - 1 == itemIndex) {
+      // means cur item is the last one and we won't be able to read a short. Throw exception
+      throw new BufferUnderflowException();
+    }
+    ByteBuffer nextItem = items[itemIndex + 1];
+    // Get the one available byte from this item and the remaining one from the next item
+    short n = 0;
+    n ^= item.get(offsetInItem) & 0xFF;
+    n <<= 8;
+    n ^= nextItem.get(0) & 0xFF;
+    return n;
+  }
+
+  @Override
+  public short getShortStrictlyForward(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    if (index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) {
+      throw new IndexOutOfBoundsException("The index " + index
+          + " should not be less than current position " + this.position());
+    }
+    int itemIndex;
+    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
+      itemIndex = this.curItemIndex;
+    } else {
+      itemIndex = getItemIndexFromCurItemIndex(index);
+    }
+    return getShort(index, itemIndex);
+  }
+
+  private int getInt(int index, int itemIndex) {
+    ByteBuffer item = items[itemIndex];
+    int offsetInItem = index - this.itemBeginPos[itemIndex];
+    int remainingLen = item.limit() - offsetInItem;
+    if (remainingLen >= Bytes.SIZEOF_INT) {
+      return ByteBufferUtils.toInt(item, offsetInItem);
+    }
+    if (items.length - 1 == itemIndex) {
+      // means cur item is the last one and we won't be able to read an int. Throw exception
+      throw new BufferUnderflowException();
+    }
+    ByteBuffer nextItem = items[itemIndex + 1];
+    // Get available bytes from this item and remaining from next
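+    // For example (illustrative): with one byte left before this item's limit, that byte and the
+    // first three bytes of nextItem are assembled most-significant-byte first, matching
+    // ByteBuffer's default big-endian byte order.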
+    int l = 0;
+    for (int i = offsetInItem; i < item.limit(); i++) {
+      l <<= 8;
+      l ^= item.get(i) & 0xFF;
+    }
+    for (int i = 0; i < Bytes.SIZEOF_INT - remainingLen; i++) {
+      l <<= 8;
+      l ^= nextItem.get(i) & 0xFF;
+    }
+    return l;
+  }
+
+  private short getShort(int index, int itemIndex) {
+    ByteBuffer item = items[itemIndex];
+    int offsetInItem = index - this.itemBeginPos[itemIndex];
+    int remainingLen = item.limit() - offsetInItem;
+    if (remainingLen >= Bytes.SIZEOF_SHORT) {
+      return ByteBufferUtils.toShort(item, offsetInItem);
+    }
+    if (items.length - 1 == itemIndex) {
+      // means cur item is the last one and we won't be able to read a short. Throw exception
+      throw new BufferUnderflowException();
+    }
+    ByteBuffer nextItem = items[itemIndex + 1];
+    // Get available bytes from this item and remaining from next
+    short l = 0;
+    for (int i = offsetInItem; i < item.limit(); i++) {
+      l <<= 8;
+      l ^= item.get(i) & 0xFF;
+    }
+    for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) {
+      l <<= 8;
+      l ^= nextItem.get(i) & 0xFF;
+    }
+    return l;
+  }
+
+  private long getLong(int index, int itemIndex) {
+    ByteBuffer item = items[itemIndex];
+    int offsetInItem = index - this.itemBeginPos[itemIndex];
+    int remainingLen = item.limit() - offsetInItem;
+    if (remainingLen >= Bytes.SIZEOF_LONG) {
+      return ByteBufferUtils.toLong(item, offsetInItem);
+    }
+    if (items.length - 1 == itemIndex) {
+      // means cur item is the last one and we won't be able to read a long. Throw exception
+      throw new BufferUnderflowException();
+    }
+    ByteBuffer nextItem = items[itemIndex + 1];
+    // Get available bytes from this item and remaining from next
+    long l = 0;
+    for (int i = offsetInItem; i < item.limit(); i++) {
+      l <<= 8;
+      l ^= item.get(i) & 0xFF;
+    }
+    for (int i = 0; i < Bytes.SIZEOF_LONG - remainingLen; i++) {
+      l <<= 8;
+      l ^= nextItem.get(i) & 0xFF;
+    }
+    return l;
+  }
+
+  /**
+   * Fetches the long at the given index. Does not change the position of the underlying
+   * ByteBuffers.
+   * @param index the index at which the long has to be fetched
+   * @return the long value at the given index
+   */
+  public long getLong(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    int itemIndex;
+    if (this.itemBeginPos[this.curItemIndex] <= index
+        && this.itemBeginPos[this.curItemIndex + 1] > index) {
+      itemIndex = this.curItemIndex;
+    } else {
+      itemIndex = getItemIndex(index);
+    }
+    ByteBuffer item = items[itemIndex];
+    int offsetInItem = index - this.itemBeginPos[itemIndex];
+    int remainingLen = item.limit() - offsetInItem;
+    if (remainingLen >= Bytes.SIZEOF_LONG) {
+      return ByteBufferUtils.toLong(item, offsetInItem);
+    }
+    if (items.length - 1 == itemIndex) {
+      // means cur item is the last one and we won't be able to read a long. Throw exception
+      throw new BufferUnderflowException();
+    }
+    ByteBuffer nextItem = items[itemIndex + 1];
+    // Get available bytes from this item and remaining from next
+    long l = 0;
+    for (int i = offsetInItem; i < item.limit(); i++) {
+      l <<= 8;
+      l ^= item.get(i) & 0xFF;
+    }
+    for (int i = 0; i < Bytes.SIZEOF_LONG - remainingLen; i++) {
+      l <<= 8;
+      l ^= nextItem.get(i) & 0xFF;
+    }
+    return l;
+  }
+
+  @Override
+  public long getLongStrictlyForward(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    if (index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) {
+      throw new IndexOutOfBoundsException("The index " + index
+          + " should not be less than current position " + this.position());
+    }
+    int itemIndex;
+    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
+      itemIndex = this.curItemIndex;
+    } else {
+      itemIndex = getItemIndexFromCurItemIndex(index);
+    }
+    return getLong(index, itemIndex);
+  }
+
+  /**
+   * @return this MBB's current position
+   */
+  @Override
+  public int position() {
+    return itemBeginPos[this.curItemIndex] + this.curItem.position();
+  }
+
+  /**
+   * Sets this MBB's position to the given value.
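+   * <pre>
+   * // Illustrative sketch; 'buf' and 'idx' are placeholder names, not part of this patch.
+   * buf.position(idx);   // items before the target land at their limit, items after at 0
+   * byte b = buf.get();  // reads the byte at global offset 'idx'
+   * </pre>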
+   * @param position the new position to set
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff position(int position) {
+    // Short circuit for positioning within the cur item. Mostly that is the case.
+    if (this.itemBeginPos[this.curItemIndex] <= position
+        && this.itemBeginPos[this.curItemIndex + 1] > position) {
+      this.curItem.position(position - this.itemBeginPos[this.curItemIndex]);
+      return this;
+    }
+    int itemIndex = getItemIndex(position);
+    // All items from 0 - curItem-1 set position at end.
+    for (int i = 0; i < itemIndex; i++) {
+      this.items[i].position(this.items[i].limit());
+    }
+    // All items after curItem set position at begin
+    for (int i = itemIndex + 1; i < this.items.length; i++) {
+      this.items[i].position(0);
+    }
+    this.curItem = this.items[itemIndex];
+    this.curItem.position(position - this.itemBeginPos[itemIndex]);
+    this.curItemIndex = itemIndex;
+    return this;
+  }
+
+  /**
+   * Rewinds this MBB; the position is set to 0 and any mark is discarded.
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff rewind() {
+    for (int i = 0; i < this.items.length; i++) {
+      this.items[i].rewind();
+    }
+    this.curItemIndex = 0;
+    this.curItem = this.items[this.curItemIndex];
+    this.markedItemIndex = -1;
+    return this;
+  }
+
+  /**
+   * Marks the current position of this MBB.
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff mark() {
+    this.markedItemIndex = this.curItemIndex;
+    this.curItem.mark();
+    return this;
+  }
+
+  /**
+   * Similar to {@link ByteBuffer#reset()}, resets this MBB back to the last marked position.
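+   * <pre>
+   * // Illustrative sketch; 'buf' is a placeholder name, not part of this patch.
+   * buf.mark();
+   * long v = buf.getLong();  // this read may cross into the next underlying ByteBuffer
+   * buf.reset();             // back to the marked position, even across ByteBuffer boundaries
+   * </pre>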
+   * @return This MBB
+   */
+  @Override
+  public MultiByteBuff reset() {
+    // Even if the position has since moved on to a later item, the reset has to go back to the
+    // previously marked item and make that the current item again
+    if (this.markedItemIndex < 0) throw new InvalidMarkException();
+    ByteBuffer markedItem = this.items[this.markedItemIndex];
+    markedItem.reset();
+    this.curItem = markedItem;
+    // All items after the marked item, up to the current item, should have their position reset to 0
+    for (int i = this.curItemIndex; i > this.markedItemIndex; i--) {
+      this.items[i].position(0);
+    }
+    this.curItemIndex = this.markedItemIndex;
+    return this;
+  }
+
+  /**
+   * Returns the number of elements between the current position and the
+   * limit.
+   * @return the remaining elements in this MBB
+   */
+  @Override
+  public int remaining() {
+    int remain = 0;
+    for (int i = curItemIndex; i < items.length; i++) {
+      remain += items[i].remaining();
+    }
+    return remain;
+  }
+
+  /**
+   * Returns true if there are elements between the current position and the limit.
+   * @return true if there are elements, false otherwise
+   */
+  @Override
+  public final boolean hasRemaining() {
+    return this.curItem.hasRemaining() || this.curItemIndex < this.items.length - 1;
+  }
+
+  /**
+   * A relative method that returns the byte at the current position. Increments the
+   * current position by the size of a byte.
+   * @return the byte at the current position
+   */
+  @Override
+  public byte get() {
+    if (this.curItem.remaining() == 0) {
+      if (items.length - 1 == this.curItemIndex) {
+        // means cur item is the last one and we won't be able to read a byte. Throw exception
+        throw new BufferUnderflowException();
+      }
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+    }
+    return this.curItem.get();
+  }
+
+  /**
+   * Returns the short value at the current position. Also advances the position by the size
+   * of a short.
+   *
+   * @return the short value at the current position
+   */
+  @Override
+  public short getShort() {
+    int remaining = this.curItem.remaining();
+    if (remaining >= Bytes.SIZEOF_SHORT) {
+      return this.curItem.getShort();
+    }
+    if (remaining == 0) {
+      if (items.length - 1 == this.curItemIndex) {
+        // means cur item is the last one and we won't be able to read a short. Throw exception
+        throw new BufferUnderflowException();
+      }
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+      return this.curItem.getShort();
+    }
+    short n = 0;
+    n ^= get() & 0xFF;
+    n <<= 8;
+    n ^= get() & 0xFF;
+    return n;
+  }
+
+  /**
+   * Returns the int value at the current position. Also advances the position by the size of an int
+   *
+   * @return the int value at the current position
+   */
+  @Override
+  public int getInt() {
+    int remaining = this.curItem.remaining();
+    if (remaining >= Bytes.SIZEOF_INT) {
+      return this.curItem.getInt();
+    }
+    if (remaining == 0) {
+      if (items.length - 1 == this.curItemIndex) {
+        // means cur item is the last one and we won't be able to read an int. Throw exception
+        throw new BufferUnderflowException();
+      }
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+      return this.curItem.getInt();
+    }
+    // Get available bytes from this item and remaining from next
+    int n = 0;
+    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
+      n <<= 8;
+      n ^= get() & 0xFF;
+    }
+    return n;
+  }
+
+  /**
+   * Returns the long value at the current position. Also advances the position by the size of a long
+   *
+   * @return the long value at the current position
+   */
+  @Override
+  public long getLong() {
+    int remaining = this.curItem.remaining();
+    if (remaining >= Bytes.SIZEOF_LONG) {
+      return this.curItem.getLong();
+    }
+    if (remaining == 0) {
+      if (items.length - 1 == this.curItemIndex) {
+        // means cur item is the last one and we won't be able to read a long. Throw exception
+        throw new BufferUnderflowException();
+      }
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+      return this.curItem.getLong();
+    }
+    // Get available bytes from this item and remaining from next
+    long l = 0;
+    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
+      l <<= 8;
+      l ^= get() & 0xFF;
+    }
+    return l;
+  }
+
+  /**
+   * Copies the content from this MBB's current position into the given byte array, filling it
+   * completely. Also advances the position of the MBB by the length of the byte[].
+   * @param dst the byte[] to be filled
+   */
+  @Override
+  public void get(byte[] dst) {
+    get(dst, 0, dst.length);
+  }
+
+  /**
+   * Copies the specified number of bytes from this MBB's current position into the byte[],
+   * starting at the given offset in that array. Also advances the position of the MBB by the
+   * given length.
+   * @param dst the byte[] to copy into
+   * @param offset the offset within the destination array
+   * @param length the number of bytes to copy
+   */
+  @Override
+  public void get(byte[] dst, int offset, int length) {
+    while (length > 0) {
+      int toRead = Math.min(length, this.curItem.remaining());
+      ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
+          toRead);
+      this.curItem.position(this.curItem.position() + toRead);
+      length -= toRead;
+      if (length == 0)
+        break;
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+      offset += toRead;
+    }
+  }
+
+  /**
+   * Sets the limit of this MBB.
+   * @param limit the new limit
+   * @return This MBB
+   */
+  @Override
+  public MultiByteBuff limit(int limit) {
+    this.limit = limit;
+    // Normally the new limit will fall within the currently limiting BB item. Short circuit that case
+    int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
+    if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) {
+      this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin);
+      return this;
+    }
+    int itemIndex = getItemIndex(limit);
+    int beginOffset = this.itemBeginPos[itemIndex];
+    int offsetInItem = limit - beginOffset;
+    ByteBuffer item = items[itemIndex];
+    item.limit(offsetInItem);
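+    // When the limit moves forward, items between the previously limiting item and the new one
+    // get their full capacity back as limit; items after the new limiting item are capped at
+    // their current position so that nothing beyond the new overall limit remains readable.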
+    for (int i = this.limitedItemIndex; i < itemIndex; i++) {
+      this.items[i].limit(this.items[i].capacity());
+    }
+    this.limitedItemIndex = itemIndex;
+    for (int i = itemIndex + 1; i < this.items.length; i++) {
+      this.items[i].limit(this.items[i].position());
+    }
+    return this;
+  }
+
+  /**
+   * Returns the limit of this MBB
+   * @return limit of the MBB
+   */
+  @Override
+  public int limit() {
+    return this.limit;
+  }
+
+  /**
+   * Returns an MBB which is a sliced version of this MBB. The position, limit and mark
+   * of the new MBB will be independent of those of the original MBB.
+   * The content of the new MBB will start at this MBB's current position
+   * @return a sliced MBB
+   */
+  @Override
+  public MultiByteBuff slice() {
+    ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
+    for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
+      copy[j] = this.items[i].slice();
+    }
+    return new MultiByteBuff(copy);
+  }
+
+  /**
+   * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark
+   * of the new MBB will be independent of those of the original MBB, but their initial
+   * values will be identical to this MBB's current values.
+   * @return a duplicated MBB
+   */
+  @Override
+  public MultiByteBuff duplicate() {
+    ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
+    for (int i = 0; i < this.items.length; i++) {
+      itemsCopy[i] = items[i].duplicate();
+    }
+    return new MultiByteBuff(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex,
+        this.curItemIndex, this.markedItemIndex);
+  }
+
+  /**
+   * Writes a byte to this MBB at the current position and increments the position.
+   * @param b the byte to be written
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff put(byte b) {
+    if (this.curItem.remaining() == 0) {
+      if (this.curItemIndex == this.items.length - 1) {
+        throw new BufferOverflowException();
+      }
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+    }
+    this.curItem.put(b);
+    return this;
+  }
+
+  /**
+   * Writes a byte to this MBB at the given index. Does not change the position.
+   * @param index the index at which the byte has to be written
+   * @param b the byte to be written
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff put(int index, byte b) {
+    int itemIndex = getItemIndex(index);
+    ByteBuffer item = items[itemIndex];
+    item.put(index - itemBeginPos[itemIndex], b);
+    return this;
+  }
+
+  /**
+   * Copies from a src ByteBuff to this MBB.
+   * @param offset the position in this MBB at which the copy should happen
+   * @param src the src ByteBuff
+   * @param srcOffset the offset in the src ByteBuff from where the elements should be read
+   * @param length the number of bytes to copy
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
+    int destItemIndex = getItemIndex(offset);
+    int srcItemIndex = getItemIndex(srcOffset);
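+    // Note: the src item index and the in-item offset are resolved against this MBB's own
+    // itemBeginPos, which assumes the src ByteBuff's underlying buffers share this layout.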
+    ByteBuffer destItem = this.items[destItemIndex];
+    offset = offset - this.itemBeginPos[destItemIndex];
+
+    ByteBuffer srcItem = getItemByteBuffer(src, srcItemIndex);
+    srcOffset = srcOffset - this.itemBeginPos[srcItemIndex];
+    int toRead, toWrite, toMove;
+    while (length > 0) {
+      toWrite = destItem.limit() - offset;
+      toRead = srcItem.limit() - srcOffset;
+      toMove = Math.min(length, Math.min(toRead, toWrite));
+      ByteBufferUtils.copyFromBufferToBuffer(srcItem, destItem, srcOffset, offset, toMove);
+      length -= toMove;
+      if (length == 0) break;
+      if (toRead < toWrite) {
+        srcItem = getItemByteBuffer(src, ++srcItemIndex);
+        srcOffset = 0;
+        offset += toMove;
+      } else if (toRead > toWrite) {
+        destItem = this.items[++destItemIndex];
+        offset = 0;
+        srcOffset += toMove;
+      } else {
+        // toRead = toWrite case
+        srcItem = getItemByteBuffer(src, ++srcItemIndex);
+        srcOffset = 0;
+        destItem = this.items[++destItemIndex];
+        offset = 0;
+      }
+    }
+    return this;
+  }
+
+  private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) {
+    return (buf instanceof SingleByteBuff) ? ((SingleByteBuff) buf).getEnclosingByteBuffer()
+        : ((MultiByteBuff) buf).items[index];
+  }
+
+  /**
+   * Writes an int to this MBB at its current position. Also advances the position by the size of an int
+   * @param val Int value to write
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff putInt(int val) {
+    if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
+      this.curItem.putInt(val);
+      return this;
+    }
+    if (this.curItemIndex == this.items.length - 1) {
+      throw new BufferOverflowException();
+    }
+    // During read, this spanning case is read byte by byte, so write the bytes in big-endian order
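+    // For example (illustrative): val = 0x0A0B0C0D is written as the bytes 0x0A, 0x0B, 0x0C, 0x0D
+    // in that order, the same layout a big-endian ByteBuffer.putInt() would produce.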
+    put(int3(val));
+    put(int2(val));
+    put(int1(val));
+    put(int0(val));
+    return this;
+  }
+
+  private static byte int3(int x) {
+    return (byte) (x >> 24);
+  }
+
+  private static byte int2(int x) {
+    return (byte) (x >> 16);
+  }
+
+  private static byte int1(int x) {
+    return (byte) (x >> 8);
+  }
+
+  private static byte int0(int x) {
+    return (byte) (x);
+  }
+
+  /**
+   * Copies from the given byte[] to this MBB.
+   * @param src the byte[] to copy from
+   * @return this MBB
+   */
+  @Override
+  public final MultiByteBuff put(byte[] src) {
+    return put(src, 0, src.length);
+  }
+
+  /**
+   * Copies from the given byte[] to this MBB.
+   * @param src the byte[] to copy from
+   * @param offset the position in the byte array from which the copy should be done
+   * @param length the number of bytes to copy
+   * @return this MBB
+   */
+  @Override
+  public MultiByteBuff put(byte[] src, int offset, int length) {
+    if (this.curItem.remaining() >= length) {
+      ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
+      return this;
+    }
+    int end = offset + length;
+    for (int i = offset; i < end; i++) {
+      this.put(src[i]);
+    }
+    return this;
+  }
+
+  /**
+   * Writes a long to this MBB at its current position. Also advances the position by the size of a long
+   * @param val Long value to write
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff putLong(long val) {
+    if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
+      this.curItem.putLong(val);
+      return this;
+    }
+    if (this.curItemIndex == this.items.length - 1) {
+      throw new BufferOverflowException();
+    }
+    // During read, this spanning case is read byte by byte, so write the bytes in big-endian order
+    put(long7(val));
+    put(long6(val));
+    put(long5(val));
+    put(long4(val));
+    put(long3(val));
+    put(long2(val));
+    put(long1(val));
+    put(long0(val));
+    return this;
+  }
+
+  private static byte long7(long x) {
+    return (byte) (x >> 56);
+  }
+
+  private static byte long6(long x) {
+    return (byte) (x >> 48);
+  }
+
+  private static byte long5(long x) {
+    return (byte) (x >> 40);
+  }
+
+  private static byte long4(long x) {
+    return (byte) (x >> 32);
+  }
+
+  private static byte long3(long x) {
+    return (byte) (x >> 24);
+  }
+
+  private static byte long2(long x) {
+    return (byte) (x >> 16);
+  }
+
+  private static byte long1(long x) {
+    return (byte) (x >> 8);
+  }
+
+  private static byte long0(long x) {
+    return (byte) (x);
+  }
+
+  /**
+   * Advances the current position of this MBB by the specified length.
+   * @param length the number of bytes to skip
+   */
+  @Override
+  public MultiByteBuff skip(int length) {
+    // Get available bytes from this item and remaining from next
+    int jump = 0;
+    while (true) {
+      jump = this.curItem.remaining();
+      if (jump >= length) {
+        this.curItem.position(this.curItem.position() + length);
+        break;
+      }
+      this.curItem.position(this.curItem.position() + jump);
+      length -= jump;
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+    }
+    return this;
+  }
+
+  /**
+   * Moves the current position of this MBB back by the specified length.
+   * @param length the number of bytes to move back
+   */
+  @Override
+  public MultiByteBuff moveBack(int length) {
+    while (length != 0) {
+      if (length > curItem.position()) {
+        length -= curItem.position();
+        this.curItem.position(0);
+        this.curItemIndex--;
+        this.curItem = this.items[curItemIndex];
+      } else {
+        this.curItem.position(curItem.position() - length);
+        break;
+      }
+    }
+    return this;
+  }
+
+  /**
+   * Returns bytes from the current position till the length specified, as a single ByteBuffer.
+   * When all these bytes happen to be in a single ByteBuffer, which this object wraps, that
+   * ByteBuffer item as such will be returned. So users are warned not to change the position or
+   * limit of this returned ByteBuffer. The position of the returned ByteBuffer is at the
+   * beginning of the required bytes. When the required bytes happen to span across multiple
+   * ByteBuffers, this API will copy the bytes to a newly created ByteBuffer of the required size
+   * and return that.
+   *
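+   * <pre>
+   * // Illustrative sketch; 'mbb' and 'len' are placeholder names, not part of this patch.
+   * ByteBuffer bb = mbb.asSubByteBuffer(len);
+   * // 'bb' is either a backing buffer (do not touch its position/limit) or a fresh copy when
+   * // the requested bytes span more than one underlying ByteBuffer.
+   * </pre>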
+   * @param length number of bytes required.
+   * @return bytes from the current position till the length specified, as a single ByteBuffer.
+   */
+  @Override
+  public ByteBuffer asSubByteBuffer(int length) {
+    if (this.curItem.remaining() >= length) {
+      return this.curItem;
+    }
+    int offset = 0;
+    byte[] dupB = new byte[length];
+    int locCurItemIndex = curItemIndex;
+    ByteBuffer locCurItem = curItem;
+    while (length > 0) {
+      int toRead = Math.min(length, locCurItem.remaining());
+      ByteBufferUtils
+          .copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead);
+      length -= toRead;
+      if (length == 0)
+        break;
+      locCurItemIndex++;
+      locCurItem = this.items[locCurItemIndex];
+      offset += toRead;
+    }
+    return ByteBuffer.wrap(dupB);
+  }
+
+  /**
+   * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these
+   * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as
+   * such will be returned (with the offset in this ByteBuffer where the bytes start). So users are
+   * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
+   * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
+   * ByteBuffer of required size and return that.
+   *
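+   * <pre>
+   * // Illustrative sketch; 'mbb', 'off' and 'len' are placeholder names, not part of this patch.
+   * Pair&lt;ByteBuffer, Integer&gt; pair = new Pair&lt;ByteBuffer, Integer&gt;();
+   * mbb.asSubByteBuffer(off, len, pair);
+   * ByteBuffer bb = pair.getFirst();    // a backing buffer, or a copy if the bytes span items
+   * int offsetInBB = pair.getSecond();  // where the requested bytes start within 'bb'
+   * </pre>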
+   * @param offset the offset in this MBB from where the subBuffer should be created
+   * @param length the length of the subBuffer
+   * @param pair a pair that will have the bytes from the given offset till the length specified,
+   *        as a single ByteBuffer and the offset in that buffer where the bytes start. The method
+   *        sets the values on the pair that is passed in by the caller
+   */
+  @Override
+  public void asSubByteBuffer(int offset, int length, Pair<ByteBuffer, Integer> pair) {
+    if (this.itemBeginPos[this.curItemIndex] <= offset) {
+      int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
+      if (this.curItem.limit() - relOffsetInCurItem >= length) {
+        pair.setFirst(this.curItem);
+        pair.setSecond(relOffsetInCurItem);
+        return;
+      }
+    }
+    int itemIndex = getItemIndex(offset);
+    ByteBuffer item = this.items[itemIndex];
+    offset = offset - this.itemBeginPos[itemIndex];
+    if (item.limit() - offset >= length) {
+      pair.setFirst(item);
+      pair.setSecond(offset);
+      return;
+    }
+    byte[] dst = new byte[length];
+    int destOffset = 0;
+    while (length > 0) {
+      int toRead = Math.min(length, item.limit() - offset);
+      ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead);
+      length -= toRead;
+      if (length == 0) break;
+      itemIndex++;
+      item = this.items[itemIndex];
+      destOffset += toRead;
+      offset = 0;
+    }
+    pair.setFirst(ByteBuffer.wrap(dst));
+    pair.setSecond(0);
+    return;
+  }
+
+  /**
+   * Copies the content from this MBB to a ByteBuffer.
+   * @param out the ByteBuffer to which the copy has to happen
+   * @param sourceOffset the offset in this MBB from which the elements have
+   * to be copied
+   * @param length the number of bytes to be copied
+   */
+  @Override
+  public void get(ByteBuffer out, int sourceOffset,
+      int length) {
+    // Not used from the real read path, so not going with any optimization here
+    for (int i = 0; i < length; ++i) {
+      out.put(this.get(sourceOffset + i));
+    }
+  }
+
+  /**
+   * Copy the content from this MBB to a byte[] based on the given offset and
+   * length
+   *
+   * @param offset
+   *          the position from where the copy should start
+   * @param length
+   *          the number of bytes to be copied
+   * @return byte[] with the copied contents from this MBB.
+   */
+  @Override
+  public byte[] toBytes(int offset, int length) {
+    byte[] output = new byte[length];
+    int itemIndex = getItemIndex(offset);
+    ByteBuffer item = this.items[itemIndex];
+    // Make the offset relative to the item it falls in before reading from that item
+    offset = offset - this.itemBeginPos[itemIndex];
+    int toRead = item.limit() - offset;
+    int destinationOffset = 0;
+    while (length > 0) {
+      toRead = Math.min(length, toRead);
+      ByteBufferUtils.copyFromBufferToArray(output, item, offset, destinationOffset, toRead);
+      length -= toRead;
+      if (length == 0)
+        break;
+      destinationOffset += toRead;
+      offset = 0;
+      item = items[++itemIndex];
+      toRead = item.limit();
+    }
+    return output;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof MultiByteBuff)) return false;
+    if (this == obj) return true;
+    MultiByteBuff that = (MultiByteBuff) obj;
+    if (this.capacity() != that.capacity()) return false;
+    return ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(),
+        that.limit()) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 0;
+    for (ByteBuffer b : this.items) {
+      hash += b.hashCode();
+    }
+    return hash;
+  }
+}