Posted to commits@hbase.apache.org by ra...@apache.org on 2015/07/17 09:59:22 UTC

[1/4] hbase git commit: HBASE-12213 HFileBlock backed by Array of ByteBuffers (Ram)

Repository: hbase
Updated Branches:
  refs/heads/master a249989b9 -> 834f87b23


http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
index 253dff8..429f630 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -45,6 +44,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -194,7 +194,7 @@ public class TestHFileWriterV2 {
         assertFalse(block.isUnpacked());
         block = block.unpack(meta, blockReader);
       }
-      ByteBuffer buf = block.getBufferWithoutHeader();
+      ByteBuff buf = block.getBufferWithoutHeader();
       while (buf.hasRemaining()) {
         int keyLen = buf.getInt();
         int valueLen = buf.getInt();
@@ -241,7 +241,7 @@ public class TestHFileWriterV2 {
         .unpack(meta, blockReader);
       assertEquals(BlockType.META, block.getBlockType());
       Text t = new Text();
-      ByteBuffer buf = block.getBufferWithoutHeader();
+      ByteBuff buf = block.getBufferWithoutHeader();
       if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
         throw new IOException("Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName());
       }

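The change above is the whole migration in miniature: getBufferWithoutHeader() now returns an org.apache.hadoop.hbase.nio.ByteBuff instead of a java.nio.ByteBuffer, and the read loop that follows compiles unchanged because ByteBuff mirrors ByteBuffer's cursor API. A minimal sketch of that relative-read pattern, using only ByteBuff methods that appear in this patch (hasRemaining, getInt, get(byte[])); the countKeyValues helper is hypothetical:

    import org.apache.hadoop.hbase.nio.ByteBuff;

    public class ByteBuffScanSketch {
      // Hypothetical helper for illustration only; not part of this patch.
      static int countKeyValues(ByteBuff buf) {
        int count = 0;
        while (buf.hasRemaining()) {
          int keyLen = buf.getInt();         // relative read; advances the position
          int valueLen = buf.getInt();
          byte[] key = new byte[keyLen];
          buf.get(key);                      // bulk reads advance the position too
          byte[] value = new byte[valueLen];
          buf.get(value);
          count++;
        }
        return count;
      }
    }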
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
index 9adeaca..979c9f6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -220,7 +220,7 @@ public class TestHFileWriterV3 {
       HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
         .unpack(context, blockReader);
       assertEquals(BlockType.DATA, block.getBlockType());
-      ByteBuffer buf = block.getBufferWithoutHeader();
+      ByteBuff buf = block.getBufferWithoutHeader();
       int keyLen = -1;
       while (buf.hasRemaining()) {
 
@@ -282,7 +282,7 @@ public class TestHFileWriterV3 {
         .unpack(context, blockReader);
       assertEquals(BlockType.META, block.getBlockType());
       Text t = new Text();
-      ByteBuffer buf = block.getBufferWithoutHeader();
+      ByteBuff buf = block.getBufferWithoutHeader();
       if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
         throw new IOException("Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName());
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index 511f942..f68271e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
@@ -73,4 +75,47 @@ public class TestByteBufferIOEngine {
     assert testOffsetAtStartNum == 0;
     assert testOffsetAtEndNum == 0;
   }
+
+  @Test
+  public void testByteBufferIOEngineWithMBB() throws Exception {
+    int capacity = 32 * 1024 * 1024; // 32 MB
+    int testNum = 100;
+    int maxBlockSize = 64 * 1024;
+    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity, false);
+    int testOffsetAtStartNum = testNum / 10;
+    int testOffsetAtEndNum = testNum / 10;
+    for (int i = 0; i < testNum; i++) {
+      byte val = (byte) (Math.random() * 255);
+      int blockSize = (int) (Math.random() * maxBlockSize);
+      if (blockSize == 0) {
+        blockSize = 1;
+      }
+      byte[] byteArray = new byte[blockSize];
+      for (int j = 0; j < byteArray.length; ++j) {
+        byteArray[j] = val;
+      }
+      ByteBuffer srcBuffer = ByteBuffer.wrap(byteArray);
+      int offset = 0;
+      if (testOffsetAtStartNum > 0) {
+        testOffsetAtStartNum--;
+        offset = 0;
+      } else if (testOffsetAtEndNum > 0) {
+        testOffsetAtEndNum--;
+        offset = capacity - blockSize;
+      } else {
+        offset = (int) (Math.random() * (capacity - maxBlockSize));
+      }
+      ioEngine.write(srcBuffer, offset);
+      //ByteBuffer dstBuffer = ByteBuffer.allocate(blockSize);
+      //ioEngine.read(dstBuffer, offset);
+      //MultiByteBuffer read = new MultiByteBuffer(dstBuffer);
+      // TODO : this will get changed after HBASE-12295 goes in
+      ByteBuff read = ioEngine.read(offset, blockSize);
+      for (int j = 0; j < byteArray.length; ++j) {
+        assertTrue(srcBuffer.get(j) == read.get(j));
+      }
+    }
+    assert testOffsetAtStartNum == 0;
+    assert testOffsetAtEndNum == 0;
+  }
 }

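The commented-out lines in the new test keep the old read contract visible (the caller allocates a destination ByteBuffer) next to the new one (the engine returns a ByteBuff), pending HBASE-12295. A minimal sketch of just the new round trip, assuming only the write/read signatures exercised by the test; the class name is hypothetical:

    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.io.hfile.bucket.ByteBufferIOEngine;
    import org.apache.hadoop.hbase.nio.ByteBuff;

    public class IOEngineRoundTripSketch {
      public static void main(String[] args) throws Exception {
        ByteBufferIOEngine engine = new ByteBufferIOEngine(4 * 1024 * 1024, false);
        byte[] payload = { 1, 2, 3, 4 };
        engine.write(ByteBuffer.wrap(payload), 128);      // write at offset 128
        ByteBuff back = engine.read(128, payload.length); // engine hands back a ByteBuff
        for (int i = 0; i < payload.length; i++) {
          assert back.get(i) == payload[i];
        }
      }
    }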
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBloomFilterChunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBloomFilterChunk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBloomFilterChunk.java
index 4d8ad4b..dd46119 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBloomFilterChunk.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBloomFilterChunk.java
@@ -24,6 +24,8 @@ import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
 
 import junit.framework.TestCase;
+
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.experimental.categories.Category;
@@ -44,14 +46,14 @@ public class TestBloomFilterChunk extends TestCase {
     bf1.add(key1);
     bf2.add(key2);
 
-    assertTrue(BloomFilterUtil.contains(key1, 0, key1.length, bf1.bloom, 0, (int) bf1.byteSize,
-        bf1.hash, bf1.hashCount));
-    assertFalse(BloomFilterUtil.contains(key2, 0, key2.length, bf1.bloom, 0, (int) bf1.byteSize,
-        bf1.hash, bf1.hashCount));
-    assertFalse(BloomFilterUtil.contains(key1, 0, key1.length, bf2.bloom, 0, (int) bf2.byteSize,
-        bf2.hash, bf2.hashCount));
-    assertTrue(BloomFilterUtil.contains(key2, 0, key2.length, bf2.bloom, 0, (int) bf2.byteSize,
-        bf2.hash, bf2.hashCount));
+    assertTrue(BloomFilterUtil.contains(key1, 0, key1.length, new MultiByteBuff(bf1.bloom), 0,
+        (int) bf1.byteSize, bf1.hash, bf1.hashCount));
+    assertFalse(BloomFilterUtil.contains(key2, 0, key2.length, new MultiByteBuff(bf1.bloom), 0,
+        (int) bf1.byteSize, bf1.hash, bf1.hashCount));
+    assertFalse(BloomFilterUtil.contains(key1, 0, key1.length, new MultiByteBuff(bf2.bloom), 0,
+        (int) bf2.byteSize, bf2.hash, bf2.hashCount));
+    assertTrue(BloomFilterUtil.contains(key2, 0, key2.length, new MultiByteBuff(bf2.bloom), 0,
+        (int) bf2.byteSize, bf2.hash, bf2.hashCount));
 
     byte [] bkey = {1,2,3,4};
     byte [] bval = "this is a much larger byte array".getBytes();
@@ -59,12 +61,12 @@ public class TestBloomFilterChunk extends TestCase {
     bf1.add(bkey);
     bf1.add(bval, 1, bval.length-1);
 
-    assertTrue(BloomFilterUtil.contains(bkey, 0, bkey.length, bf1.bloom, 0, (int) bf1.byteSize,
-        bf1.hash, bf1.hashCount));
-    assertTrue(BloomFilterUtil.contains(bval, 1, bval.length - 1, bf1.bloom, 0, (int) bf1.byteSize,
-        bf1.hash, bf1.hashCount));
-    assertFalse(BloomFilterUtil.contains(bval, 0, bval.length, bf1.bloom, 0, (int) bf1.byteSize,
-        bf1.hash, bf1.hashCount));
+    assertTrue(BloomFilterUtil.contains(bkey, 0, bkey.length, new MultiByteBuff(bf1.bloom), 0,
+        (int) bf1.byteSize, bf1.hash, bf1.hashCount));
+    assertTrue(BloomFilterUtil.contains(bval, 1, bval.length - 1, new MultiByteBuff(bf1.bloom),
+        0, (int) bf1.byteSize, bf1.hash, bf1.hashCount));
+    assertFalse(BloomFilterUtil.contains(bval, 0, bval.length, new MultiByteBuff(bf1.bloom), 0,
+        (int) bf1.byteSize, bf1.hash, bf1.hashCount));
 
     // test 2: serialization & deserialization.
     // (convert bloom to byte array & read byte array back in as input)
@@ -73,18 +75,18 @@ public class TestBloomFilterChunk extends TestCase {
     ByteBuffer bb = ByteBuffer.wrap(bOut.toByteArray());
     BloomFilterChunk newBf1 = new BloomFilterChunk(1000, (float)0.01,
         Hash.MURMUR_HASH, 0);
-    assertTrue(BloomFilterUtil.contains(key1, 0, key1.length, bb, 0, (int) newBf1.byteSize,
-        newBf1.hash, newBf1.hashCount));
-    assertFalse(BloomFilterUtil.contains(key2, 0, key2.length, bb, 0, (int) newBf1.byteSize,
-        newBf1.hash, newBf1.hashCount));
-    assertTrue(BloomFilterUtil.contains(bkey, 0, bkey.length, bb, 0, (int) newBf1.byteSize,
-        newBf1.hash, newBf1.hashCount));
-    assertTrue(BloomFilterUtil.contains(bval, 1, bval.length - 1, bb, 0, (int) newBf1.byteSize,
-        newBf1.hash, newBf1.hashCount));
-    assertFalse(BloomFilterUtil.contains(bval, 0, bval.length, bb, 0, (int) newBf1.byteSize,
-        newBf1.hash, newBf1.hashCount));
-    assertFalse(BloomFilterUtil.contains(bval, 0, bval.length, bb, 0, (int) newBf1.byteSize,
-        newBf1.hash, newBf1.hashCount));
+    assertTrue(BloomFilterUtil.contains(key1, 0, key1.length, new MultiByteBuff(bb), 0,
+        (int) newBf1.byteSize, newBf1.hash, newBf1.hashCount));
+    assertFalse(BloomFilterUtil.contains(key2, 0, key2.length, new MultiByteBuff(bb), 0,
+        (int) newBf1.byteSize, newBf1.hash, newBf1.hashCount));
+    assertTrue(BloomFilterUtil.contains(bkey, 0, bkey.length, new MultiByteBuff(bb), 0,
+        (int) newBf1.byteSize, newBf1.hash, newBf1.hashCount));
+    assertTrue(BloomFilterUtil.contains(bval, 1, bval.length - 1, new MultiByteBuff(bb), 0,
+        (int) newBf1.byteSize, newBf1.hash, newBf1.hashCount));
+    assertFalse(BloomFilterUtil.contains(bval, 0, bval.length, new MultiByteBuff(bb), 0,
+        (int) newBf1.byteSize, newBf1.hash, newBf1.hashCount));
+    assertFalse(BloomFilterUtil.contains(bval, 0, bval.length, new MultiByteBuff(bb), 0,
+        (int) newBf1.byteSize, newBf1.hash, newBf1.hashCount));
 
     System.out.println("Serialized as " + bOut.size() + " bytes");
     assertTrue(bOut.size() - bf1.byteSize < 10); //... allow small padding
@@ -105,9 +107,10 @@ public class TestBloomFilterChunk extends TestCase {
     int falsePositives = 0;
     for (int i = 0; i < 25; ++i) {
       byte[] bytes = Bytes.toBytes(i);
-      if (BloomFilterUtil.contains(bytes, 0, bytes.length, b.bloom, 0, (int) b.byteSize, b.hash,
-          b.hashCount)) {
-        if(i >= 12) falsePositives++;
+      if (BloomFilterUtil.contains(bytes, 0, bytes.length, new MultiByteBuff(b.bloom), 0,
+          (int) b.byteSize, b.hash, b.hashCount)) {
+        if (i >= 12)
+          falsePositives++;
       } else {
         assertFalse(i < 12);
       }
@@ -143,9 +146,10 @@ public class TestBloomFilterChunk extends TestCase {
     for (int i = 0; i < 2*1000*1000; ++i) {
 
       byte[] bytes = Bytes.toBytes(i);
-      if (BloomFilterUtil.contains(bytes, 0, bytes.length, b.bloom, 0, (int) b.byteSize, b.hash,
-          b.hashCount)) {
-        if(i >= 1*1000*1000) falsePositives++;
+      if (BloomFilterUtil.contains(bytes, 0, bytes.length, new MultiByteBuff(b.bloom), 0,
+          (int) b.byteSize, b.hash, b.hashCount)) {
+        if (i >= 1 * 1000 * 1000)
+          falsePositives++;
       } else {
         assertFalse(i < 1*1000*1000);
       }

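Every call-site edit above follows one mechanical rule: BloomFilterUtil.contains() now takes a ByteBuff where it used to take the raw bloom ByteBuffer, so an on-heap buffer gets wrapped in a one-item MultiByteBuff at the call site (the varargs MultiByteBuff constructor accepts a single ByteBuffer). A minimal sketch of the pattern, assuming only the contains() signature visible in these hunks; the bloomMightContain helper is hypothetical:

    import org.apache.hadoop.hbase.nio.MultiByteBuff;
    import org.apache.hadoop.hbase.util.BloomFilterChunk;
    import org.apache.hadoop.hbase.util.BloomFilterUtil;

    public class BloomCheckSketch {
      // Hypothetical helper for illustration only; not part of this patch.
      static boolean bloomMightContain(byte[] key, BloomFilterChunk bf) {
        // Wrap the chunk's on-heap bloom ByteBuffer in a one-item MultiByteBuff.
        return BloomFilterUtil.contains(key, 0, key.length, new MultiByteBuff(bf.bloom),
            0, (int) bf.byteSize, bf.hash, bf.hashCount);
      }
    }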
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBuffUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBuffUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBuffUtils.java
new file mode 100644
index 0000000..4c6990e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBuffUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MiscTests.class, SmallTests.class })
+public class TestByteBuffUtils {
+  @Test
+  public void testCopyAndCompare() throws Exception {
+    ByteBuffer bb1 = ByteBuffer.allocate(50);
+    ByteBuffer bb2 = ByteBuffer.allocate(50);
+    MultiByteBuff src = new MultiByteBuff(bb1, bb2);
+    for (int i = 0; i < 7; i++) {
+      src.putLong(8l);
+    }
+    src.put((byte) 1);
+    src.put((byte) 1);
+    ByteBuffer bb3 = ByteBuffer.allocate(50);
+    ByteBuffer bb4 = ByteBuffer.allocate(50);
+    MultiByteBuff mbbDst = new MultiByteBuff(bb3, bb4);
+    // copy from MBB to MBB
+    mbbDst.put(0, src, 0, 100);
+    int compareTo = ByteBuff.compareTo(src, 0, 100, mbbDst, 0, 100);
+    assertTrue(compareTo == 0);
+    // Copy from MBB to SBB
+    bb3 = ByteBuffer.allocate(100);
+    SingleByteBuff sbbDst = new SingleByteBuff(bb3);
+    src.rewind();
+    sbbDst.put(0, src, 0, 100);
+    compareTo = ByteBuff.compareTo(src, 0, 100, sbbDst, 0, 100);
+    assertTrue(compareTo == 0);
+    // Copy from SBB to SBB
+    bb3 = ByteBuffer.allocate(100);
+    SingleByteBuff sbb = new SingleByteBuff(bb3);
+    for (int i = 0; i < 7; i++) {
+      sbb.putLong(8l);
+    }
+    sbb.put((byte) 1);
+    sbb.put((byte) 1);
+    bb4 = ByteBuffer.allocate(100);
+    sbbDst = new SingleByteBuff(bb4);
+    sbbDst.put(0, sbb, 0, 100);
+    compareTo = ByteBuff.compareTo(sbb, 0, 100, sbbDst, 0, 100);
+    assertTrue(compareTo == 0);
+    // copy from SBB to MBB
+    sbb.rewind();
+    mbbDst = new MultiByteBuff(bb3, bb4);
+    mbbDst.rewind();
+    mbbDst.put(0, sbb, 0, 100);
+    compareTo = ByteBuff.compareTo(sbb, 0, 100, mbbDst, 0, 100);
+    assertTrue(compareTo == 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
index 547c046..2403c82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
@@ -194,7 +194,7 @@ public class TestByteBufferUtils {
     ByteBuffer dstBuffer = ByteBuffer.allocate(array.length);
     srcBuffer.put(array);
 
-    ByteBufferUtils.copyFromBufferToBuffer(dstBuffer, srcBuffer,
+    ByteBufferUtils.copyFromBufferToBuffer(srcBuffer, dstBuffer,
         array.length / 2, array.length / 4);
     for (int i = 0; i < array.length / 4; ++i) {
       assertEquals(srcBuffer.get(i + array.length / 2),


[3/4] hbase git commit: HBASE-12213 HFileBlock backed by Array of ByteBuffers (Ram)

Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuffer.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuffer.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuffer.java
deleted file mode 100644
index 1b4ad2a..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuffer.java
+++ /dev/null
@@ -1,1047 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.nio;
-
-import java.nio.BufferOverflowException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.InvalidMarkException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.WritableUtils;
-
-/**
- * Provides a unified view over all the underlying ByteBuffers so that they read like one
- * bigger sequential buffer. This class provides APIs similar to those in {@link ByteBuffer}
- * to put/get int, short, long etc and to do operations like mark, reset, slice etc. It is
- * meant for use when data is split across multiple byte buffers and we don't want to copy
- * them into a single buffer before reading.
- */
-@InterfaceAudience.Private
-public class MultiByteBuffer {
-
-  private final ByteBuffer[] items;
-  // Pointer to the current item in the MBB
-  private ByteBuffer curItem = null;
-  // Index of the current item in the MBB
-  private int curItemIndex = 0;
-  /**
-   * An indicator that helps in short circuiting some of the APIs' functionality
-   * if the MBB is backed by a single item
-   */
-  private final boolean singleItem;
-  private int limit = 0;
-  private int limitedItemIndex;
-  private int markedItemIndex = -1;
-  private final int[] itemBeginPos;
-
-  public MultiByteBuffer(ByteBuffer... items) {
-    assert items != null;
-    assert items.length > 0;
-    this.items = items;
-    this.curItem = this.items[this.curItemIndex];
-    this.singleItem = items.length == 1;
-    // See the optimization below in getInt(int) where we check whether the given index lands in
-    // the current item. For this we need to check whether the passed index is less than the next
-    // item's begin offset. To handle this effectively for the last item buffer, we add an extra
-    // item into this array.
-    itemBeginPos = new int[items.length + 1];
-    int offset = 0;
-    for (int i = 0; i < items.length; i++) {
-      ByteBuffer item = items[i];
-      item.rewind();
-      itemBeginPos[i] = offset;
-      int l = item.limit() - item.position();
-      offset += l;
-    }
-    this.limit = offset;
-    this.itemBeginPos[items.length] = offset + 1;
-    this.limitedItemIndex = this.items.length - 1;
-  }
-
-  private MultiByteBuffer(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex,
-      int curItemIndex, int markedIndex) {
-    this.items = items;
-    this.curItemIndex = curItemIndex;
-    this.curItem = this.items[this.curItemIndex];
-    this.singleItem = items.length == 1;
-    this.itemBeginPos = itemBeginPos;
-    this.limit = limit;
-    this.limitedItemIndex = limitedIndex;
-    this.markedItemIndex = markedIndex;
-  }
-
-  /**
-   * @return the underlying array if this MultiByteBuffer is made up of a single on-heap ByteBuffer.
-   * @throws UnsupportedOperationException if the MBB is not made up of a single item
-   * or if the single item is a direct ByteBuffer
-   */
-  public byte[] array() {
-    if (hasArray()) {
-      return this.curItem.array();
-    }
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * @return the array offset of the item ByteBuffer if the MBB is made up of
-   * a single on-heap ByteBuffer
-   * @throws UnsupportedOperationException if the MBB is not made up of a single item or
-   * the single item is a direct ByteBuffer
-   */
-  public int arrayOffset() {
-    if (hasArray()) {
-      return this.curItem.arrayOffset();
-    }
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * @return true if the MBB is made up of a single item and that single item is an
-   * on-heap ByteBuffer
-   */
-  public boolean hasArray() {
-    return this.singleItem && this.curItem.hasArray();
-  }
-
-  /**
-   * @return the total capacity of this MultiByteBuffer.
-   */
-  public int capacity() {
-    int c = 0;
-    for (ByteBuffer item : this.items) {
-      c += item.capacity();
-    }
-    return c;
-  }
-
-  /**
-   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers
-   * @param index
-   * @return the byte at the given index
-   */
-  public byte get(int index) {
-    if (singleItem) {
-      return this.curItem.get(index);
-    }
-    int itemIndex = getItemIndex(index);
-    return this.items[itemIndex].get(index - this.itemBeginPos[itemIndex]);
-  }
-
-  /*
-   * Returns the index of the sub ByteBuffer in which the given element index is available.
-   */
-  private int getItemIndex(int elemIndex) {
-    int index = 1;
-    while (elemIndex >= this.itemBeginPos[index]) {
-      index++;
-      if (index == this.itemBeginPos.length) {
-        throw new IndexOutOfBoundsException();
-      }
-    }
-    return index - 1;
-  }
-
-  /*
-   * Returns the index of the sub ByteBuffer in which the given element index is available. In
-   * this case we are sure that the item will be after the MBB's current position
-   */
-  private int getItemIndexFromCurItemIndex(int elemIndex) {
-    int index = this.curItemIndex;
-    while (elemIndex >= this.itemBeginPos[index]) {
-      index++;
-      if (index == this.itemBeginPos.length) {
-        throw new IndexOutOfBoundsException();
-      }
-    }
-    return index - 1;
-  }
-
-  /**
-   * Fetches the short at the given index. Does not change position of the underlying ByteBuffers
-   * @param index
-   * @return the short value at the given index
-   */
-  public short getShort(int index) {
-    if (singleItem) {
-      return ByteBufferUtils.toShort(curItem, index);
-    }
-    // Mostly the index specified will land within this current item. Short circuit for that
-    int itemIndex;
-    if (this.itemBeginPos[this.curItemIndex] <= index
-        && this.itemBeginPos[this.curItemIndex + 1] > index) {
-      itemIndex = this.curItemIndex;
-    } else {
-      itemIndex = getItemIndex(index);
-    }
-    ByteBuffer item = items[itemIndex];
-    int offsetInItem = index - this.itemBeginPos[itemIndex];
-    if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) {
-      return ByteBufferUtils.toShort(item, offsetInItem);
-    }
-    if (items.length - 1 == itemIndex) {
-      // means cur item is the last one and we won't be able to read a short. Throw exception
-      throw new BufferUnderflowException();
-    }
-    ByteBuffer nextItem = items[itemIndex + 1];
-    // Get the one available byte from this item and the remaining byte from the next
-    short n = 0;
-    n ^= item.get(offsetInItem) & 0xFF;
-    n <<= 8;
-    n ^= nextItem.get(0) & 0xFF;
-    return n;
-  }
-
-  /**
-   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers
-   * @param index
-   * @return the int value at the given index
-   */
-  public int getInt(int index) {
-    if (singleItem) {
-      return ByteBufferUtils.toInt(this.curItem, index);
-    }
-    // Mostly the index specified will land within this current item. Short circuit for that
-    int itemIndex;
-    if (this.itemBeginPos[this.curItemIndex] <= index
-        && this.itemBeginPos[this.curItemIndex + 1] > index) {
-      itemIndex = this.curItemIndex;
-    } else {
-      itemIndex = getItemIndex(index);
-    }
-    return getInt(index, itemIndex);
-  }
-
-  /**
-   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers. The
-   * difference for this API from {@link #getInt(int)} is the caller is sure that the index will be
-   * after the current position of this MBB.
-   *
-   * @param index
-   * @return the int value at the given index
-   */
-  public int getIntStrictlyForward(int index) {
-    if (singleItem) {
-      return ByteBufferUtils.toInt(this.curItem, index);
-    }
-    // Mostly the index specified will land within this current item. Short circuit for that
-    int itemIndex;
-    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
-      itemIndex = this.curItemIndex;
-    } else {
-      itemIndex = getItemIndexFromCurItemIndex(index);
-    }
-    return getInt(index, itemIndex);
-  }
-
-  private int getInt(int index, int itemIndex) {
-    ByteBuffer item = items[itemIndex];
-    int offsetInItem = index - this.itemBeginPos[itemIndex];
-    int remainingLen = item.limit() - offsetInItem;
-    if (remainingLen >= Bytes.SIZEOF_INT) {
-      return ByteBufferUtils.toInt(item, offsetInItem);
-    }
-    if (items.length - 1 == itemIndex) {
-      // means cur item is the last one and we won't be able to read an int. Throw exception
-      throw new BufferUnderflowException();
-    }
-    ByteBuffer nextItem = items[itemIndex + 1];
-    // Get available bytes from this item and remaining from next
-    int l = 0;
-    for (int i = offsetInItem; i < item.capacity(); i++) {
-      l <<= 8;
-      l ^= item.get(i) & 0xFF;
-    }
-    for (int i = 0; i < Bytes.SIZEOF_INT - remainingLen; i++) {
-      l <<= 8;
-      l ^= nextItem.get(i) & 0xFF;
-    }
-    return l;
-  }
-
-  /**
-   * Fetches the long at the given index. Does not change position of the underlying ByteBuffers
-   * @param index
-   * @return the long value at the given index
-   */
-  public long getLong(int index) {
-    if (singleItem) {
-      return this.curItem.getLong(index);
-    }
-    // Mostly the index specified will land within this current item. Short circuit for that
-    int itemIndex;
-    if (this.itemBeginPos[this.curItemIndex] <= index
-        && this.itemBeginPos[this.curItemIndex + 1] > index) {
-      itemIndex = this.curItemIndex;
-    } else {
-      itemIndex = getItemIndex(index);
-    }
-    ByteBuffer item = items[itemIndex];
-    int offsetInItem = index - this.itemBeginPos[itemIndex];
-    int remainingLen = item.limit() - offsetInItem;
-    if (remainingLen >= Bytes.SIZEOF_LONG) {
-      return ByteBufferUtils.toLong(item, offsetInItem);
-    }
-    if (items.length - 1 == itemIndex) {
-      // means cur item is the last one and we won't be able to read a long. Throw exception
-      throw new BufferUnderflowException();
-    }
-    ByteBuffer nextItem = items[itemIndex + 1];
-    // Get available bytes from this item and remaining from next
-    long l = 0;
-    for (int i = offsetInItem; i < item.capacity(); i++) {
-      l <<= 8;
-      l ^= item.get(i) & 0xFF;
-    }
-    for (int i = 0; i < Bytes.SIZEOF_LONG - remainingLen; i++) {
-      l <<= 8;
-      l ^= nextItem.get(i) & 0xFF;
-    }
-    return l;
-  }
-
-  /**
-   * @return this MBB's current position
-   */
-  public int position() {
-    if (this.singleItem) return this.curItem.position();
-    return itemBeginPos[this.curItemIndex] + this.curItem.position();
-  }
-
-  /**
-   * Sets this MBB's position to the given value.
-   * @param position
-   * @return this object
-   */
-  public MultiByteBuffer position(int position) {
-    if (this.singleItem) {
-      this.curItem.position(position);
-      return this;
-    }
-    // Short circuit for positioning within the cur item. Mostly that is the case.
-    if (this.itemBeginPos[this.curItemIndex] <= position
-        && this.itemBeginPos[this.curItemIndex + 1] > position) {
-      this.curItem.position(position - this.itemBeginPos[this.curItemIndex]);
-      return this;
-    }
-    int itemIndex = getItemIndex(position);
-    // All items from 0 - curItem-1 set position at end.
-    for (int i = 0; i < itemIndex; i++) {
-      this.items[i].position(this.items[i].limit());
-    }
-    // All items after curItem set position at begin
-    for (int i = itemIndex + 1; i < this.items.length; i++) {
-      this.items[i].position(0);
-    }
-    this.curItem = this.items[itemIndex];
-    this.curItem.position(position - this.itemBeginPos[itemIndex]);
-    this.curItemIndex = itemIndex;
-    return this;
-  }
-
-  /**
-   * Rewinds this MBB and the position is set to 0
-   * @return this object
-   */
-  public MultiByteBuffer rewind() {
-    for (int i = 0; i < this.items.length; i++) {
-      this.items[i].rewind();
-    }
-    this.curItemIndex = 0;
-    this.curItem = this.items[this.curItemIndex];
-    this.markedItemIndex = -1;
-    return this;
-  }
-
-  /**
-   * Marks the current position of the MBB
-   * @return this object
-   */
-  public MultiByteBuffer mark() {
-    this.markedItemIndex = this.curItemIndex;
-    this.curItem.mark();
-    return this;
-  }
-
-  /**
-   * Similar to {@link ByteBuffer}.reset(), ensures that this MBB
-   * is reset back to last marked position.
-   * @return This MBB
-   */
-  public MultiByteBuffer reset() {
-    // when the buffer is moved to the next one.. the reset should happen on the previous marked
-    // item and the new one should be taken as the base
-    if (this.markedItemIndex < 0) throw new InvalidMarkException();
-    ByteBuffer markedItem = this.items[this.markedItemIndex];
-    markedItem.reset();
-    this.curItem = markedItem;
-    // All items after the marked position up to the current item should be reset to 0
-    for (int i = this.curItemIndex; i > this.markedItemIndex; i--) {
-      this.items[i].position(0);
-    }
-    this.curItemIndex = this.markedItemIndex;
-    return this;
-  }
-
-  /**
-   * Returns the number of elements between the current position and the
-   * limit.
-   * @return the remaining elements in this MBB
-   */
-  public int remaining() {
-    int remain = 0;
-    for (int i = curItemIndex; i < items.length; i++) {
-      remain += items[i].remaining();
-    }
-    return remain;
-  }
-
-  /**
-   * Returns true if there are elements between the current position and the limit
-   * @return true if there are elements, false otherwise
-   */
-  public final boolean hasRemaining() {
-    return this.curItem.hasRemaining() || this.curItemIndex < this.items.length - 1;
-  }
-
-  /**
-   * A relative method that returns byte at the current position.  Increments the
-   * current position by the size of a byte.
-   * @return the byte at the current position
-   */
-  public byte get() {
-    if (!singleItem && this.curItem.remaining() == 0) {
-      if (items.length - 1 == this.curItemIndex) {
-        // means cur item is the last one and we won't be able to read a byte. Throw exception
-        throw new BufferUnderflowException();
-      }
-      this.curItemIndex++;
-      this.curItem = this.items[this.curItemIndex];
-    }
-    return this.curItem.get();
-  }
-
-  /**
-   * Returns the short value at the current position. Also advances the position by the size
-   * of short
-   *
-   * @return the short value at the current position
-   */
-  public short getShort() {
-    if (singleItem) {
-      return this.curItem.getShort();
-    }
-    int remaining = this.curItem.remaining();
-    if (remaining >= Bytes.SIZEOF_SHORT) {
-      return this.curItem.getShort();
-    }
-    if (remaining == 0) {
-      if (items.length - 1 == this.curItemIndex) {
-        // means cur item is the last one and we won't be able to read a short. Throw exception
-        throw new BufferUnderflowException();
-      }
-      this.curItemIndex++;
-      this.curItem = this.items[this.curItemIndex];
-      return this.curItem.getShort();
-    }
-    short n = 0;
-    n ^= get() & 0xFF;
-    n <<= 8;
-    n ^= get() & 0xFF;
-    return n;
-  }
-
-  /**
-   * Returns the int value at the current position. Also advances the position by the size of int
-   *
-   * @return the int value at the current position
-   */
-  public int getInt() {
-    if (singleItem) {
-      return this.curItem.getInt();
-    }
-    int remaining = this.curItem.remaining();
-    if (remaining >= Bytes.SIZEOF_INT) {
-      return this.curItem.getInt();
-    }
-    if (remaining == 0) {
-      if (items.length - 1 == this.curItemIndex) {
-        // means cur item is the last one and we won't be able to read an int. Throw exception
-        throw new BufferUnderflowException();
-      }
-      this.curItemIndex++;
-      this.curItem = this.items[this.curItemIndex];
-      return this.curItem.getInt();
-    }
-    // Get available bytes from this item and remaining from next
-    int n = 0;
-    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
-      n <<= 8;
-      n ^= get() & 0xFF;
-    }
-    return n;
-  }
-
-
-  /**
-   * Returns the long value at the current position. Also advances the position by the size of long
-   *
-   * @return the long value at the current position
-   */
-  public long getLong() {
-    if (singleItem) {
-      return this.curItem.getLong();
-    }
-    int remaining = this.curItem.remaining();
-    if (remaining >= Bytes.SIZEOF_LONG) {
-      return this.curItem.getLong();
-    }
-    if (remaining == 0) {
-      if (items.length - 1 == this.curItemIndex) {
-        // means cur item is the last one and we won't be able to read a long. Throw exception
-        throw new BufferUnderflowException();
-      }
-      this.curItemIndex++;
-      this.curItem = this.items[this.curItemIndex];
-      return this.curItem.getLong();
-    }
-    // Get available bytes from this item and remaining from next
-    long l = 0;
-    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
-      l <<= 8;
-      l ^= get() & 0xFF;
-    }
-    return l;
-  }
-
-  /**
-   * Returns the long value, stored as a variable-length long, at the current position of this
-   * MultiByteBuffer. Also advances its position accordingly.
-   * This is similar to {@link WritableUtils#readVLong(DataInput)} but reads from a
-   * {@link MultiByteBuffer}
-   *
-   * @return the long value at the current position
-   */
-  public long getVLong() {
-    byte firstByte = get();
-    int len = WritableUtils.decodeVIntSize(firstByte);
-    if (len == 1) {
-      return firstByte;
-    }
-    long i = 0;
-    byte b;
-    for (int idx = 0; idx < len - 1; idx++) {
-      b = get();
-      i = i << 8;
-      i = i | (b & 0xFF);
-    }
-    return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
-  }
-
-  /**
-   * Copies the content from this MBB's current position to the byte array and fills it. Also
-   * advances the position of the MBB by the length of the byte[].
-   * @param dst
-   * @return this object
-   */
-  public MultiByteBuffer get(byte[] dst) {
-    return get(dst, 0, dst.length);
-  }
-
-  /**
-   * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset.
-   * Also advances the position of the MBB by the given length.
-   * @param dst
-   * @param offset within the current array
-   * @param length the number of bytes to be copied
-   * @return this object
-   */
-  public MultiByteBuffer get(byte[] dst, int offset, int length) {
-    if (this.singleItem) {
-      this.curItem.get(dst, offset, length);
-    } else {
-      while (length > 0) {
-        int toRead = Math.min(length, this.curItem.remaining());
-        this.curItem.get(dst, offset, toRead);
-        length -= toRead;
-        if (length == 0)
-          break;
-        this.curItemIndex++;
-        this.curItem = this.items[this.curItemIndex];
-        offset += toRead;
-      }
-    }
-    return this;
-  }
-
-  /**
-   * Marks the limit of this MBB.
-   * @param limit
-   * @return This MBB
-   */
-  public MultiByteBuffer limit(int limit) {
-    this.limit = limit;
-    if (singleItem) {
-      this.curItem.limit(limit);
-      return this;
-    }
-    // Normally the limit will try to limit within the last BB item
-    int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
-    if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) {
-      this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin);
-      return this;
-    }
-    int itemIndex = getItemIndex(limit);
-    int beginOffset = this.itemBeginPos[itemIndex];
-    int offsetInItem = limit - beginOffset;
-    ByteBuffer item = items[itemIndex];
-    item.limit(offsetInItem);
-    for (int i = this.limitedItemIndex; i < itemIndex; i++) {
-      this.items[i].limit(this.items[i].capacity());
-    }
-    this.limitedItemIndex = itemIndex;
-    for (int i = itemIndex + 1; i < this.items.length; i++) {
-      this.items[i].limit(this.items[i].position());
-    }
-    return this;
-  }
-
-  /**
-   * Returns the limit of this MBB
-   * @return limit of the MBB
-   */
-  public int limit() {
-    return this.limit;
-  }
-
-  /**
-   * Returns an MBB which is a sliced version of this MBB. The position, limit and mark
-   * of the new MBB will be independent of those of the original MBB.
-   * The content of the new MBB will start at this MBB's current position.
-   * @return a sliced MBB
-   */
-  public MultiByteBuffer slice() {
-    if (this.singleItem) {
-      return new MultiByteBuffer(curItem.slice());
-    }
-    ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
-    for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
-      copy[j] = this.items[i].slice();
-    }
-    return new MultiByteBuffer(copy);
-  }
-
-  /**
-   * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark
-   * of the new MBB will be independent of those of the original MBB, but their initial
-   * values will be identical to those of this MBB.
-   * @return a duplicated MBB
-   */
-  public MultiByteBuffer duplicate() {
-    if (this.singleItem) {
-      return new MultiByteBuffer(new ByteBuffer[] { curItem.duplicate() }, this.itemBeginPos,
-          this.limit, this.limitedItemIndex, this.curItemIndex, this.markedItemIndex);
-    }
-    ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
-    for (int i = 0; i < this.items.length; i++) {
-      itemsCopy[i] = items[i].duplicate();
-    }
-    return new MultiByteBuffer(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex,
-        this.curItemIndex, this.markedItemIndex);
-  }
-
-  /**
-   * Writes a byte to this MBB at the current position and increments the position
-   * @param b
-   * @return this object
-   */
-  public MultiByteBuffer put(byte b) {
-    if (!singleItem && this.curItem.remaining() == 0) {
-      if (this.curItemIndex == this.items.length - 1) {
-        throw new BufferOverflowException();
-      }
-      this.curItemIndex++;
-      this.curItem = this.items[this.curItemIndex];
-    }
-    this.curItem.put(b);
-    return this;
-  }
-
-  /**
-   * Writes a byte to this MBB at the given index
-   * @param index
-   * @param b
-   * @return this object
-   */
-  public MultiByteBuffer put(int index, byte b) {
-    if (this.singleItem) {
-      this.curItem.put(index, b);
-      return this;
-    }
-    int itemIndex = getItemIndex(limit);
-    ByteBuffer item = items[itemIndex];
-    item.put(index - itemBeginPos[itemIndex], b);
-    return this;
-  }
-
-  /**
-   * Copies from a src MBB to this MBB.
-   * @param offset the position in this MBB to which the copy should happen
-   * @param src the src MBB
-   * @param srcOffset the offset in the src MBB from where the elements should be read
-   * @param length the length up to which the copy should happen
-   */
-  public void put(int offset, MultiByteBuffer src, int srcOffset, int length) {
-    if (src.hasArray() && this.hasArray()) {
-      System.arraycopy(src.array(), srcOffset + src.arrayOffset(), this.array(), this.arrayOffset()
-          + offset, length);
-    } else {
-      int destItemIndex = getItemIndex(offset);
-      int srcItemIndex = getItemIndex(srcOffset);
-      ByteBuffer destItem = this.items[destItemIndex];
-      offset = offset - this.itemBeginPos[destItemIndex];
-
-      ByteBuffer srcItem = src.items[srcItemIndex];
-      srcOffset = srcOffset - this.itemBeginPos[srcItemIndex];
-      int toRead, toWrite, toMove;
-      while (length > 0) {
-        toWrite = destItem.limit() - offset;
-        toRead = srcItem.limit() - srcOffset;
-        toMove = Math.min(length, Math.min(toRead, toWrite));
-        ByteBufferUtils.copyFromBufferToBuffer(destItem, srcItem, srcOffset, offset, toMove);
-        length -= toMove;
-        if (length == 0) break;
-        if (toRead < toWrite) {
-          srcItem = src.items[++srcItemIndex];
-          srcOffset = 0;
-          offset += toMove;
-        } else if (toRead > toWrite) {
-          destItem = this.items[++destItemIndex];
-          offset = 0;
-          srcOffset += toMove;
-        } else {
-          // toRead = toWrite case
-          srcItem = src.items[++srcItemIndex];
-          srcOffset = 0;
-          destItem = this.items[++destItemIndex];
-          offset = 0;
-        }
-      }
-    }
-  }
-
-  /**
-   * Writes an int to this MBB at its current position. Also advances the position by size of int
-   * @param val Int value to write
-   * @return this object
-   */
-  public MultiByteBuffer putInt(int val) {
-    if (singleItem || this.curItem.remaining() >= Bytes.SIZEOF_INT) {
-      this.curItem.putInt(val);
-      return this;
-    }
-    if (this.curItemIndex == this.items.length - 1) {
-      throw new BufferOverflowException();
-    }
-    // During read, we will read byte by byte for this case. So just write in big endian
-    put(int3(val));
-    put(int2(val));
-    put(int1(val));
-    put(int0(val));
-    return this;
-  }
-
-  private static byte int3(int x) {
-    return (byte) (x >> 24);
-  }
-
-  private static byte int2(int x) {
-    return (byte) (x >> 16);
-  }
-
-  private static byte int1(int x) {
-    return (byte) (x >> 8);
-  }
-
-  private static byte int0(int x) {
-    return (byte) (x);
-  }
-
-  /**
-   * Copies from the given byte[] to this MBB
-   * @param src
-   * @return this MBB
-   */
-  public final MultiByteBuffer put(byte[] src) {
-    return put(src, 0, src.length);
-  }
-
-  /**
-   * Copies from the given byte[] to this MBB
-   * @param src
-   * @param offset the position in the byte array from which the copy should be done
-   * @param length the length up to which the copy should happen
-   * @return this MBB
-   */
-  public MultiByteBuffer put(byte[] src, int offset, int length) {
-    if (singleItem || this.curItem.remaining() >= length) {
-      ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
-      return this;
-    }
-    int end = offset + length;
-    for (int i = offset; i < end; i++) {
-      this.put(src[i]);
-    }
-    return this;
-  }
-
-
-  /**
-   * Writes a long to this MBB at its current position. Also advances the position by size of long
-   * @param val Long value to write
-   * @return this object
-   */
-  public MultiByteBuffer putLong(long val) {
-    if (singleItem || this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
-      this.curItem.putLong(val);
-      return this;
-    }
-    if (this.curItemIndex == this.items.length - 1) {
-      throw new BufferOverflowException();
-    }
-    // During read, we will read byte by byte for this case. So just write in big endian
-    put(long7(val));
-    put(long6(val));
-    put(long5(val));
-    put(long4(val));
-    put(long3(val));
-    put(long2(val));
-    put(long1(val));
-    put(long0(val));
-    return this;
-  }
-
-  private static byte long7(long x) {
-    return (byte) (x >> 56);
-  }
-
-  private static byte long6(long x) {
-    return (byte) (x >> 48);
-  }
-
-  private static byte long5(long x) {
-    return (byte) (x >> 40);
-  }
-
-  private static byte long4(long x) {
-    return (byte) (x >> 32);
-  }
-
-  private static byte long3(long x) {
-    return (byte) (x >> 24);
-  }
-
-  private static byte long2(long x) {
-    return (byte) (x >> 16);
-  }
-
-  private static byte long1(long x) {
-    return (byte) (x >> 8);
-  }
-
-  private static byte long0(long x) {
-    return (byte) (x);
-  }
-
-  /**
-   * Advances the current position of this MBB by the specified length.
-   * @param length
-   */
-  public void skip(int length) {
-    if (this.singleItem) {
-      this.curItem.position(this.curItem.position() + length);
-      return;
-    }
-    // Get available bytes from this item and remaining from next
-    int jump = 0;
-    while (true) {
-      jump = this.curItem.remaining();
-      if (jump >= length) {
-        this.curItem.position(this.curItem.position() + length);
-        break;
-      }
-      this.curItem.position(this.curItem.position() + jump);
-      length -= jump;
-      this.curItemIndex++;
-      this.curItem = this.items[this.curItemIndex];
-    }
-  }
-
-  /**
-   * Moves the current position of this MBB back by the specified length.
-   * @param length
-   */
-  public void moveBack(int length) {
-    if (this.singleItem) {
-      this.curItem.position(curItem.position() - length);
-      return;
-    }
-    while (length != 0) {
-      if (length > curItem.position()) {
-        length -= curItem.position();
-        this.curItem.position(0);
-        this.curItemIndex--;
-        this.curItem = this.items[curItemIndex];
-      } else {
-        this.curItem.position(curItem.position() - length);
-        break;
-      }
-    }
-  }
-
-  /**
-   * Returns bytes from the current position till the length specified, as a single ByteBuffer.
-   * When all these bytes happen to be in a single ByteBuffer, which this object wraps, that
-   * ByteBuffer item as such will be returned. So users are warned not to change the position or
-   * limit of this returned ByteBuffer. The position of the returned byte buffer is at the
-   * beginning of the required bytes. When the required bytes happen to span multiple
-   * ByteBuffers, this API will copy the bytes to a newly created ByteBuffer of the required
-   * size and return that.
-   *
-   * @param length number of bytes required.
-   * @return bytes from the current position till the length specified, as a single ByteBuffer.
-   */
-  public ByteBuffer asSubBuffer(int length) {
-    if (this.singleItem || this.curItem.remaining() >= length) {
-      return this.curItem;
-    }
-    int offset = 0;
-    byte[] dupB = new byte[length];
-    int locCurItemIndex = curItemIndex;
-    ByteBuffer locCurItem = curItem;
-    while (length > 0) {
-      int toRead = Math.min(length, locCurItem.remaining());
-      ByteBufferUtils
-          .copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead);
-      length -= toRead;
-      if (length == 0)
-        break;
-      locCurItemIndex++;
-      locCurItem = this.items[locCurItemIndex];
-      offset += toRead;
-    }
-    return ByteBuffer.wrap(dupB);
-  }
-
-  /**
-   * Returns bytes from the given offset till the length specified, as a single ByteBuffer. When
-   * all these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer
-   * item as such will be returned (with the offset in this ByteBuffer where the bytes start). So
-   * users are warned not to change the position or limit of this returned ByteBuffer. When the
-   * required bytes happen to span multiple ByteBuffers, this API will copy the bytes to a newly
-   * created ByteBuffer of the required size and return that.
-   *
-   * @param offset the offset in this MBB from where the subBuffer should be created
-   * @param length the length of the subBuffer
-   * @return a pair of the bytes from the given offset till the length specified, as a single
-   *         ByteBuffer, and the offset in that buffer where the bytes start.
-   */
-  public Pair<ByteBuffer, Integer> asSubBuffer(int offset, int length) {
-    if (this.singleItem) {
-      return new Pair<ByteBuffer, Integer>(this.curItem, offset);
-    }
-    if (this.itemBeginPos[this.curItemIndex] <= offset) {
-      int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
-      if (this.curItem.limit() - relOffsetInCurItem >= length) {
-        return new Pair<ByteBuffer, Integer>(this.curItem, relOffsetInCurItem);
-      }
-    }
-    int itemIndex = getItemIndex(offset);
-    ByteBuffer item = this.items[itemIndex];
-    offset = offset - this.itemBeginPos[itemIndex];
-    if (item.limit() - offset >= length) {
-      return new Pair<ByteBuffer, Integer>(item, offset);
-    }
-    byte[] dst = new byte[length];
-    int destOffset = 0;
-    while (length > 0) {
-      int toRead = Math.min(length, item.limit() - offset);
-      ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead);
-      length -= toRead;
-      if (length == 0) break;
-      itemIndex++;
-      item = this.items[itemIndex];
-      destOffset += toRead;
-      offset = 0;
-    }
-    return new Pair<ByteBuffer, Integer>(ByteBuffer.wrap(dst), 0);
-  }
-
-  /**
-   * Compares two MBBs
-   *
-   * @param buf1 the first MBB
-   * @param o1 the offset in the first MBB from where the compare has to happen
-   * @param len1 the length in the first MBB up to which the compare has to happen
-   * @param buf2 the second MBB
-   * @param o2 the offset in the second MBB from where the compare has to happen
-   * @param len2 the length in the second MBB up to which the compare has to happen
-   * @return Positive if buf1 is bigger than buf2, 0 if they are equal, and negative if buf1 is
-   *         smaller than buf2.
-   */
-  public static int compareTo(MultiByteBuffer buf1, int o1, int len1, MultiByteBuffer buf2, int o2,
-      int len2) {
-    if (buf1.hasArray() && buf2.hasArray()) {
-      return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(),
-          buf2.arrayOffset() + o2, len2);
-    }
-    int end1 = o1 + len1;
-    int end2 = o2 + len2;
-    for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
-      int a = buf1.get(i) & 0xFF;
-      int b = buf2.get(j) & 0xFF;
-      if (a != b) {
-        return a - b;
-      }
-    }
-    return len1 - len2;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (!(obj instanceof MultiByteBuffer)) return false;
-    if (this == obj) return true;
-    MultiByteBuffer that = (MultiByteBuffer) obj;
-    if (this.capacity() != that.capacity()) return false;
-    if (compareTo(this, 0, this.capacity(), that, 0, this.capacity()) == 0) return true;
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    int hash = 0;
-    for (ByteBuffer b : this.items) {
-      hash += b.hashCode();
-    }
-    return hash;
-  }
-}

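The class removed above documents the core trick that its ByteBuff/MultiByteBuff replacement carries forward: a primitive that straddles an item boundary is written and read byte by byte in big-endian order, so a value laid across two ByteBuffers round-trips transparently. A minimal sketch of that boundary behavior, assuming the new MultiByteBuff keeps the putLong/getLong/rewind semantics shown here (TestByteBuffUtils above exercises the same calls):

    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.nio.MultiByteBuff;

    public class BoundarySketch {
      public static void main(String[] args) {
        ByteBuffer first = ByteBuffer.allocate(5);   // too small to hold one long
        ByteBuffer second = ByteBuffer.allocate(11);
        MultiByteBuff mbb = new MultiByteBuff(first, second);
        mbb.putLong(0x0102030405060708L);  // 5 bytes land in 'first', 3 spill into 'second'
        mbb.putLong(42L);                  // fits entirely within 'second'
        mbb.rewind();
        // Both longs read back intact, including the one spanning the seam.
        System.out.println(mbb.getLong() == 0x0102030405060708L); // true
        System.out.println(mbb.getLong() == 42L);                 // true
      }
    }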
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
new file mode 100644
index 0000000..62601f3
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An implementation of ByteBuff backed by a single ByteBuffer. This just acts
+ * as a wrapper over a normal ByteBuffer - offheap or onheap
+ */
+@InterfaceAudience.Private
+public class SingleByteBuff extends ByteBuff {
+
+  // Underlying BB
+  private final ByteBuffer buf;
+
+  public SingleByteBuff(ByteBuffer buf) {
+    this.buf = buf;
+  }
+
+  @Override
+  public int position() {
+    return this.buf.position();
+  }
+
+  @Override
+  public SingleByteBuff position(int position) {
+    this.buf.position(position);
+    return this;
+  }
+
+  @Override
+  public SingleByteBuff skip(int len) {
+    this.buf.position(this.buf.position() + len);
+    return this;
+  }
+
+  @Override
+  public SingleByteBuff moveBack(int len) {
+    this.buf.position(this.buf.position() - len);
+    return this;
+  }
+
+  @Override
+  public int capacity() {
+    return this.buf.capacity();
+  }
+
+  @Override
+  public int limit() {
+    return this.buf.limit();
+  }
+
+  @Override
+  public SingleByteBuff limit(int limit) {
+    this.buf.limit(limit);
+    return this;
+  }
+
+  @Override
+  public SingleByteBuff rewind() {
+    this.buf.rewind();
+    return this;
+  }
+
+  @Override
+  public SingleByteBuff mark() {
+    this.buf.mark();
+    return this;
+  }
+
+  @Override
+  public ByteBuffer asSubByteBuffer(int length) {
+    // Just return the single BB that is available
+    return this.buf;
+  }
+
+  @Override
+  public void asSubByteBuffer(int offset, int length, Pair<ByteBuffer, Integer> pair) {
+    // Just return the single BB that is available
+    pair.setFirst(this.buf);
+    pair.setSecond(offset);
+  }
+
+  @Override
+  public int remaining() {
+    return this.buf.remaining();
+  }
+
+  @Override
+  public boolean hasRemaining() {
+    return buf.hasRemaining();
+  }
+
+  @Override
+  public SingleByteBuff reset() {
+    this.buf.reset();
+    return this;
+  }
+
+  @Override
+  public SingleByteBuff slice() {
+    return new SingleByteBuff(this.buf.slice());
+  }
+
+  @Override
+  public SingleByteBuff duplicate() {
+    return new SingleByteBuff(this.buf.duplicate());
+  }
+
+  @Override
+  public byte get() {
+    return buf.get();
+  }
+
+  @Override
+  public byte get(int index) {
+    return ByteBufferUtils.toByte(this.buf, index);
+  }
+
+  @Override
+  public byte getByteStrictlyForward(int index) {
+    if (index < this.buf.position()) {
+      throw new IndexOutOfBoundsException("The index " + index
+          + " should not be less than current position " + this.position());
+    }
+    return ByteBufferUtils.toByte(this.buf, index);
+  }
+
+  @Override
+  public SingleByteBuff put(byte b) {
+    this.buf.put(b);
+    return this;
+  }
+
+  @Override
+  public SingleByteBuff put(int index, byte b) {
+    buf.put(index, b);
+    return this;
+  }
+
+  @Override
+  public void get(byte[] dst, int offset, int length) {
+    ByteBufferUtils.copyFromBufferToArray(dst, buf, buf.position(), offset, length);
+    buf.position(buf.position() + length);
+  }
+
+  @Override
+  public void get(byte[] dst) {
+    get(dst, 0, dst.length);
+  }
+
+  @Override
+  public SingleByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
+    if (src instanceof SingleByteBuff) {
+      ByteBufferUtils.copyFromBufferToBuffer(((SingleByteBuff) src).buf, this.buf, srcOffset,
+          offset, length);
+    } else {
+      // TODO : can we do some optimization here? The call to asSubByteBuffer
+      // might create a copy.
+      Pair<ByteBuffer, Integer> pair = new Pair<ByteBuffer, Integer>();
+      src.asSubByteBuffer(srcOffset, length, pair);
+      ByteBufferUtils.copyFromBufferToBuffer(pair.getFirst(), this.buf, pair.getSecond(), offset,
+          length);
+    }
+    return this;
+  }
+
+  @Override
+  public SingleByteBuff put(byte[] src, int offset, int length) {
+    ByteBufferUtils.copyFromArrayToBuffer(this.buf, src, offset, length);
+    return this;
+  }
+
+  @Override
+  public SingleByteBuff put(byte[] src) {
+    return put(src, 0, src.length);
+  }
+
+  @Override
+  public boolean hasArray() {
+    return this.buf.hasArray();
+  }
+
+  @Override
+  public byte[] array() {
+    return this.buf.array();
+  }
+
+  @Override
+  public int arrayOffset() {
+    return this.buf.arrayOffset();
+  }
+
+  @Override
+  public short getShort() {
+    return this.buf.getShort();
+  }
+
+  @Override
+  public short getShort(int index) {
+    return ByteBufferUtils.toShort(this.buf, index);
+  }
+
+  @Override
+  public short getShortStrictlyForward(int index) {
+    if (index < this.buf.position()) {
+      throw new IndexOutOfBoundsException("The index " + index
+          + " should not be less than current position " + this.position());
+    }
+    return ByteBufferUtils.toShort(this.buf, index);
+  }
+
+  @Override
+  public int getInt() {
+    return this.buf.getInt();
+  }
+
+  @Override
+  public SingleByteBuff putInt(int value) {
+    ByteBufferUtils.putInt(this.buf, value);
+    return this;
+  }
+
+  @Override
+  public int getInt(int index) {
+    return ByteBufferUtils.toInt(this.buf, index);
+  }
+
+  @Override
+  public int getIntStrictlyForward(int index) {
+    if (index < this.buf.position()) {
+      throw new IndexOutOfBoundsException("The index " + index
+          + " should not be less than current position " + this.position());
+    }
+    return ByteBufferUtils.toInt(this.buf, index);
+  }
+
+  @Override
+  public long getLong() {
+    return this.buf.getLong();
+  }
+
+  @Override
+  public SingleByteBuff putLong(long value) {
+    ByteBufferUtils.putLong(this.buf, value);
+    return this;
+  }
+
+  @Override
+  public long getLong(int index) {
+    return ByteBufferUtils.toLong(this.buf, index);
+  }
+
+  @Override
+  public long getLongStrictlyForward(int index) {
+    if (index < this.buf.position()) {
+      throw new IndexOutOfBoundsException("The index " + index
+          + " should not be less than current position " + this.position());
+    }
+    return ByteBufferUtils.toLong(this.buf, index);
+  }
+
+  @Override
+  public byte[] toBytes(int offset, int length) {
+    byte[] output = new byte[length];
+    ByteBufferUtils.copyFromBufferToArray(output, buf, offset, 0, length);
+    return output;
+  }
+
+  @Override
+  public void get(ByteBuffer out, int sourceOffset, int length) {
+    ByteBufferUtils.copyFromBufferToBuffer(buf, out, sourceOffset, length);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if(!(obj instanceof SingleByteBuff)) return false;
+    return this.buf.equals(((SingleByteBuff)obj).buf);
+  }
+
+  @Override
+  public int hashCode() {
+    return this.buf.hashCode();
+  }
+
+  /**
+   * @return the ByteBuffer which this wraps.
+   */
+  ByteBuffer getEnclosingByteBuffer() {
+    return this.buf;
+  }
+}
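
For readers skimming the patch, a minimal usage sketch of the new wrapper follows. It is editorial, not part of the commit, and assumes only the method signatures visible in the diff above (relative gets/puts move the position; absolute ones do not):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.nio.SingleByteBuff;

    SingleByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(16));
    buf.putInt(42).putLong(12345L); // relative writes advance the position
    buf.rewind();
    int i = buf.getInt();           // relative read, advances the position
    long l = buf.getLong(4);        // absolute read, position unchanged
    // i == 42, l == 12345L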

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
index d3414dd..986d6e0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
@@ -25,6 +25,9 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -200,4 +203,65 @@ public final class ByteBufferArray {
     }
     assert srcIndex == len;
   }
+
+  /**
+   * Creates a ByteBuff from the given array of ByteBuffers, covering 'len' bytes starting at
+   * the given offset. For e.g., if there are 4 buffers forming an array, each of length 10, and
+   * we call asSubByteBuff(5, 10), then we will create an MBB consisting of two BBs: the first
+   * one a BB from 'position' 5 with 'length' 5, and the 2nd one a BB from 'position' 0 with
+   * 'length' 5.
+   * @param offset the offset in the total capacity of this array at which to start
+   * @param len the number of bytes the resulting ByteBuff should cover
+   * @return a ByteBuff formed from the underlying ByteBuffers
+   */
+  public ByteBuff asSubByteBuff(long offset, int len) {
+    assert len >= 0;
+    long end = offset + len;
+    int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize);
+    int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize);
+    assert startBuffer >= 0 && startBuffer < bufferCount;
+    assert endBuffer >= 0 && endBuffer < bufferCount
+        || (endBuffer == bufferCount && endBufferOffset == 0);
+    if (startBuffer >= locks.length || startBuffer < 0) {
+      String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer
+          + ",bufferSize=" + bufferSize;
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
+    int srcIndex = 0, cnt = -1;
+    ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1];
+    for (int i = startBuffer, j = 0; i <= endBuffer; ++i, j++) {
+      Lock lock = locks[i];
+      lock.lock();
+      try {
+        ByteBuffer bb = buffers[i];
+        if (i == startBuffer) {
+          cnt = bufferSize - startBufferOffset;
+          if (cnt > len) cnt = len;
+          ByteBuffer dup = bb.duplicate();
+          dup.limit(startBufferOffset + cnt).position(startBufferOffset);
+          mbb[j] = dup.slice();
+        } else if (i == endBuffer) {
+          cnt = endBufferOffset;
+          ByteBuffer dup = bb.duplicate();
+          dup.position(0).limit(cnt);
+          mbb[j] = dup.slice();
+        } else {
+          cnt = bufferSize;
+          ByteBuffer dup = bb.duplicate();
+          dup.position(0).limit(cnt);
+          mbb[j] = dup.slice();
+        }
+        srcIndex += cnt;
+      } finally {
+        lock.unlock();
+      }
+    }
+    assert srcIndex == len;
+    if (mbb.length > 1) {
+      return new MultiByteBuff(mbb);
+    } else {
+      return new SingleByteBuff(mbb[0]);
+    }
+  }
 }
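
Tracing the index arithmetic in asSubByteBuff for the Javadoc's example (an array of buffers of length 10, asSubByteBuff(5, 10)) shows where the two slices come from. This walkthrough is editorial, with bufferSize fixed at 10 for illustration:

    long offset = 5; int len = 10; int bufferSize = 10;
    long end = offset + len;                              // 15
    int startBuffer = (int) (offset / bufferSize);        // 0
    int startBufferOffset = (int) (offset % bufferSize);  // 5
    int endBuffer = (int) (end / bufferSize);             // 1
    int endBufferOffset = (int) (end % bufferSize);       // 5
    // => a slice of buffer 0 from position 5, length 5, plus a slice of
    //    buffer 1 from position 0, length 5: a two-item MultiByteBuff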

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index 7bb72a0..1fb5991 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -41,9 +41,9 @@ import sun.nio.ch.DirectBuffer;
 public final class ByteBufferUtils {
 
   // "Compressed integer" serialization helper constants.
-  private final static int VALUE_MASK = 0x7f;
-  private final static int NEXT_BIT_SHIFT = 7;
-  private final static int NEXT_BIT_MASK = 1 << 7;
+  public final static int VALUE_MASK = 0x7f;
+  public final static int NEXT_BIT_SHIFT = 7;
+  public final static int NEXT_BIT_MASK = 1 << 7;
 
   private ByteBufferUtils() {
   }
@@ -139,6 +139,14 @@ public final class ByteBufferUtils {
      }
    }
 
+  public static byte toByte(ByteBuffer buffer, int offset) {
+    if (UnsafeAccess.isAvailable()) {
+      return UnsafeAccess.toByte(buffer, offset);
+    } else {
+      return buffer.get(offset);
+    }
+  }
+
   /**
    * Copy the data to the output stream and update position in buffer.
    * @param out the stream to write bytes to
@@ -182,6 +190,15 @@ public final class ByteBufferUtils {
     return fitInBytes;
   }
 
+  public static int putByte(ByteBuffer buffer, int offset, byte b) {
+    if (UnsafeAccess.isAvailable()) {
+      return UnsafeAccess.putByte(buffer, offset, b);
+    } else {
+      buffer.put(offset, b);
+      return offset + 1;
+    }
+  }
+
   /**
    * Check how many bytes are required to store value.
    * @param value Value which size will be tested.
@@ -334,30 +351,6 @@ public final class ByteBufferUtils {
   }
 
   /**
-   * Copy from one buffer to another from given offset.
-   * <p>
-   * Note : This will advance the position marker of {@code out} but not change the position maker
-   * for {@code in}
-   * @param out destination buffer
-   * @param in source buffer
-   * @param sourceOffset offset in the source buffer
-   * @param length how many bytes to copy
-   */
-  public static void copyFromBufferToBuffer(ByteBuffer out,
-      ByteBuffer in, int sourceOffset, int length) {
-    if (in.hasArray() && out.hasArray()) {
-      System.arraycopy(in.array(), sourceOffset + in.arrayOffset(),
-          out.array(), out.position() +
-          out.arrayOffset(), length);
-      skip(out, length);
-    } else {
-      for (int i = 0; i < length; ++i) {
-        out.put(in.get(sourceOffset + i));
-      }
-    }
-  }
-
-  /**
    * Copy one buffer's whole data to another. Write starts at the current position of 'out' buffer.
    * Note : This will advance the position marker of {@code out} but not change the position maker
    * for {@code in}. The position and limit of the {@code in} buffer to be set properly by caller.
@@ -377,22 +370,51 @@ public final class ByteBufferUtils {
   /**
    * Copy from one buffer to another from given offset. This will be absolute positional copying and
    * won't affect the position of any of the buffers.
-   * @param out
    * @param in
+   * @param out
    * @param sourceOffset
    * @param destinationOffset
    * @param length
    */
-  public static void copyFromBufferToBuffer(ByteBuffer out, ByteBuffer in, int sourceOffset,
+  public static int copyFromBufferToBuffer(ByteBuffer in, ByteBuffer out, int sourceOffset,
       int destinationOffset, int length) {
     if (in.hasArray() && out.hasArray()) {
       System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out.array(), out.arrayOffset()
           + destinationOffset, length);
+    } else if (UnsafeAccess.isAvailable()) {
+      UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length);
+    } else {
+      for (int i = 0; i < length; ++i) {
+        putByte(out, destinationOffset + i, toByte(in, sourceOffset + i));
+      }
+    }
+    return destinationOffset + length;
+  }
+
+  /**
+   * Copy from one buffer to another from given offset.
+   * <p>
+   * Note : This will advance the position marker of {@code out} but not change the position maker
+   * for {@code in}
+   * @param in source buffer
+   * @param out destination buffer
+   * @param sourceOffset offset in the source buffer
+   * @param length how many bytes to copy
+   */
+  public static void copyFromBufferToBuffer(ByteBuffer in,
+      ByteBuffer out, int sourceOffset, int length) {
+    if (in.hasArray() && out.hasArray()) {
+      System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out.array(), out.position()
+          + out.arrayOffset(), length);
+    } else if (UnsafeAccess.isAvailable()) {
+      UnsafeAccess.copy(in, sourceOffset, out, out.position(), length);
     } else {
+      int destOffset = out.position();
       for (int i = 0; i < length; ++i) {
-        out.put((destinationOffset + i), in.get(sourceOffset + i));
+        putByte(out, destOffset + i, toByte(in, sourceOffset + i));
       }
     }
+    skip(out, length);
   }
 
   /**
@@ -736,6 +758,35 @@ public final class ByteBufferUtils {
   }
 
   /**
+   * Put a short value out to the given ByteBuffer's current position in big-endian format.
+   * This also advances the position in buffer by short size.
+   * @param buffer the ByteBuffer to write to
+   * @param val short to write out
+   */
+  public static void putShort(ByteBuffer buffer, short val) {
+    if (UnsafeAccess.isAvailable()) {
+      int newPos = UnsafeAccess.putShort(buffer, buffer.position(), val);
+      buffer.position(newPos);
+    } else {
+      buffer.putShort(val);
+    }
+  }
+
+  /**
+   * Put a long value out to the given ByteBuffer's current position in big-endian format.
+   * This also advances the position in buffer by long size.
+   * @param buffer the ByteBuffer to write to
+   * @param val long to write out
+   */
+  public static void putLong(ByteBuffer buffer, long val) {
+    if (UnsafeAccess.isAvailable()) {
+      int newPos = UnsafeAccess.putLong(buffer, buffer.position(), val);
+      buffer.position(newPos);
+    } else {
+      buffer.putLong(val);
+    }
+  }
+  /**
    * Copies the bytes from given array's offset to length part into the given buffer. Puts the bytes
    * to buffer's current position. This also advances the position in the 'out' buffer by 'length'
    * @param out
@@ -758,15 +809,16 @@ public final class ByteBufferUtils {
   }
 
   /**
-   * Copies specified number of bytes from given offset of 'in' ByteBuffer to the array.
+   * Copies specified number of bytes from given offset of 'in' ByteBuffer to
+   * the array.
    * @param out
    * @param in
    * @param sourceOffset
    * @param destinationOffset
    * @param length
    */
-  public static void copyFromBufferToArray(byte[] out, ByteBuffer in,
-      int sourceOffset, int destinationOffset, int length) {
+  public static void copyFromBufferToArray(byte[] out, ByteBuffer in, int sourceOffset,
+      int destinationOffset, int length) {
     if (in.hasArray()) {
       System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out, destinationOffset, length);
     } else if (UnsafeAccess.isAvailable()) {
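
A short sketch of the reordered absolute copyFromBufferToBuffer and the new putLong helper, using only methods shown in this hunk (editorial, not part of the patch):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.util.ByteBufferUtils;

    ByteBuffer src = ByteBuffer.allocateDirect(16);
    ByteBuffer dst = ByteBuffer.allocate(16);
    ByteBufferUtils.putLong(src, 12345L); // relative write, advances src's position
    // Absolute copy: neither position moves; returns destinationOffset + length.
    int next = ByteBufferUtils.copyFromBufferToBuffer(src, dst, 0, 0, 8);
    // next == 8, dst.getLong(0) == 12345L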

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java
index 34d9f90..aa0795d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java
@@ -140,4 +140,6 @@ public abstract class Hash {
    * @return hash value
    */
   public abstract int hash(byte[] bytes, int offset, int length, int initval);
+
+  // TODO : a buffer based hash function would be needed. Not adding it for now.
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
index ea13dbc..0cccee6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
@@ -343,4 +343,72 @@ public final class UnsafeAccess {
     }
     theUnsafe.copyMemory(srcBase, srcAddress, destBase, destAddress, length);
   }
+  // APIs to add primitives to BBs
+  /**
+   * Put a short value out to the specified BB position in big-endian format.
+   * @param buf the byte buffer
+   * @param offset position in the buffer
+   * @param val short to write out
+   * @return incremented offset
+   */
+  public static int putShort(ByteBuffer buf, int offset, short val) {
+    if (littleEndian) {
+      val = Short.reverseBytes(val);
+    }
+    if (buf.isDirect()) {
+      theUnsafe.putShort(((DirectBuffer) buf).address() + offset, val);
+    } else {
+      theUnsafe.putShort(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val);
+    }
+    return offset + Bytes.SIZEOF_SHORT;
+  }
+
+  /**
+   * Put a long value out to the specified BB position in big-endian format.
+   * @param buf the byte buffer
+   * @param offset position in the buffer
+   * @param val long to write out
+   * @return incremented offset
+   */
+  public static int putLong(ByteBuffer buf, int offset, long val) {
+    if (littleEndian) {
+      val = Long.reverseBytes(val);
+    }
+    if (buf.isDirect()) {
+      theUnsafe.putLong(((DirectBuffer) buf).address() + offset, val);
+    } else {
+      theUnsafe.putLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val);
+    }
+    return offset + Bytes.SIZEOF_LONG;
+  }
+  /**
+   * Put a byte value out to the specified BB position.
+   * @param buf the byte buffer
+   * @param offset position in the buffer
+   * @param b byte to write out
+   * @return incremented offset
+   */
+  public static int putByte(ByteBuffer buf, int offset, byte b) {
+    if (buf.isDirect()) {
+      theUnsafe.putByte(((DirectBuffer) buf).address() + offset, b);
+    } else {
+      theUnsafe.putByte(buf.array(),
+          BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, b);
+    }
+    return offset + 1;
+  }
+
+  /**
+   * Returns the byte at the given offset
+   * @param buf the buffer to read
+   * @param offset the offset at which the byte has to be read
+   * @return the byte at the given offset
+   */
+  public static byte toByte(ByteBuffer buf, int offset) {
+    if (buf.isDirect()) {
+      return theUnsafe.getByte(((DirectBuffer) buf).address() + offset);
+    } else {
+      return theUnsafe.getByte(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
+    }
+  }
 }
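
These helpers always store values in big-endian order regardless of the platform's native byte order (note the Short/Long.reverseBytes calls on little-endian hosts). A minimal editorial sketch, guarded by the availability check used elsewhere in the patch:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.util.UnsafeAccess;

    ByteBuffer buf = ByteBuffer.allocateDirect(8);
    if (UnsafeAccess.isAvailable()) {
      int next = UnsafeAccess.putLong(buf, 0, 12345L); // returns offset + SIZEOF_LONG
      long read = buf.getLong(0);                      // ByteBuffer's default order is big-endian
      // next == 8, read == 12345L
    }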

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferInputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferInputStream.java
deleted file mode 100644
index 30fb71e..0000000
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferInputStream.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.testclassification.IOTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ IOTests.class, SmallTests.class })
-public class TestByteBufferInputStream {
-
-  @Test
-  public void testReads() throws Exception {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream(100);
-    DataOutputStream dos = new DataOutputStream(bos);
-    String s = "test";
-    int i = 128;
-    dos.write(1);
-    dos.writeInt(i);
-    dos.writeBytes(s);
-    dos.writeLong(12345L);
-    dos.writeShort(2);
-    dos.flush();
-    ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
-
-    // bbis contains 19 bytes
-    // 1 byte, 4 bytes int, 4 bytes string, 8 bytes long and 2 bytes short
-    ByteBufferInputStream bbis = new ByteBufferInputStream(bb);
-    assertEquals(15 + s.length(), bbis.available());
-    assertEquals(1, bbis.read());
-    byte[] ib = new byte[4];
-    bbis.read(ib);
-    assertEquals(i, Bytes.toInt(ib));
-    byte[] sb = new byte[s.length()];
-    bbis.read(sb);
-    assertEquals(s, Bytes.toString(sb));
-    byte[] lb = new byte[8];
-    bbis.read(lb);
-    assertEquals(12345, Bytes.toLong(lb));
-    assertEquals(2, bbis.available());
-    ib = new byte[4];
-    int read = bbis.read(ib, 0, ib.length);
-    // We dont have 4 bytes remainig but only 2. So onlt those should be returned back
-    assertEquals(2, read);
-    assertEquals(2, Bytes.toShort(ib));
-    assertEquals(0, bbis.available());
-    // At end. The read() should return -1
-    assertEquals(-1, bbis.read());
-    bbis.close();
-
-    bb = ByteBuffer.wrap(bos.toByteArray());
-    bbis = new ByteBufferInputStream(bb);
-    DataInputStream dis = new DataInputStream(bbis);
-    dis.read();
-    assertEquals(i, dis.readInt());
-    dis.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestMultiByteBuffInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestMultiByteBuffInputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestMultiByteBuffInputStream.java
new file mode 100644
index 0000000..ed96e87
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestMultiByteBuffInputStream.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestMultiByteBuffInputStream {
+
+  @Test
+  public void testReads() throws Exception {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream(100);
+    DataOutputStream dos = new DataOutputStream(bos);
+    String s = "test";
+    int i = 128;
+    dos.write(1);
+    dos.writeInt(i);
+    dos.writeBytes(s);
+    dos.writeLong(12345L);
+    dos.writeShort(2);
+    dos.flush();
+    ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
+
+    // bbis contains 19 bytes
+    // 1 byte, 4 bytes int, 4 bytes string, 8 bytes long and 2 bytes short
+    ByteBuffInputStream bbis = new ByteBuffInputStream(new MultiByteBuff(bb));
+    assertEquals(15 + s.length(), bbis.available());
+    assertEquals(1, bbis.read());
+    byte[] ib = new byte[4];
+    bbis.read(ib);
+    assertEquals(i, Bytes.toInt(ib));
+    byte[] sb = new byte[s.length()];
+    bbis.read(sb);
+    assertEquals(s, Bytes.toString(sb));
+    byte[] lb = new byte[8];
+    bbis.read(lb);
+    assertEquals(12345, Bytes.toLong(lb));
+    assertEquals(2, bbis.available());
+    ib = new byte[4];
+    int read = bbis.read(ib, 0, ib.length);
+    // We don't have 4 bytes remaining but only 2, so only those should be returned back.
+    assertEquals(2, read);
+    assertEquals(2, Bytes.toShort(ib));
+    assertEquals(0, bbis.available());
+    // At end. The read() should return -1
+    assertEquals(-1, bbis.read());
+    bbis.close();
+
+    bb = ByteBuffer.wrap(bos.toByteArray());
+    bbis = new ByteBuffInputStream(new MultiByteBuff(bb));
+    DataInputStream dis = new DataInputStream(bbis);
+    dis.read();
+    assertEquals(i, dis.readInt());
+    dis.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
new file mode 100644
index 0000000..588e946
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
@@ -0,0 +1,324 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MiscTests.class, SmallTests.class })
+public class TestMultiByteBuff {
+
+  @Test
+  public void testWritesAndReads() {
+    // Absolute reads
+    ByteBuffer bb1 = ByteBuffer.allocate(15);
+    ByteBuffer bb2 = ByteBuffer.allocate(15);
+    int i1 = 4;
+    bb1.putInt(i1);
+    long l1 = 45L, l2 = 100L, l3 = 12345L;
+    bb1.putLong(l1);
+    short s1 = 2;
+    bb1.putShort(s1);
+    byte[] b = Bytes.toBytes(l2);
+    bb1.put(b, 0, 1);
+    bb2.put(b, 1, 7);
+    bb2.putLong(l3);
+    MultiByteBuff mbb = new MultiByteBuff(bb1, bb2);
+    assertEquals(l1, mbb.getLong(4));
+    assertEquals(l2, mbb.getLong(14));
+    assertEquals(l3, mbb.getLong(22));
+    assertEquals(i1, mbb.getInt(0));
+    assertEquals(s1, mbb.getShort(12));
+    // Relative reads
+    assertEquals(i1, mbb.getInt());
+    assertEquals(l1, mbb.getLong());
+    assertEquals(s1, mbb.getShort());
+    assertEquals(l2, mbb.getLong());
+    assertEquals(l3, mbb.getLong());
+    // Absolute writes
+    bb1 = ByteBuffer.allocate(15);
+    bb2 = ByteBuffer.allocate(15);
+    mbb = new MultiByteBuff(bb1, bb2);
+    byte b1 = 5, b2 = 31;
+    mbb.put(b1);
+    mbb.putLong(l1);
+    mbb.putInt(i1);
+    mbb.putLong(l2);
+    mbb.put(b2);
+    mbb.position(mbb.position() + 2);
+    try {
+      mbb.putLong(l3);
+      fail("'Should have thrown BufferOverflowException");
+    } catch (BufferOverflowException e) {
+    }
+    mbb.position(mbb.position() - 2);
+    mbb.putLong(l3);
+    mbb.rewind();
+    assertEquals(b1, mbb.get());
+    assertEquals(l1, mbb.getLong());
+    assertEquals(i1, mbb.getInt());
+    assertEquals(l2, mbb.getLong());
+    assertEquals(b2, mbb.get());
+    assertEquals(l3, mbb.getLong());
+    mbb.put(21, b1);
+    mbb.position(21);
+    assertEquals(b1, mbb.get());
+    mbb.put(b);
+    assertEquals(l2, mbb.getLong(22));
+    try {
+      // This should fail because we have already moved to a position
+      // greater than 22
+      mbb.getLongStrictlyForward(22);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+    }
+  }
+
+  @Test
+  public void testPutPrimitives() {
+    ByteBuffer bb = ByteBuffer.allocate(10);
+    SingleByteBuff s = new SingleByteBuff(bb);
+    s.putLong(-4465109508325701663L);
+    bb.rewind();
+    long long1 = bb.getLong();
+    assertEquals(-4465109508325701663L, long1);
+    s.position(8);
+  }
+
+  @Test
+  public void testArrayBasedMethods() {
+    byte[] b = new byte[15];
+    ByteBuffer bb1 = ByteBuffer.wrap(b, 1, 10).slice();
+    ByteBuffer bb2 = ByteBuffer.allocate(15);
+    ByteBuff mbb1 = new MultiByteBuff(bb1, bb2);
+    assertFalse(mbb1.hasArray());
+    try {
+      mbb1.array();
+      fail();
+    } catch (UnsupportedOperationException e) {
+    }
+    try {
+      mbb1.arrayOffset();
+      fail();
+    } catch (UnsupportedOperationException e) {
+    }
+    mbb1 = new SingleByteBuff(bb1);
+    assertTrue(mbb1.hasArray());
+    assertEquals(1, mbb1.arrayOffset());
+    assertEquals(b, mbb1.array());
+    mbb1 = new SingleByteBuff(ByteBuffer.allocateDirect(10));
+    assertFalse(mbb1.hasArray());
+    try {
+      mbb1.array();
+      fail();
+    } catch (UnsupportedOperationException e) {
+    }
+    try {
+      mbb1.arrayOffset();
+      fail();
+    } catch (UnsupportedOperationException e) {
+    }
+  }
+
+  @Test
+  public void testMarkAndResetWithMBB() {
+    ByteBuffer bb1 = ByteBuffer.allocateDirect(15);
+    ByteBuffer bb2 = ByteBuffer.allocateDirect(15);
+    bb1.putInt(4);
+    long l1 = 45L, l2 = 100L, l3 = 12345L;
+    bb1.putLong(l1);
+    bb1.putShort((short) 2);
+    byte[] b = Bytes.toBytes(l2);
+    bb1.put(b, 0, 1);
+    bb2.put(b, 1, 7);
+    bb2.putLong(l3);
+    ByteBuff multi = new MultiByteBuff(bb1, bb2);
+    assertEquals(4, multi.getInt());
+    assertEquals(l1, multi.getLong());
+    multi.mark();
+    assertEquals((short) 2, multi.getShort());
+    multi.reset();
+    assertEquals((short) 2, multi.getShort());
+    multi.mark();
+    assertEquals(l2, multi.getLong());
+    multi.reset();
+    assertEquals(l2, multi.getLong());
+    multi.mark();
+    assertEquals(l3, multi.getLong());
+    multi.reset();
+    assertEquals(l3, multi.getLong());
+    // Try absolute gets with mark and reset
+    multi.mark();
+    assertEquals(l2, multi.getLong(14));
+    multi.reset();
+    assertEquals(l3, multi.getLong(22));
+    // Just reset to see what happens
+    multi.reset();
+    assertEquals(l2, multi.getLong(14));
+    multi.mark();
+    assertEquals(l3, multi.getLong(22));
+    multi.reset();
+  }
+
+  @Test
+  public void testSkipNBytes() {
+    ByteBuffer bb1 = ByteBuffer.allocate(15);
+    ByteBuffer bb2 = ByteBuffer.allocate(15);
+    bb1.putInt(4);
+    long l1 = 45L, l2 = 100L, l3 = 12345L;
+    bb1.putLong(l1);
+    bb1.putShort((short) 2);
+    byte[] b = Bytes.toBytes(l2);
+    bb1.put(b, 0, 1);
+    bb2.put(b, 1, 7);
+    bb2.putLong(l3);
+    MultiByteBuff multi = new MultiByteBuff(bb1, bb2);
+    assertEquals(4, multi.getInt());
+    assertEquals(l1, multi.getLong());
+    multi.skip(10);
+    assertEquals(l3, multi.getLong());
+  }
+
+  @Test
+  public void testMoveBack() {
+    ByteBuffer bb1 = ByteBuffer.allocate(15);
+    ByteBuffer bb2 = ByteBuffer.allocate(15);
+    bb1.putInt(4);
+    long l1 = 45L, l2 = 100L, l3 = 12345L;
+    bb1.putLong(l1);
+    bb1.putShort((short) 2);
+    byte[] b = Bytes.toBytes(l2);
+    bb1.put(b, 0, 1);
+    bb2.put(b, 1, 7);
+    bb2.putLong(l3);
+    MultiByteBuff multi = new MultiByteBuff(bb1, bb2);
+    assertEquals(4, multi.getInt());
+    assertEquals(l1, multi.getLong());
+    multi.skip(10);
+    multi.moveBack(4);
+    multi.moveBack(6);
+    multi.moveBack(8);
+    assertEquals(l1, multi.getLong());
+  }
+
+  @Test
+  public void testSubBuffer() {
+    ByteBuffer bb1 = ByteBuffer.allocateDirect(10);
+    ByteBuffer bb2 = ByteBuffer.allocateDirect(10);
+    MultiByteBuff multi = new MultiByteBuff(bb1, bb2);
+    long l1 = 1234L, l2 = 100L;
+    multi.putLong(l1);
+    multi.putLong(l2);
+    multi.rewind();
+    ByteBuffer sub = multi.asSubByteBuffer(Bytes.SIZEOF_LONG);
+    assertTrue(bb1 == sub);
+    assertEquals(l1, ByteBufferUtils.toLong(sub, sub.position()));
+    multi.skip(Bytes.SIZEOF_LONG);
+    sub = multi.asSubByteBuffer(Bytes.SIZEOF_LONG);
+    assertFalse(bb1 == sub);
+    assertFalse(bb2 == sub);
+    assertEquals(l2, ByteBufferUtils.toLong(sub, sub.position()));
+    multi.rewind();
+    Pair<ByteBuffer, Integer> p = new Pair<ByteBuffer, Integer>();
+    multi.asSubByteBuffer(8, Bytes.SIZEOF_LONG, p);
+    assertFalse(bb1 == p.getFirst());
+    assertFalse(bb2 == p.getFirst());
+    assertEquals(0, p.getSecond().intValue());
+    assertEquals(l2, ByteBufferUtils.toLong(sub, p.getSecond()));
+  }
+
+  @Test
+  public void testSliceDuplicateMethods() throws Exception {
+    ByteBuffer bb1 = ByteBuffer.allocateDirect(10);
+    ByteBuffer bb2 = ByteBuffer.allocateDirect(15);
+    MultiByteBuff multi = new MultiByteBuff(bb1, bb2);
+    long l1 = 1234L, l2 = 100L;
+    multi.put((byte) 2);
+    multi.putLong(l1);
+    multi.putLong(l2);
+    multi.putInt(45);
+    multi.position(1);
+    multi.limit(multi.position() + (2 * Bytes.SIZEOF_LONG));
+    MultiByteBuff sliced = multi.slice();
+    assertEquals(0, sliced.position());
+    assertEquals((2 * Bytes.SIZEOF_LONG), sliced.limit());
+    assertEquals(l1, sliced.getLong());
+    assertEquals(l2, sliced.getLong());
+    MultiByteBuff dup = multi.duplicate();
+    assertEquals(1, dup.position());
+    assertEquals(dup.position() + (2 * Bytes.SIZEOF_LONG), dup.limit());
+    assertEquals(l1, dup.getLong());
+    assertEquals(l2, dup.getLong());
+  }
+
+  @Test
+  public void testGetWithPosOnMultiBuffers() throws IOException {
+    byte[] b = new byte[4];
+    byte[] b1 = new byte[4];
+    ByteBuffer bb1 = ByteBuffer.wrap(b);
+    ByteBuffer bb2 = ByteBuffer.wrap(b1);
+    MultiByteBuff mbb1 = new MultiByteBuff(bb1, bb2);
+    mbb1.position(2);
+    mbb1.putInt(4);
+    int res = mbb1.getInt(2);
+    byte[] bres = new byte[4];
+    bres[0] = mbb1.get(2);
+    bres[1] = mbb1.get(3);
+    bres[2] = mbb1.get(4);
+    bres[3] = mbb1.get(5);
+    int expected = Bytes.toInt(bres);
+    assertEquals(res, expected);
+  }
+
+  @Test
+  public void testGetIntStrictlyForwardWithPosOnMultiBuffers() throws IOException {
+    byte[] b = new byte[4];
+    byte[] b1 = new byte[8];
+    ByteBuffer bb1 = ByteBuffer.wrap(b);
+    ByteBuffer bb2 = ByteBuffer.wrap(b1);
+    MultiByteBuff mbb1 = new MultiByteBuff(bb1, bb2);
+    mbb1.position(2);
+    mbb1.putInt(4);
+    mbb1.position(7);
+    mbb1.put((byte) 2);
+    mbb1.putInt(3);
+    mbb1.position(0);
+    mbb1.getIntStrictlyForward(4);
+    byte res = mbb1.get(7);
+    assertEquals((byte) 2, res);
+    mbb1.position(7);
+    int intRes = mbb1.getIntStrictlyForward(8);
+    assertEquals(3, intRes);
+  }
+}
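
The *StrictlyForward gets exercised above enforce that the requested index is not behind the current position. A compact, editorial illustration of that contract using the classes from this patch:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.nio.SingleByteBuff;

    SingleByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8));
    buf.position(4);
    buf.getIntStrictlyForward(4);    // fine: index == position
    try {
      buf.getIntStrictlyForward(2);  // index < position
    } catch (IndexOutOfBoundsException expected) {
      // "The index 2 should not be less than current position 4"
    }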

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuffer.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuffer.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuffer.java
deleted file mode 100644
index ddab391..0000000
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuffer.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.nio;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.WritableUtils;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ MiscTests.class, SmallTests.class })
-public class TestMultiByteBuffer {
-
-  @Test
-  public void testWritesAndReads() {
-    // Absolute reads
-    ByteBuffer bb1 = ByteBuffer.allocate(15);
-    ByteBuffer bb2 = ByteBuffer.allocate(15);
-    int i1 = 4;
-    bb1.putInt(i1);
-    long l1 = 45L, l2 = 100L, l3 = 12345L;
-    bb1.putLong(l1);
-    short s1 = 2;
-    bb1.putShort(s1);
-    byte[] b = Bytes.toBytes(l2);
-    bb1.put(b, 0, 1);
-    bb2.put(b, 1, 7);
-    bb2.putLong(l3);
-    MultiByteBuffer mbb = new MultiByteBuffer(bb1, bb2);
-    assertEquals(l1, mbb.getLong(4));
-    assertEquals(l2, mbb.getLong(14));
-    assertEquals(l3, mbb.getLong(22));
-    assertEquals(i1, mbb.getInt(0));
-    assertEquals(s1, mbb.getShort(12));
-    // Relative reads
-    assertEquals(i1, mbb.getInt());
-    assertEquals(l1, mbb.getLong());
-    assertEquals(s1, mbb.getShort());
-    assertEquals(l2, mbb.getLong());
-    assertEquals(l3, mbb.getLong());
-    // Absolute writes
-    bb1 = ByteBuffer.allocate(15);
-    bb2 = ByteBuffer.allocate(15);
-    mbb = new MultiByteBuffer(bb1, bb2);
-    byte b1 = 5, b2 = 31;
-    mbb.put(b1);
-    mbb.putLong(l1);
-    mbb.putInt(i1);
-    mbb.putLong(l2);
-    mbb.put(b2);
-    mbb.position(mbb.position() + 2);
-    try {
-      mbb.putLong(l3);
-      fail("'Should have thrown BufferOverflowException");
-    } catch (BufferOverflowException e) {
-    }
-    mbb.position(mbb.position() - 2);
-    mbb.putLong(l3);
-    mbb.rewind();
-    assertEquals(b1, mbb.get());
-    assertEquals(l1, mbb.getLong());
-    assertEquals(i1, mbb.getInt());
-    assertEquals(l2, mbb.getLong());
-    assertEquals(b2, mbb.get());
-    assertEquals(l3, mbb.getLong());
-    mbb.put(21, b1);
-    mbb.position(21);
-    assertEquals(b1, mbb.get());
-    mbb.put(b);
-    assertEquals(l2, mbb.getLong(22));
-  }
-
-  @Test
-  public void testGetVlong() throws IOException {
-    long vlong = 453478;
-    ByteArrayOutputStream baos = new ByteArrayOutputStream(10);
-    DataOutput out = new DataOutputStream(baos);
-    WritableUtils.writeVLong(out, vlong);
-    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
-    MultiByteBuffer mbb = new MultiByteBuffer(bb);
-    assertEquals(vlong, mbb.getVLong());
-  }
-
-  @Test
-  public void testArrayBasedMethods() {
-    byte[] b = new byte[15];
-    ByteBuffer bb1 = ByteBuffer.wrap(b, 1, 10).slice();
-    ByteBuffer bb2 = ByteBuffer.allocate(15);
-    MultiByteBuffer mbb1 = new MultiByteBuffer(bb1, bb2);
-    assertFalse(mbb1.hasArray());
-    try {
-      mbb1.array();
-      fail();
-    } catch (UnsupportedOperationException e) {
-    }
-    try {
-      mbb1.arrayOffset();
-      fail();
-    } catch (UnsupportedOperationException e) {
-    }
-    mbb1 = new MultiByteBuffer(bb1);
-    assertTrue(mbb1.hasArray());
-    assertEquals(1, mbb1.arrayOffset());
-    assertEquals(b, mbb1.array());
-    mbb1 = new MultiByteBuffer(ByteBuffer.allocateDirect(10));
-    assertFalse(mbb1.hasArray());
-    try {
-      mbb1.array();
-      fail();
-    } catch (UnsupportedOperationException e) {
-    }
-    try {
-      mbb1.arrayOffset();
-      fail();
-    } catch (UnsupportedOperationException e) {
-    }
-  }
-
-  @Test
-  public void testMarkAndReset() {
-    ByteBuffer bb1 = ByteBuffer.allocateDirect(15);
-    ByteBuffer bb2 = ByteBuffer.allocateDirect(15);
-    bb1.putInt(4);
-    long l1 = 45L, l2 = 100L, l3 = 12345L;
-    bb1.putLong(l1);
-    bb1.putShort((short) 2);
-    byte[] b = Bytes.toBytes(l2);
-    bb1.put(b, 0, 1);
-    bb2.put(b, 1, 7);
-    bb2.putLong(l3);
-    MultiByteBuffer multi = new MultiByteBuffer(bb1, bb2);
-    assertEquals(4, multi.getInt());
-    assertEquals(l1, multi.getLong());
-    multi.mark();
-    assertEquals((short) 2, multi.getShort());
-    multi.reset();
-    assertEquals((short) 2, multi.getShort());
-    multi.mark();
-    assertEquals(l2, multi.getLong());
-    multi.reset();
-    assertEquals(l2, multi.getLong());
-    multi.mark();
-    assertEquals(l3, multi.getLong());
-    multi.reset();
-    assertEquals(l3, multi.getLong());
-    // Try absolute gets with mark and reset
-    multi.mark();
-    assertEquals(l2, multi.getLong(14));
-    multi.reset();
-    assertEquals(l3, multi.getLong(22));
-    // Just reset to see what happens
-    multi.reset();
-    assertEquals(l2, multi.getLong(14));
-    multi.mark();
-    assertEquals(l3, multi.getLong(22));
-    multi.reset();
-  }
-
-  @Test
-  public void testSkipNBytes() {
-    ByteBuffer bb1 = ByteBuffer.allocate(15);
-    ByteBuffer bb2 = ByteBuffer.allocate(15);
-    bb1.putInt(4);
-    long l1 = 45L, l2 = 100L, l3 = 12345L;
-    bb1.putLong(l1);
-    bb1.putShort((short) 2);
-    byte[] b = Bytes.toBytes(l2);
-    bb1.put(b, 0, 1);
-    bb2.put(b, 1, 7);
-    bb2.putLong(l3);
-    MultiByteBuffer multi = new MultiByteBuffer(bb1, bb2);
-    assertEquals(4, multi.getInt());
-    assertEquals(l1, multi.getLong());
-    multi.skip(10);
-    assertEquals(l3, multi.getLong());
-  }
-
-  @Test
-  public void testMoveBack() {
-    ByteBuffer bb1 = ByteBuffer.allocate(15);
-    ByteBuffer bb2 = ByteBuffer.allocate(15);
-    bb1.putInt(4);
-    long l1 = 45L, l2 = 100L, l3 = 12345L;
-    bb1.putLong(l1);
-    bb1.putShort((short) 2);
-    byte[] b = Bytes.toBytes(l2);
-    bb1.put(b, 0, 1);
-    bb2.put(b, 1, 7);
-    bb2.putLong(l3);
-    MultiByteBuffer multi = new MultiByteBuffer(bb1, bb2);
-    assertEquals(4, multi.getInt());
-    assertEquals(l1, multi.getLong());
-    multi.skip(10);
-    multi.moveBack(4);
-    multi.moveBack(6);
-    multi.moveBack(8);
-    assertEquals(l1, multi.getLong());
-  }
-
-  @Test
-  public void testSubBuffer() {
-    ByteBuffer bb1 = ByteBuffer.allocateDirect(10);
-    ByteBuffer bb2 = ByteBuffer.allocateDirect(10);
-    MultiByteBuffer multi = new MultiByteBuffer(bb1, bb2);
-    long l1 = 1234L, l2 = 100L;
-    multi.putLong(l1);
-    multi.putLong(l2);
-    multi.rewind();
-    ByteBuffer sub = multi.asSubBuffer(Bytes.SIZEOF_LONG);
-    assertTrue(bb1 == sub);
-    assertEquals(l1, ByteBufferUtils.toLong(sub, sub.position()));
-    multi.skip(Bytes.SIZEOF_LONG);
-    sub = multi.asSubBuffer(Bytes.SIZEOF_LONG);
-    assertFalse(bb1 == sub);
-    assertFalse(bb2 == sub);
-    assertEquals(l2, ByteBufferUtils.toLong(sub, sub.position()));
-    multi.rewind();
-    Pair<ByteBuffer, Integer> p = multi.asSubBuffer(8, Bytes.SIZEOF_LONG);
-    assertFalse(bb1 == p.getFirst());
-    assertFalse(bb2 == p.getFirst());
-    assertEquals(0, p.getSecond().intValue());
-    assertEquals(l2, ByteBufferUtils.toLong(sub, p.getSecond()));
-  }
-
-  @Test
-  public void testSliceDuplicateMethods() throws Exception {
-    ByteBuffer bb1 = ByteBuffer.allocateDirect(10);
-    ByteBuffer bb2 = ByteBuffer.allocateDirect(15);
-    MultiByteBuffer multi = new MultiByteBuffer(bb1, bb2);
-    long l1 = 1234L, l2 = 100L;
-    multi.put((byte) 2);
-    multi.putLong(l1);
-    multi.putLong(l2);
-    multi.putInt(45);
-    multi.position(1);
-    multi.limit(multi.position() + (2 * Bytes.SIZEOF_LONG));
-    MultiByteBuffer sliced = multi.slice();
-    assertEquals(0, sliced.position());
-    assertEquals((2 * Bytes.SIZEOF_LONG), sliced.limit());
-    assertEquals(l1, sliced.getLong());
-    assertEquals(l2, sliced.getLong());
-    MultiByteBuffer dup = multi.duplicate();
-    assertEquals(1, dup.position());
-    assertEquals(dup.position() + (2 * Bytes.SIZEOF_LONG), dup.limit());
-    assertEquals(l1, dup.getLong());
-    assertEquals(l2, dup.getLong());
-  }
-
-  @Test
-  public void testGetWithPosOnMultiBuffers() throws IOException {
-    byte[] b = new byte[4];
-    byte[] b1 = new byte[4];
-    ByteBuffer bb1 = ByteBuffer.wrap(b);
-    ByteBuffer bb2 = ByteBuffer.wrap(b1);
-    MultiByteBuffer mbb1 = new MultiByteBuffer(bb1, bb2);
-    mbb1.position(2);
-    mbb1.putInt(4);
-    int res = mbb1.getInt(2);
-    byte[] bres = new byte[4];
-    bres[0] = mbb1.get(2);
-    bres[1] = mbb1.get(3);
-    bres[2] = mbb1.get(4);
-    bres[3] = mbb1.get(5);
-    int expected = Bytes.toInt(bres);
-    assertEquals(res, expected);
-  }
-
-  @Test
-  public void testGetIntStrictlyForwardWithPosOnMultiBuffers() throws IOException {
-    byte[] b = new byte[4];
-    byte[] b1 = new byte[4];
-    ByteBuffer bb1 = ByteBuffer.wrap(b);
-    ByteBuffer bb2 = ByteBuffer.wrap(b1);
-    MultiByteBuffer mbb1 = new MultiByteBuffer(bb1, bb2);
-    mbb1.position(2);
-    mbb1.putInt(4);
-    mbb1.position(7);
-    mbb1.put((byte) 2);
-    mbb1.position(0);
-    mbb1.getIntStrictlyForward(4);
-    byte res = mbb1.get(7);
-    assertEquals((byte) 2, res);
-  }
-}


[4/4] hbase git commit: HBASE-12213 HFileBlock backed by Array of ByteBuffers (Ram)

Posted by ra...@apache.org.
HBASE-12213 HFileBlock backed by Array of ByteBuffers (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/834f87b2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/834f87b2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/834f87b2

Branch: refs/heads/master
Commit: 834f87b23de533783ba5f5b858327a6164f17f55
Parents: a249989
Author: ramkrishna <ra...@gmail.com>
Authored: Fri Jul 17 13:27:29 2015 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Fri Jul 17 13:27:29 2015 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/KeyValue.java  |   10 +-
 .../hadoop/hbase/io/ByteBuffInputStream.java    |  100 ++
 .../hadoop/hbase/io/ByteBufferInputStream.java  |  101 --
 .../io/encoding/BufferedDataBlockEncoder.java   |    4 +-
 .../io/encoding/CopyKeyDataBlockEncoder.java    |   12 +-
 .../hbase/io/encoding/DataBlockEncoder.java     |    3 +-
 .../hbase/io/encoding/DiffKeyDeltaEncoder.java  |   15 +-
 .../hbase/io/encoding/FastDiffDeltaEncoder.java |   18 +-
 .../io/encoding/HFileBlockDecodingContext.java  |   23 +-
 .../HFileBlockDefaultDecodingContext.java       |    8 +-
 .../io/encoding/PrefixKeyDeltaEncoder.java      |   18 +-
 .../apache/hadoop/hbase/io/hfile/BlockType.java |    5 +-
 .../org/apache/hadoop/hbase/nio/ByteBuff.java   |  438 +++++++
 .../apache/hadoop/hbase/nio/MultiByteBuff.java  | 1100 ++++++++++++++++++
 .../hadoop/hbase/nio/MultiByteBuffer.java       | 1047 -----------------
 .../apache/hadoop/hbase/nio/SingleByteBuff.java |  312 +++++
 .../hadoop/hbase/util/ByteBufferArray.java      |   64 +
 .../hadoop/hbase/util/ByteBufferUtils.java      |  118 +-
 .../java/org/apache/hadoop/hbase/util/Hash.java |    2 +
 .../apache/hadoop/hbase/util/UnsafeAccess.java  |   68 ++
 .../hbase/io/TestByteBufferInputStream.java     |   82 --
 .../hbase/io/TestMultiByteBuffInputStream.java  |   83 ++
 .../hadoop/hbase/nio/TestMultiByteBuff.java     |  324 ++++++
 .../hadoop/hbase/nio/TestMultiByteBuffer.java   |  316 -----
 .../hbase/codec/prefixtree/PrefixTreeCodec.java |   10 +-
 .../codec/prefixtree/PrefixTreeSeeker.java      |    1 +
 .../hbase/io/hfile/CacheableDeserializer.java   |    6 +-
 .../hbase/io/hfile/CompoundBloomFilter.java     |    8 +-
 .../org/apache/hadoop/hbase/io/hfile/HFile.java |    4 +-
 .../hadoop/hbase/io/hfile/HFileBlock.java       |  106 +-
 .../hadoop/hbase/io/hfile/HFileBlockIndex.java  |   41 +-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java  |  122 +-
 .../hbase/io/hfile/MemcachedBlockCache.java     |    5 +-
 .../hbase/io/hfile/bucket/BucketCache.java      |   11 +-
 .../io/hfile/bucket/ByteBufferIOEngine.java     |   14 +
 .../hbase/io/hfile/bucket/FileIOEngine.java     |   18 +
 .../hadoop/hbase/io/hfile/bucket/IOEngine.java  |   21 +
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |    2 -
 .../hadoop/hbase/regionserver/StoreFile.java    |    4 +-
 .../hadoop/hbase/regionserver/StoreScanner.java |    1 -
 .../apache/hadoop/hbase/util/BloomFilter.java   |    6 +-
 .../hadoop/hbase/util/BloomFilterChunk.java     |   31 -
 .../hadoop/hbase/util/BloomFilterUtil.java      |   16 +-
 .../hbase/util/hbck/TableLockChecker.java       |    1 -
 .../hadoop/hbase/client/TestFromClientSide.java |    2 +
 .../io/encoding/TestDataBlockEncoders.java      |    4 +-
 .../hadoop/hbase/io/hfile/CacheTestUtils.java   |   13 +-
 .../hadoop/hbase/io/hfile/TestCacheConfig.java  |    6 +-
 .../hadoop/hbase/io/hfile/TestChecksum.java     |    6 +-
 .../apache/hadoop/hbase/io/hfile/TestHFile.java |   11 +-
 .../hadoop/hbase/io/hfile/TestHFileBlock.java   |   33 +-
 .../io/hfile/TestHFileBlockCompatibility.java   |    7 +-
 .../hbase/io/hfile/TestHFileBlockIndex.java     |   12 +-
 .../hbase/io/hfile/TestHFileWriterV2.java       |    6 +-
 .../hbase/io/hfile/TestHFileWriterV3.java       |    6 +-
 .../io/hfile/bucket/TestByteBufferIOEngine.java |   45 +
 .../hadoop/hbase/util/TestBloomFilterChunk.java |   68 +-
 .../hadoop/hbase/util/TestByteBuffUtils.java    |   78 ++
 .../hadoop/hbase/util/TestByteBufferUtils.java  |    2 +-
 59 files changed, 3104 insertions(+), 1894 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 8c73984..368bf41 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -2613,6 +2613,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
    * Hence create a Keyvalue(aka Cell) that would help in comparing as two cells
    */
   public static class KeyOnlyKeyValue extends KeyValue {
+    private short rowLen = -1;
     public KeyOnlyKeyValue() {
 
     }
@@ -2624,6 +2625,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
       this.bytes = b;
       this.length = length;
       this.offset = offset;
+      this.rowLen = Bytes.toShort(this.bytes, this.offset);
     }
 
     @Override
@@ -2642,6 +2644,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
       this.bytes = key;
       this.offset = offset;
       this.length = length;
+      this.rowLen = Bytes.toShort(this.bytes, this.offset);
     }
 
     @Override
@@ -2699,7 +2702,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
 
     @Override
     public short getRowLength() {
-      return Bytes.toShort(this.bytes, getKeyOffset());
+      return rowLen;
     }
 
     @Override
@@ -2769,5 +2772,10 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
     public boolean equals(Object other) {
       return super.equals(other);
     }
+
+    @Override
+    public long heapSize() {
+      return super.heapSize() + Bytes.SIZEOF_SHORT;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java
new file mode 100644
index 0000000..4f6b3c2
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+
+/**
+ * Not thread safe!
+ * <p>
+ * Please note that the reads will cause position movement on wrapped ByteBuff.
+ */
+@InterfaceAudience.Private
+public class ByteBuffInputStream extends InputStream {
+
+  private ByteBuff buf;
+
+  public ByteBuffInputStream(ByteBuff buf) {
+    this.buf = buf;
+  }
+
+  /**
+   * Reads the next byte of data from this input stream. The value byte is returned as an
+   * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available
+   * because the end of the stream has been reached, the value <code>-1</code> is returned.
+   * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
+   */
+  public int read() {
+    if (this.buf.hasRemaining()) {
+      return (this.buf.get() & 0xff);
+    }
+    return -1;
+  }
+
+  /**
+   * Reads up to the next <code>len</code> bytes of data from the buffer into the passed array
+   * (starting from the given offset).
+   * @param b the array into which the data is read.
+   * @param off the start offset in the destination array <code>b</code>
+   * @param len the maximum number of bytes to read.
+   * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even
+   *         1 byte can be read because the end of the stream has been reached.
+   */
+  public int read(byte[] b, int off, int len) {
+    int avail = available();
+    if (avail <= 0) {
+      return -1;
+    }
+    if (len <= 0) {
+      return 0;
+    }
+
+    if (len > avail) {
+      len = avail;
+    }
+    this.buf.get(b, off, len);
+    return len;
+  }
+
+  /**
+   * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if
+   * the end of the input stream is reached. The actual number <code>k</code> of bytes skipped is
+   * equal to the smaller of <code>n</code> and the number of bytes remaining in the stream.
+   * @param n the number of bytes to be skipped.
+   * @return the actual number of bytes skipped.
+   */
+  public long skip(long n) {
+    long k = Math.min(n, available());
+    if (k <= 0) {
+      return 0;
+    }
+    this.buf.skip((int) k);
+    return k;
+  }
+
+  /**
+   * @return  the number of remaining bytes that can be read (or skipped
+   *          over) from this input stream.
+   */
+  public int available() {
+    return this.buf.remaining();
+  }
+}
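
A small usage sketch (not part of the patch): the new stream lets DataInputStream-based decode
paths read across the fragment boundaries of a multi-buffer-backed ByteBuff without copying.
As the class javadoc warns, the reads move the wrapped ByteBuff's position.

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.io.ByteBuffInputStream;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.MultiByteBuff;

    public class ByteBuffInputStreamSketch {
      public static void main(String[] args) throws IOException {
        // One int split across two fragments; the stream hides the seam.
        ByteBuff buf = new MultiByteBuff(ByteBuffer.wrap(new byte[] { 0, 0 }),
            ByteBuffer.wrap(new byte[] { 1, 42 }));
        DataInputStream in = new DataInputStream(new ByteBuffInputStream(buf));
        System.out.println(in.readInt()); // 298, i.e. 0x0000012A
      }
    }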

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
deleted file mode 100644
index 1530ccd..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Not thread safe!
- * <p>
- * Please note that the reads will cause position movement on wrapped ByteBuffer.
- */
-@InterfaceAudience.Private
-public class ByteBufferInputStream extends InputStream {
-
-  private ByteBuffer buf;
-
-  public ByteBufferInputStream(ByteBuffer buf) {
-      this.buf = buf;
-  }
-
-  /**
-   * Reads the next byte of data from this input stream. The value byte is returned as an
-   * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available
-   * because the end of the stream has been reached, the value <code>-1</code> is returned.
-   * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
-   */
-  public int read() {
-    if (this.buf.hasRemaining()) {
-      return (this.buf.get() & 0xff);
-    }
-    return -1;
-  }
-
-  /**
-   * Reads up to next <code>len</code> bytes of data from buffer into passed array(starting from
-   * given offset).
-   * @param b the array into which the data is read.
-   * @param off the start offset in the destination array <code>b</code>
-   * @param len the maximum number of bytes to read.
-   * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even
-   *         1 byte can be read because the end of the stream has been reached.
-   */
-  public int read(byte b[], int off, int len) {
-    int avail = available();
-    if (avail <= 0) {
-      return -1;
-    }
-
-    if (len > avail) {
-      len = avail;
-    }
-    if (len <= 0) {
-      return 0;
-    }
-
-    this.buf.get(b, off, len);
-    return len;
-  }
-
-  /**
-   * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the
-   * end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is
-   * equal to the smaller of <code>n</code> and remaining bytes in the stream.
-   * @param n the number of bytes to be skipped.
-   * @return the actual number of bytes skipped.
-   */
-  public long skip(long n) {
-    long k = Math.min(n, available());
-    if (k < 0) {
-      k = 0;
-    }
-    this.buf.position((int) (this.buf.position() + k));
-    return k;
-  }
-
-  /**
-   * @return  the number of remaining bytes that can be read (or skipped
-   *          over) from this input stream.
-   */
-  public int available() {
-    return this.buf.remaining();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 03875dc..966c59b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -601,7 +601,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
       kvBuffer.putInt(current.keyLength);
       kvBuffer.putInt(current.valueLength);
       kvBuffer.put(current.keyBuffer, 0, current.keyLength);
-      ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.valueOffset,
+      ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.valueOffset,
           current.valueLength);
       if (current.tagsLength > 0) {
         // Put short as unsigned
@@ -610,7 +610,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
         if (current.tagsOffset != -1) {
           // the offset of the tags bytes in the underlying buffer is marked. So the temp
           // buffer,tagsBuffer was not been used.
-          ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.tagsOffset,
+          ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.tagsOffset,
               current.tagsLength);
         } else {
           // When tagsOffset is marked as -1, tag compression was present and so the tags were
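
The two hunks above fix a swapped argument order: after this patch the helper is called as
copyFromBufferToBuffer(source, destination, sourceOffset, length), where the source offset is
absolute and the destination is written at its current position (that ordering is inferred
from these hunks; the helper itself is outside this excerpt). A tiny sketch of that ordering
(standalone, not part of the patch):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.util.ByteBufferUtils;

    public class CopyOrderSketch {
      public static void main(String[] args) {
        ByteBuffer src = ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 });
        ByteBuffer dst = ByteBuffer.allocate(2);
        // Copy 2 bytes of src starting at absolute offset 1 into dst at its position.
        ByteBufferUtils.copyFromBufferToBuffer(src, dst, 1, 2);
        System.out.println(dst.get(0) + "," + dst.get(1)); // 2,3
      }
    }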

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
index 4eea272..662be29 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
@@ -66,13 +67,12 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
   }
 
   @Override
-  public Cell getFirstKeyCellInBlock(ByteBuffer block) {
-    int keyLength = block.getInt(Bytes.SIZEOF_INT);
-    ByteBuffer dup = block.duplicate();
+  public Cell getFirstKeyCellInBlock(ByteBuff block) {
+    int keyLength = block.getIntStrictlyForward(Bytes.SIZEOF_INT);
     int pos = 3 * Bytes.SIZEOF_INT;
-    dup.position(pos);
-    dup.limit(pos + keyLength);
-    return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength);
+    ByteBuffer key = block.asSubByteBuffer(pos + keyLength).duplicate();
+    // TODO : to be changed here for BBCell
+    return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + pos, keyLength);
   }
 
   @Override
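
getFirstKeyCellInBlock above leans on the asSubByteBuffer(length) contract: when the requested
bytes sit inside one backing ByteBuffer that buffer itself is returned, and when they span
fragments a fresh copy is returned, so the heap-backed callers here can use
array()/arrayOffset() in either case. A sketch of the spanning case (standalone, not part of
the patch):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.MultiByteBuff;

    public class AsSubByteBufferSketch {
      public static void main(String[] args) {
        // Four bytes split across two fragments.
        ByteBuff block = new MultiByteBuff(ByteBuffer.wrap(new byte[] { 10, 11 }),
            ByteBuffer.wrap(new byte[] { 12, 13 }));
        // The first three bytes span the seam, so a fresh 3 byte copy is returned.
        ByteBuffer sub = block.asSubByteBuffer(3);
        System.out.println(sub.get(sub.position())); // 10
      }
    }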

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
index b0467b8..ce71308 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 
 /**
  * Encoding of KeyValue. It aims to be fast and efficient using assumptions:
@@ -90,7 +91,7 @@ public interface DataBlockEncoder {
    * @param block encoded block we want index, the position will not change
    * @return First key in block as a cell.
    */
-  Cell getFirstKeyCellInBlock(ByteBuffer block);
+  Cell getFirstKeyCellInBlock(ByteBuff block);
 
   /**
    * Create a HFileBlock seeker which find KeyValues within a block.

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
index f2d4751..90b8e6e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -305,15 +306,16 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
   }
 
   @Override
-  public Cell getFirstKeyCellInBlock(ByteBuffer block) {
+  public Cell getFirstKeyCellInBlock(ByteBuff block) {
     block.mark();
     block.position(Bytes.SIZEOF_INT);
     byte familyLength = block.get();
-    ByteBufferUtils.skip(block, familyLength);
+    block.skip(familyLength);
     byte flag = block.get();
-    int keyLength = ByteBufferUtils.readCompressedInt(block);
-    ByteBufferUtils.readCompressedInt(block); // valueLength
-    ByteBufferUtils.readCompressedInt(block); // commonLength
+    int keyLength = ByteBuff.readCompressedInt(block);
+    // TODO : See if we can avoid these reads as the read values are not getting used
+    ByteBuff.readCompressedInt(block); // valueLength
+    ByteBuff.readCompressedInt(block); // commonLength
     ByteBuffer result = ByteBuffer.allocate(keyLength);
 
     // copy row
@@ -341,7 +343,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
     // copy the timestamp and type
     int timestampFitInBytes =
         ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
-    long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes);
+    long timestamp = ByteBuff.readLong(block, timestampFitInBytes);
     if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
       timestamp = -timestamp;
     }
@@ -350,6 +352,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
     block.get(result.array(), pos, Bytes.SIZEOF_BYTE);
 
     block.reset();
+    // The result is already a BB. So we will always create a KeyOnlyKv.
     return new KeyValue.KeyOnlyKeyValue(result.array(), 0, keyLength);
   }
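
The timestamp read above uses the new static ByteBuff.readLong(in, fitInBytes), which
assembles a long from the given number of bytes, least significant byte first. A quick
standalone sketch (not part of the patch):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.MultiByteBuff;

    public class ReadLongSketch {
      public static void main(String[] args) {
        // Three bytes, least significant first: 0x15, 0xCD, 0x5B -> 0x5BCD15.
        ByteBuff in = new MultiByteBuff(
            ByteBuffer.wrap(new byte[] { 0x15, (byte) 0xCD, 0x5B }));
        System.out.println(ByteBuff.readLong(in, 3)); // 6016277
      }
    }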
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
index f750e09..fa4adbd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -354,18 +355,17 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
   }
 
   @Override
-  public Cell getFirstKeyCellInBlock(ByteBuffer block) {
+  public Cell getFirstKeyCellInBlock(ByteBuff block) {
     block.mark();
     block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
-    int keyLength = ByteBufferUtils.readCompressedInt(block);
-    ByteBufferUtils.readCompressedInt(block); // valueLength
-    ByteBufferUtils.readCompressedInt(block); // commonLength
-    int pos = block.position();
+    int keyLength = ByteBuff.readCompressedInt(block);
+    // TODO : See if we can avoid these reads as the read values are not getting used
+    ByteBuff.readCompressedInt(block); // valueLength
+    ByteBuff.readCompressedInt(block); // commonLength
+    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
     block.reset();
-    ByteBuffer dup = block.duplicate();
-    dup.position(pos);
-    dup.limit(pos + keyLength);
-    return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength);
+    // TODO : Change to BBCell.
+    return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength);
   }
 
   @Override
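
Like the other getFirstKeyCellInBlock implementations, the code above brackets its reads with
mark()/reset() so the block's position is left untouched for the caller. A sketch of that
pattern on a ByteBuff (standalone, not part of the patch):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.MultiByteBuff;

    public class MarkResetSketch {
      public static void main(String[] args) {
        ByteBuff block = new MultiByteBuff(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }));
        block.mark();       // remember position 0
        block.skip(2);
        block.get();        // relative read, position is now 3
        block.reset();      // back to the marked position
        System.out.println(block.position()); // 0
      }
    }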

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
index 37001cc..ffdb694 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
@@ -17,10 +17,10 @@
 package org.apache.hadoop.hbase.io.encoding;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 
 /**
  * A decoding context that is created by a reader's encoder, and is shared
@@ -32,22 +32,27 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 public interface HFileBlockDecodingContext {
 
   /**
-   * Perform all actions that need to be done before the encoder's real decoding process.
-   * Decompression needs to be done if {@link HFileContext#getCompression()} returns a valid compression
+   * Perform all actions that need to be done before the encoder's real decoding
+   * process. Decompression needs to be done if
+   * {@link HFileContext#getCompression()} returns a valid compression
    * algorithm.
    *
-   * @param onDiskSizeWithoutHeader numBytes after block and encoding headers
-   * @param uncompressedSizeWithoutHeader numBytes without header required to store the block after
+   * @param onDiskSizeWithoutHeader
+   *          numBytes after block and encoding headers
+   * @param uncompressedSizeWithoutHeader
+   *          numBytes without header required to store the block after
    *          decompressing (not decoding)
-   * @param blockBufferWithoutHeader ByteBuffer pointed after the header but before the data
-   * @param onDiskBlock on disk data to be decoded
+   * @param blockBufferWithoutHeader
+   *          ByteBuffer pointed after the header but before the data
+   * @param onDiskBlock
+   *          on disk data to be decoded
    * @throws IOException
    */
   void prepareDecoding(
     int onDiskSizeWithoutHeader,
     int uncompressedSizeWithoutHeader,
-    ByteBuffer blockBufferWithoutHeader,
-    ByteBuffer onDiskBlock
+    ByteBuff blockBufferWithoutHeader,
+    ByteBuff onDiskBlock
   ) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
index 78bb0d6..30382d9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java
@@ -19,17 +19,17 @@ package org.apache.hadoop.hbase.io.encoding;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Cipher;
 import org.apache.hadoop.hbase.io.crypto.Decryptor;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -51,8 +51,8 @@ public class HFileBlockDefaultDecodingContext implements
 
   @Override
   public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
-      ByteBuffer blockBufferWithoutHeader, ByteBuffer onDiskBlock) throws IOException {
-    InputStream in = new DataInputStream(new ByteBufferInputStream(onDiskBlock));
+      ByteBuff blockBufferWithoutHeader, ByteBuff onDiskBlock) throws IOException {
+    InputStream in = new DataInputStream(new ByteBuffInputStream(onDiskBlock));
 
     Encryption.Context cryptoContext = fileContext.getEncryptionContext();
     if (cryptoContext != Encryption.Context.NONE) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
index 15608cc..6e89de4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -172,22 +173,21 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
   }
 
   @Override
-  public Cell getFirstKeyCellInBlock(ByteBuffer block) {
+  public Cell getFirstKeyCellInBlock(ByteBuff block) {
     block.mark();
     block.position(Bytes.SIZEOF_INT);
-    int keyLength = ByteBufferUtils.readCompressedInt(block);
-    ByteBufferUtils.readCompressedInt(block);
-    int commonLength = ByteBufferUtils.readCompressedInt(block);
+    int keyLength = ByteBuff.readCompressedInt(block);
+    // TODO : See if we can avoid these reads as the read values are not getting used
+    ByteBuff.readCompressedInt(block);
+    int commonLength = ByteBuff.readCompressedInt(block);
     if (commonLength != 0) {
       throw new AssertionError("Nonzero common length in the first key in "
           + "block: " + commonLength);
     }
-    int pos = block.position();
+    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
     block.reset();
-    ByteBuffer dup = block.duplicate();
-    dup.position(pos);
-    dup.limit(pos + keyLength);
-    return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength);
+    // TODO : Change to BBCell
+    return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
index 0db584e..4228f57 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -131,7 +132,7 @@ public enum BlockType {
     out.write(magic);
   }
 
-  public void write(ByteBuffer buf) {
+  public void write(ByteBuff buf) {
     buf.put(magic);
   }
 
@@ -161,7 +162,7 @@ public enum BlockType {
     return parse(buf, 0, buf.length);
   }
 
-  public static BlockType read(ByteBuffer buf) throws IOException {
+  public static BlockType read(ByteBuff buf) throws IOException {
     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), MAGIC_LENGTH)];
     buf.get(magicBuf);
     BlockType blockType = parse(magicBuf, 0, magicBuf.length);
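
A small usage sketch for the new ByteBuff variant of read, assuming it just consumes the magic
and returns the parsed type (standalone, not part of the patch):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.hfile.BlockType;
    import org.apache.hadoop.hbase.nio.MultiByteBuff;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BlockTypeReadSketch {
      public static void main(String[] args) throws Exception {
        // "DATABLK*" is the 8 byte magic that prefixes DATA blocks on disk.
        MultiByteBuff buf = new MultiByteBuff(ByteBuffer.wrap(Bytes.toBytes("DATABLK*")));
        System.out.println(BlockType.read(buf)); // DATA
      }
    }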

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
new file mode 100644
index 0000000..14e77a7
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An abstract class that abstracts out how the byte buffers are used, either a
+ * single one or multiple. We have this class because Java's ByteBuffers cannot
+ * be sub-classed. It provides APIs similar to the ones in java.nio's ByteBuffer
+ * and allows you to do positional as well as relative reads and writes on the
+ * underlying BB(s). In addition, it has some extra APIs which help us in the
+ * read path.
+ */
+@InterfaceAudience.Private
+public abstract class ByteBuff {
+  /**
+   * @return this ByteBuff's current position
+   */
+  public abstract int position();
+
+  /**
+   * Sets this ByteBuff's position to the given value.
+   * @param position
+   * @return this object
+   */
+  public abstract ByteBuff position(int position);
+
+  /**
+   * Advances the current position of this ByteBuff by the specified length.
+   * @param len the length to be skipped
+   */
+  public abstract ByteBuff skip(int len);
+
+  /**
+   * Moves back the current position of this ByteBuff by the specified length.
+   * @param len the length to move back
+   */
+  public abstract ByteBuff moveBack(int len);
+
+  /**
+   * @return the total capacity of this ByteBuff.
+   */
+  public abstract int capacity();
+
+  /**
+   * Returns the limit of this ByteBuff
+   * @return limit of the ByteBuff
+   */
+  public abstract int limit();
+
+  /**
+   * Sets the limit of this ByteBuff.
+   * @param limit
+   * @return This ByteBuff
+   */
+  public abstract ByteBuff limit(int limit);
+
+  /**
+   * Rewinds this ByteBuff and the position is set to 0
+   * @return this object
+   */
+  public abstract ByteBuff rewind();
+
+  /**
+   * Marks the current position of the ByteBuff
+   * @return this object
+   */
+  public abstract ByteBuff mark();
+
+  /**
+   * Returns bytes from current position till length specified, as a single ByteBuffer. When all
+   * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item
+   * as such will be returned. So users are warned not to change the position or limit of this
+   * returned ByteBuffer. The position of the returned byte buffer is at the beginning of the
+   * required bytes. When the required bytes happen to span across multiple ByteBuffers, this API
+   * will copy the bytes to a newly created ByteBuffer of required size and return that.
+   *
+   * @param length number of bytes required.
+   * @return bytes from current position till length specified, as a single ByteBuffer.
+   */
+  public abstract ByteBuffer asSubByteBuffer(int length);
+
+  /**
+   * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these
+   * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as
+   * such will be returned (with the offset in this ByteBuffer where the bytes start). So users are
+   * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
+   * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
+   * ByteBuffer of required size and return that.
+   *
+   * @param offset the offset in this ByteBuff from where the subBuffer should be created
+   * @param length the length of the subBuffer
+   * @param pair a pair that will have the bytes from the current position till length specified,
+   *        as a single ByteBuffer and the offset in that buffer where the bytes start.
+   *        Since this API gets called in a loop, the caller passes in a pair created outside
+   *        the loop and the method sets the values on it. This avoids the object creation
+   *        that would happen if this method created and returned a new pair on every call.
+   */
+  public abstract void asSubByteBuffer(int offset, int length, Pair<ByteBuffer, Integer> pair);
+
+  /**
+   * Returns the number of elements between the current position and the
+   * limit.
+   * @return the remaining elements in this ByteBuff
+   */
+  public abstract int remaining();
+
+  /**
+   * Returns true if there are elements between the current position and the limit
+   * @return true if there are elements, false otherwise
+   */
+  public abstract boolean hasRemaining();
+
+  /**
+   * Similar to {@link ByteBuffer}.reset(), ensures that this ByteBuff
+   * is reset back to last marked position.
+   * @return This ByteBuff
+   */
+  public abstract ByteBuff reset();
+
+  /**
+   * Returns a ByteBuff which is a sliced version of this ByteBuff. The position, limit and mark
+   * of the new ByteBuff will be independent of those of the original ByteBuff.
+   * The content of the new ByteBuff will start at this ByteBuff's current position
+   * @return a sliced ByteBuff
+   */
+  public abstract ByteBuff slice();
+
+  /**
+   * Returns a ByteBuff which is a duplicate version of this ByteBuff. The
+   * position, limit and mark of the new ByteBuff will be independent of those
+   * of the original ByteBuff, but at the time of creation they will be
+   * identical to those of this ByteBuff in terms of values.
+   *
+   * @return a duplicated ByteBuff
+   */
+  public abstract ByteBuff duplicate();
+
+  /**
+   * A relative method that returns byte at the current position.  Increments the
+   * current position by the size of a byte.
+   * @return the byte at the current position
+   */
+  public abstract byte get();
+
+  /**
+   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers
+   * @param index
+   * @return the byte at the given index
+   */
+  public abstract byte get(int index);
+
+  /**
+   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers.
+   * The difference from {@link #get(int)} is that the index specified must be at or after
+   * the current position; if not, an IndexOutOfBoundsException is thrown.
+   * @param index
+   * @return the byte value at the given index.
+   */
+  public abstract byte getByteStrictlyForward(int index);
+
+  /**
+   * Writes a byte to this ByteBuff at the current position and increments the position
+   * @param b
+   * @return this object
+   */
+  public abstract ByteBuff put(byte b);
+
+  /**
+   * Writes a byte to this ByteBuff at the given index
+   * @param index
+   * @param b
+   * @return this object
+   */
+  public abstract ByteBuff put(int index, byte b);
+
+  /**
+   * Copies the specified number of bytes from this ByteBuff's current position into the
+   * byte[], starting at the given offset. Also advances the position of the ByteBuff by the
+   * given length.
+   * @param dst
+   * @param offset the start offset in the destination array
+   * @param length the number of bytes to be copied
+   */
+  public abstract void get(byte[] dst, int offset, int length);
+
+  /**
+   * Copies the content from this ByteBuff's current position to the byte array and fills it. Also
+   * advances the position of the ByteBuff by the length of the byte[].
+   * @param dst
+   */
+  public abstract void get(byte[] dst);
+
+  /**
+   * Copies from the given byte[] to this ByteBuff
+   * @param src
+   * @param offset the position in the byte array from which the copy should be done
+   * @param length the number of bytes to copy
+   * @return this ByteBuff
+   */
+  public abstract ByteBuff put(byte[] src, int offset, int length);
+
+  /**
+   * Copies from the given byte[] to this ByteBuff
+   * @param src
+   * @return this ByteBuff
+   */
+  public abstract ByteBuff put(byte[] src);
+
+  /**
+   * @return true if the underlying BB supports hasArray
+   */
+  public abstract boolean hasArray();
+
+  /**
+   * @return the byte[] if this ByteBuff is backed by a single BB and hasArray is true
+   */
+  public abstract byte[] array();
+
+  /**
+   * @return the arrayOffset of the byte[] in case of a single BB backed ByteBuff
+   */
+  public abstract int arrayOffset();
+
+  /**
+   * Returns the short value at the current position. Also advances the position by the size
+   * of short
+   *
+   * @return the short value at the current position
+   */
+  public abstract short getShort();
+
+  /**
+   * Fetches the short value at the given index. Does not change position of the
+   * underlying ByteBuffers. Even if the short at the given index does not fit within the
+   * current item, we can safely move to the next item and fetch the remaining bytes
+   * forming the short.
+   *
+   * @param index
+   * @return the short value at the given index
+   */
+  public abstract short getShort(int index);
+
+  /**
+   * Fetches the short at the given index. Does not change position of the underlying ByteBuffers.
+   * The difference from {@link #getShort(int)} is that the index specified must be at or
+   * after the current position; if not, an IndexOutOfBoundsException is thrown.
+   * @param index
+   * @return the short value at the given index.
+   */
+  public abstract short getShortStrictlyForward(int index);
+
+  /**
+   * Returns the int value at the current position. Also advances the position by the size of int
+   *
+   * @return the int value at the current position
+   */
+  public abstract int getInt();
+
+  /**
+   * Writes an int to this ByteBuff at its current position. Also advances the position
+   * by size of int
+   * @param value Int value to write
+   * @return this object
+   */
+  public abstract ByteBuff putInt(int value);
+
+  /**
+   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers.
+   * Even if the int at the given index does not fit within the current item, we can safely
+   * move to the next item and fetch the remaining bytes forming the int.
+   *
+   * @param index
+   * @return the int value at the given index
+   */
+  public abstract int getInt(int index);
+
+  /**
+   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers.
+   * The difference from {@link #getInt(int)} is that the index specified must be at or after
+   * the current position; if not, an IndexOutOfBoundsException is thrown.
+   * @param index
+   * @return the int value at the given index.
+   */
+  // TODO: any better name here?? getIntFromSubsequentPosition? or getIntAfterCurrentPosition?
+  // TODO : Make this relative wrt current position? Follow on JIRA
+  public abstract int getIntStrictlyForward(int index);
+
+  /**
+   * Returns the long value at the current position. Also advances the position by the size of long
+   *
+   * @return the long value at the current position
+   */
+  public abstract long getLong();
+
+  /**
+   * Writes a long to this ByteBuff at its current position.
+   * Also advances the position by size of long
+   * @param value Long value to write
+   * @return this object
+   */
+  public abstract ByteBuff putLong(long value);
+
+  /**
+   * Fetches the long at the given index. Does not change position of the
+   * underlying ByteBuffers. Even if the long at the given index does not fit within the
+   * current item, we can safely move to the next item and fetch the remaining bytes
+   * forming the long.
+   *
+   * @param index
+   * @return the long value at the given index
+   */
+  public abstract long getLong(int index);
+
+  /**
+   * Fetches the long at the given index. Does not change position of the underlying ByteBuffers.
+   * The difference from {@link #getLong(int)} is that the index specified must be at or after
+   * the current position; if not, an IndexOutOfBoundsException is thrown.
+   * @param index
+   * @return the long value at the given index.
+   */
+  public abstract long getLongStrictlyForward(int index);
+
+  /**
+   * Copy the content from this ByteBuff to a byte[] based on the given offset and
+   * length
+   *
+   * @param offset
+   *          the position from where the copy should start
+   * @param length
+   *          the number of bytes to be copied
+   * @return byte[] with the copied contents from this ByteBuff.
+   */
+  public abstract byte[] toBytes(int offset, int length);
+
+  /**
+   * Copies the content from this ByteBuff to a ByteBuffer
+   * Note : This will advance the position marker of {@code out} but not change the position
+   * marker of this ByteBuff
+   * @param out the ByteBuffer to which the copy has to happen
+   * @param sourceOffset the offset in the ByteBuff from which the elements have
+   * to be copied
+   * @param length the number of bytes to be copied
+   */
+  public abstract void get(ByteBuffer out, int sourceOffset, int length);
+
+  /**
+   * Copies the contents from the src ByteBuff to this ByteBuff. This will be
+   * absolute positional copying and
+   * won't affect the position of any of the buffers.
+   * @param offset the position in this ByteBuff to which the copy should happen
+   * @param src the src ByteBuff
+   * @param srcOffset the offset in the src ByteBuff from where the elements should be read
+   * @param length the length up to which the copy should happen
+   */
+  public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length);
+
+  // static helper methods
+  /**
+   * Read integer from ByteBuff coded in 7 bits and increment position.
+   * @return Read integer.
+   */
+  public static int readCompressedInt(ByteBuff buf) {
+    byte b = buf.get();
+    if ((b & ByteBufferUtils.NEXT_BIT_MASK) != 0) {
+      return (b & ByteBufferUtils.VALUE_MASK)
+          + (readCompressedInt(buf) << ByteBufferUtils.NEXT_BIT_SHIFT);
+    }
+    return b & ByteBufferUtils.VALUE_MASK;
+  }
+
+  /**
+   * Compares two ByteBuffs
+   *
+   * @param buf1 the first ByteBuff
+   * @param o1 the offset in the first ByteBuff from where the compare has to happen
+   * @param len1 the length in the first ByteBuff up to which the compare has to happen
+   * @param buf2 the second ByteBuff
+   * @param o2 the offset in the second ByteBuff from where the compare has to happen
+   * @param len2 the length in the second ByteBuff up to which the compare has to happen
+   * @return Positive if buf1 is bigger than buf2, 0 if they are equal, and negative if buf1 is
+   *         smaller than buf2.
+   */
+  public static int compareTo(ByteBuff buf1, int o1, int len1, ByteBuff buf2,
+      int o2, int len2) {
+    if (buf1.hasArray() && buf2.hasArray()) {
+      return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(),
+          buf2.arrayOffset() + o2, len2);
+    }
+    int end1 = o1 + len1;
+    int end2 = o2 + len2;
+    for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
+      int a = buf1.get(i) & 0xFF;
+      int b = buf2.get(j) & 0xFF;
+      if (a != b) {
+        return a - b;
+      }
+    }
+    return len1 - len2;
+  }
+
+  /**
+   * Read long which was written to fitInBytes bytes and increment position.
+   * @param fitInBytes In how many bytes given long is stored.
+   * @return The value of parsed long.
+   */
+  public static long readLong(ByteBuff in, final int fitInBytes) {
+    long tmpLength = 0;
+    for (int i = 0; i < fitInBytes; ++i) {
+      tmpLength |= (in.get() & 0xFFL) << (8L * i);
+    }
+    return tmpLength;
+  }
+}
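
Two of the static helpers above in a standalone sketch (not part of the patch):
readCompressedInt consumes 7-bit chunks, least significant first, with the high bit as a
continuation flag, and compareTo does an unsigned, lexicographic comparison that falls back
to byte-by-byte gets when either side is not array backed.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.MultiByteBuff;

    public class ByteBuffHelpersSketch {
      public static void main(String[] args) {
        // 300 = 44 + (2 << 7): byte 0xAC carries 44 plus the continuation bit.
        ByteBuff in = new MultiByteBuff(ByteBuffer.wrap(new byte[] { (byte) 0xAC, 0x02 }));
        System.out.println(ByteBuff.readCompressedInt(in)); // 300

        ByteBuff a = new MultiByteBuff(ByteBuffer.wrap(new byte[] { 1, 2, 3 }));
        ByteBuff b = new MultiByteBuff(ByteBuffer.wrap(new byte[] { 1, 2, 4 }));
        System.out.println(ByteBuff.compareTo(a, 0, 3, b, 0, 3) < 0); // true
      }
    }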

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
new file mode 100644
index 0000000..984ade5
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -0,0 +1,1100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.InvalidMarkException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Provides a unified view of all the underlying ByteBuffers so that they look like a single
+ * bigger sequential buffer. This class provides APIs similar to those in {@link ByteBuffer} to
+ * put/get int, short, long etc and to do operations like mark, reset and slice. It has to be
+ * used when data is split across multiple byte buffers and we don't want to copy them into a
+ * single buffer for reading.
+ */
+@InterfaceAudience.Private
+public class MultiByteBuff extends ByteBuff {
+
+  private final ByteBuffer[] items;
+  // Pointer to the current item in the MBB
+  private ByteBuffer curItem = null;
+  // Index of the current item in the MBB
+  private int curItemIndex = 0;
+
+  private int limit = 0;
+  private int limitedItemIndex;
+  private int markedItemIndex = -1;
+  private final int[] itemBeginPos;
+
+  public MultiByteBuff(ByteBuffer... items) {
+    assert items != null;
+    assert items.length > 0;
+    this.items = items;
+    this.curItem = this.items[this.curItemIndex];
+    // See the optimization in getInt(int) below, where we check whether the given index lands
+    // in the current item. For this we need to check whether the passed index is less than the
+    // next item's begin offset. To handle this effectively for the last item buffer, we add an
+    // extra item into this array.
+    itemBeginPos = new int[items.length + 1];
+    int offset = 0;
+    for (int i = 0; i < items.length; i++) {
+      ByteBuffer item = items[i];
+      item.rewind();
+      itemBeginPos[i] = offset;
+      int l = item.limit() - item.position();
+      offset += l;
+    }
+    this.limit = offset;
+    this.itemBeginPos[items.length] = offset + 1;
+    this.limitedItemIndex = this.items.length - 1;
+  }
+
+  private MultiByteBuff(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex,
+      int curItemIndex, int markedIndex) {
+    this.items = items;
+    this.curItemIndex = curItemIndex;
+    this.curItem = this.items[this.curItemIndex];
+    this.itemBeginPos = itemBeginPos;
+    this.limit = limit;
+    this.limitedItemIndex = limitedIndex;
+    this.markedItemIndex = markedIndex;
+  }
+
+  /**
+   * @throws UnsupportedOperationException MBB does not support
+   * array based operations
+   */
+  @Override
+  public byte[] array() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @throws UnsupportedOperationException MBB does not
+   * support array based operations
+   */
+  @Override
+  public int arrayOffset() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return false. MBB does not support array based operations
+   */
+  @Override
+  public boolean hasArray() {
+    return false;
+  }
+
+  /**
+   * @return the total capacity of this MultiByteBuff.
+   */
+  @Override
+  public int capacity() {
+    int c = 0;
+    for (ByteBuffer item : this.items) {
+      c += item.capacity();
+    }
+    return c;
+  }
+
+  /**
+   * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers
+   * @param index
+   * @return the byte at the given index
+   */
+  @Override
+  public byte get(int index) {
+    int itemIndex = getItemIndex(index);
+    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
+  }
+
+  @Override
+  public byte getByteStrictlyForward(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    if (index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) {
+      throw new IndexOutOfBoundsException("The index " + index
+          + " should not be less than current position " + this.position());
+    }
+    int itemIndex = getItemIndexFromCurItemIndex(index);
+    return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
+  }
+
+  /*
+   * Returns the index of the sub ByteBuffer in which the given element index is available.
+   */
+  private int getItemIndex(int elemIndex) {
+    int index = 1;
+    while (elemIndex >= this.itemBeginPos[index]) {
+      index++;
+      if (index == this.itemBeginPos.length) {
+        throw new IndexOutOfBoundsException();
+      }
+    }
+    return index - 1;
+  }
+
+  /*
+   * Returns the index of the sub ByteBuffer in which the given element index is available. In
+   * this case we are sure that the item will be at or after the MBB's current position.
+   */
+  private int getItemIndexFromCurItemIndex(int elemIndex) {
+    int index = this.curItemIndex;
+    while (elemIndex >= this.itemBeginPos[index]) {
+      index++;
+      if (index == this.itemBeginPos.length) {
+        throw new IndexOutOfBoundsException();
+      }
+    }
+    return index - 1;
+  }
+
+  /**
+   * Fetches the int at the given index. Does not change position of the underlying ByteBuffers
+   * @param index
+   * @return the int value at the given index
+   */
+  public int getInt(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    int itemIndex;
+    if (this.itemBeginPos[this.curItemIndex] <= index
+        && this.itemBeginPos[this.curItemIndex + 1] > index) {
+      itemIndex = this.curItemIndex;
+    } else {
+      itemIndex = getItemIndex(index);
+    }
+    return getInt(index, itemIndex);
+  }
+
+  @Override
+  public int getIntStrictlyForward(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    if (index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) {
+      throw new IndexOutOfBoundsException("The index " + index
+          + " should not be less than current position " + this.position());
+    }
+    int itemIndex;
+    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
+      itemIndex = this.curItemIndex;
+    } else {
+      itemIndex = getItemIndexFromCurItemIndex(index);
+    }
+    return getInt(index, itemIndex);
+  }
+
+  /**
+   * Fetches the short at the given index. Does not change position of the underlying ByteBuffers
+   * @param index
+   * @return the short value at the given index
+   */
+  public short getShort(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    int itemIndex;
+    if (this.itemBeginPos[this.curItemIndex] <= index
+        && this.itemBeginPos[this.curItemIndex + 1] > index) {
+      itemIndex = this.curItemIndex;
+    } else {
+      itemIndex = getItemIndex(index);
+    }
+    ByteBuffer item = items[itemIndex];
+    int offsetInItem = index - this.itemBeginPos[itemIndex];
+    if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) {
+      return ByteBufferUtils.toShort(item, offsetInItem);
+    }
+    if (items.length - 1 == itemIndex) {
+      // means cur item is the last one and we won't be able to read a short. Throw exception
+      throw new BufferUnderflowException();
+    }
+    ByteBuffer nextItem = items[itemIndex + 1];
+    // Get available one byte from this item and remaining one from next
+    short n = 0;
+    n ^= item.get(offsetInItem) & 0xFF;
+    n <<= 8;
+    n ^= nextItem.get(0) & 0xFF;
+    return n;
+  }
+
+  @Override
+  public short getShortStrictlyForward(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    if (index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) {
+      throw new IndexOutOfBoundsException("The index " + index
+          + " should not be less than current position " + this.position());
+    }
+    int itemIndex;
+    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
+      itemIndex = this.curItemIndex;
+    } else {
+      itemIndex = getItemIndexFromCurItemIndex(index);
+    }
+    return getShort(index, itemIndex);
+  }
+
+  private int getInt(int index, int itemIndex) {
+    ByteBuffer item = items[itemIndex];
+    int offsetInItem = index - this.itemBeginPos[itemIndex];
+    int remainingLen = item.limit() - offsetInItem;
+    if (remainingLen >= Bytes.SIZEOF_INT) {
+      return ByteBufferUtils.toInt(item, offsetInItem);
+    }
+    if (items.length - 1 == itemIndex) {
+      // means cur item is the last one and we won't be able to read an int. Throw exception
+      throw new BufferUnderflowException();
+    }
+    ByteBuffer nextItem = items[itemIndex + 1];
+    // Get available bytes from this item and remaining from next
+    int l = 0;
+    for (int i = offsetInItem; i < item.capacity(); i++) {
+      l <<= 8;
+      l ^= item.get(i) & 0xFF;
+    }
+    for (int i = 0; i < Bytes.SIZEOF_INT - remainingLen; i++) {
+      l <<= 8;
+      l ^= nextItem.get(i) & 0xFF;
+    }
+    return l;
+  }
+
+  private short getShort(int index, int itemIndex) {
+    ByteBuffer item = items[itemIndex];
+    int offsetInItem = index - this.itemBeginPos[itemIndex];
+    int remainingLen = item.limit() - offsetInItem;
+    if (remainingLen >= Bytes.SIZEOF_SHORT) {
+      return ByteBufferUtils.toShort(item, offsetInItem);
+    }
+    if (items.length - 1 == itemIndex) {
+      // means cur item is the last one and we won't be able to read a short. Throw exception
+      throw new BufferUnderflowException();
+    }
+    ByteBuffer nextItem = items[itemIndex + 1];
+    // Get available bytes from this item and remaining from next
+    short l = 0;
+    for (int i = offsetInItem; i < item.capacity(); i++) {
+      l <<= 8;
+      l ^= item.get(i) & 0xFF;
+    }
+    for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) {
+      l <<= 8;
+      l ^= nextItem.get(i) & 0xFF;
+    }
+    return l;
+  }
+
+  private long getLong(int index, int itemIndex) {
+    ByteBuffer item = items[itemIndex];
+    int offsetInItem = index - this.itemBeginPos[itemIndex];
+    int remainingLen = item.limit() - offsetInItem;
+    if (remainingLen >= Bytes.SIZEOF_LONG) {
+      return ByteBufferUtils.toLong(item, offsetInItem);
+    }
+    if (items.length - 1 == itemIndex) {
+      // means cur item is the last one and we won't be able to read a long. Throw exception
+      throw new BufferUnderflowException();
+    }
+    ByteBuffer nextItem = items[itemIndex + 1];
+    // Get available bytes from this item and remaining from next
+    long l = 0;
+    for (int i = offsetInItem; i < item.capacity(); i++) {
+      l <<= 8;
+      l ^= item.get(i) & 0xFF;
+    }
+    for (int i = 0; i < Bytes.SIZEOF_LONG - remainingLen; i++) {
+      l <<= 8;
+      l ^= nextItem.get(i) & 0xFF;
+    }
+    return l;
+  }
+
+  /**
+   * Fetches the long at the given index. Does not change position of the underlying ByteBuffers
+   * @param index
+   * @return the long value at the given index
+   */
+  public long getLong(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    int itemIndex;
+    if (this.itemBeginPos[this.curItemIndex] <= index
+        && this.itemBeginPos[this.curItemIndex + 1] > index) {
+      itemIndex = this.curItemIndex;
+    } else {
+      itemIndex = getItemIndex(index);
+    }
+    return getLong(index, itemIndex);
+  }
+
+  @Override
+  public long getLongStrictlyForward(int index) {
+    // Mostly the index specified will land within this current item. Short circuit for that
+    if (index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) {
+      throw new IndexOutOfBoundsException("The index " + index
+          + " should not be less than current position " + this.position());
+    }
+    int itemIndex;
+    if (this.itemBeginPos[this.curItemIndex + 1] > index) {
+      itemIndex = this.curItemIndex;
+    } else {
+      itemIndex = getItemIndexFromCurItemIndex(index);
+    }
+    return getLong(index, itemIndex);
+  }
+
+  /**
+   * @return this MBB's current position
+   */
+  @Override
+  public int position() {
+    return itemBeginPos[this.curItemIndex] + this.curItem.position();
+  }
+
+  /**
+   * Sets this MBB's position to the given value.
+   * @param position
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff position(int position) {
+    // Short circuit for positioning within the cur item. Mostly that is the case.
+    if (this.itemBeginPos[this.curItemIndex] <= position
+        && this.itemBeginPos[this.curItemIndex + 1] > position) {
+      this.curItem.position(position - this.itemBeginPos[this.curItemIndex]);
+      return this;
+    }
+    int itemIndex = getItemIndex(position);
+    // All items before the one containing the new position get their position set to their limit
+    for (int i = 0; i < itemIndex; i++) {
+      this.items[i].position(this.items[i].limit());
+    }
+    // All items after the one containing the new position get their position set to 0
+    for (int i = itemIndex + 1; i < this.items.length; i++) {
+      this.items[i].position(0);
+    }
+    this.curItem = this.items[itemIndex];
+    this.curItem.position(position - this.itemBeginPos[itemIndex]);
+    this.curItemIndex = itemIndex;
+    return this;
+  }
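+
+  // Editor's sketch (not part of the original patch): how an absolute position maps into
+  // the backing items. Assuming a hypothetical MBB over two 4-byte ByteBuffers:
+  //   MultiByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4));
+  //   mbb.position(5);             // curItem becomes items[1], positioned at offset 1
+  //   assert mbb.position() == 5;  // itemBeginPos[1] (== 4) + curItem.position() (== 1)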
+
+  /**
+   * Rewinds this MBB and the position is set to 0
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff rewind() {
+    for (int i = 0; i < this.items.length; i++) {
+      this.items[i].rewind();
+    }
+    this.curItemIndex = 0;
+    this.curItem = this.items[this.curItemIndex];
+    this.markedItemIndex = -1;
+    return this;
+  }
+
+  /**
+   * Marks the current position of the MBB
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff mark() {
+    this.markedItemIndex = this.curItemIndex;
+    this.curItem.mark();
+    return this;
+  }
+
+  /**
+   * Similar to {@link ByteBuffer}.reset(), ensures that this MBB
+   * is reset back to last marked position.
+   * @return This MBB
+   */
+  @Override
+  public MultiByteBuff reset() {
+    // when the buffer is moved to the next one.. the reset should happen on the previous marked
+    // item and the new one should be taken as the base
+    if (this.markedItemIndex < 0) throw new InvalidMarkException();
+    ByteBuffer markedItem = this.items[this.markedItemIndex];
+    markedItem.reset();
+    this.curItem = markedItem;
+    // All items after the marked position up to the current item should be reset to 0
+    for (int i = this.curItemIndex; i > this.markedItemIndex; i--) {
+      this.items[i].position(0);
+    }
+    this.curItemIndex = this.markedItemIndex;
+    return this;
+  }
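+
+  // Editor's sketch (same hypothetical two 4-byte items as above): mark() pins the current
+  // item and its position; reset() also rewinds any items consumed after the mark:
+  //   mbb.position(3).mark();  // mark lands inside items[0]
+  //   mbb.getInt();            // the read crosses into items[1]
+  //   mbb.reset();             // back at position 3; items[1] position reset to 0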
+
+  /**
+   * Returns the number of elements between the current position and the
+   * limit.
+   * @return the remaining elements in this MBB
+   */
+  @Override
+  public int remaining() {
+    int remain = 0;
+    for (int i = curItemIndex; i < items.length; i++) {
+      remain += items[i].remaining();
+    }
+    return remain;
+  }
+
+  /**
+   * Returns true if there are elements between the current position and the limit
+   * @return true if there are elements, false otherwise
+   */
+  @Override
+  public final boolean hasRemaining() {
+    return this.curItem.hasRemaining() || this.curItemIndex < this.items.length - 1;
+  }
+
+  /**
+   * A relative method that returns byte at the current position.  Increments the
+   * current position by the size of a byte.
+   * @return the byte at the current position
+   */
+  @Override
+  public byte get() {
+    if (this.curItem.remaining() == 0) {
+      if (items.length - 1 == this.curItemIndex) {
+        // Means cur item is the last one and we won't be able to read a byte. Throw exception
+        throw new BufferUnderflowException();
+      }
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+    }
+    return this.curItem.get();
+  }
+
+  /**
+   * Returns the short value at the current position. Also advances the position by the size
+   * of short
+   *
+   * @return the short value at the current position
+   */
+  @Override
+  public short getShort() {
+    int remaining = this.curItem.remaining();
+    if (remaining >= Bytes.SIZEOF_SHORT) {
+      return this.curItem.getShort();
+    }
+    if (remaining == 0) {
+      if (items.length - 1 == this.curItemIndex) {
+        // Means cur item is the last one and we won't be able to read a short. Throw exception
+        throw new BufferUnderflowException();
+      }
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+      return this.curItem.getShort();
+    }
+    short n = 0;
+    n ^= get() & 0xFF;
+    n <<= 8;
+    n ^= get() & 0xFF;
+    return n;
+  }
+
+  /**
+   * Returns the int value at the current position. Also advances the position by the size of int
+   *
+   * @return the int value at the current position
+   */
+  @Override
+  public int getInt() {
+    int remaining = this.curItem.remaining();
+    if (remaining >= Bytes.SIZEOF_INT) {
+      return this.curItem.getInt();
+    }
+    if (remaining == 0) {
+      if (items.length - 1 == this.curItemIndex) {
+        // Means cur item is the last one and we won't be able to read an int. Throw exception
+        throw new BufferUnderflowException();
+      }
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+      return this.curItem.getInt();
+    }
+    // Get available bytes from this item and remaining from next
+    int n = 0;
+    for (int i = 0; i < Bytes.SIZEOF_INT; i++) {
+      n <<= 8;
+      n ^= get() & 0xFF;
+    }
+    return n;
+  }
+
+
+  /**
+   * Returns the long value at the current position. Also advances the position by the size of long
+   *
+   * @return the long value at the current position
+   */
+  @Override
+  public long getLong() {
+    int remaining = this.curItem.remaining();
+    if (remaining >= Bytes.SIZEOF_LONG) {
+      return this.curItem.getLong();
+    }
+    if (remaining == 0) {
+      if (items.length - 1 == this.curItemIndex) {
+        // Means cur item is the last one and we won't be able to read a long. Throw exception
+        throw new BufferUnderflowException();
+      }
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+      return this.curItem.getLong();
+    }
+    // Get available bytes from this item and remaining from next
+    long l = 0;
+    for (int i = 0; i < Bytes.SIZEOF_LONG; i++) {
+      l <<= 8;
+      l ^= get() & 0xFF;
+    }
+    return l;
+  }
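+
+  // Editor's sketch: relative multi-byte reads stitch bytes across item boundaries in
+  // big-endian order. Assuming items[0] = {0x00, 0x01} and items[1] = {0x02, 0x03}:
+  //   mbb.position(1);
+  //   short s = mbb.getShort();  // one byte from each item: (0x01 << 8) | 0x02
+  //   assert s == 0x0102;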
+
+  /**
+   * Copies the content from this MBB's current position to the byte array and fills it. Also
+   * advances the position of the MBB by the length of the byte[].
+   * @param dst
+   */
+  @Override
+  public void get(byte[] dst) {
+    get(dst, 0, dst.length);
+  }
+
+  /**
+   * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset.
+   * Also advances the position of the MBB by the given length.
+   * @param dst the byte[] to which the copy has to happen
+   * @param offset the offset within the destination byte[]
+   * @param length the number of bytes to be copied
+   */
+  @Override
+  public void get(byte[] dst, int offset, int length) {
+    while (length > 0) {
+      int toRead = Math.min(length, this.curItem.remaining());
+      ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
+          toRead);
+      this.curItem.position(this.curItem.position() + toRead);
+      length -= toRead;
+      if (length == 0)
+        break;
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+      offset += toRead;
+    }
+  }
+
+  /**
+   * Sets the limit of this MBB.
+   * @param limit
+   * @return This MBB
+   */
+  @Override
+  public MultiByteBuff limit(int limit) {
+    this.limit = limit;
+    // Normally the limit will try to limit within the last BB item
+    int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
+    if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) {
+      this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin);
+      return this;
+    }
+    int itemIndex = getItemIndex(limit);
+    int beginOffset = this.itemBeginPos[itemIndex];
+    int offsetInItem = limit - beginOffset;
+    ByteBuffer item = items[itemIndex];
+    item.limit(offsetInItem);
+    for (int i = this.limitedItemIndex; i < itemIndex; i++) {
+      this.items[i].limit(this.items[i].capacity());
+    }
+    this.limitedItemIndex = itemIndex;
+    for (int i = itemIndex + 1; i < this.items.length; i++) {
+      this.items[i].limit(this.items[i].position());
+    }
+    return this;
+  }
+
+  /**
+   * Returns the limit of this MBB
+   * @return limit of the MBB
+   */
+  @Override
+  public int limit() {
+    return this.limit;
+  }
+
+  /**
+   * Returns an MBB which is a sliced version of this MBB. The position, limit and mark
+   * of the new MBB will be independent of those of the original MBB.
+   * The content of the new MBB will start at this MBB's current position
+   * @return a sliced MBB
+   */
+  @Override
+  public MultiByteBuff slice() {
+    ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
+    for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
+      copy[j] = this.items[i].slice();
+    }
+    return new MultiByteBuff(copy);
+  }
+
+  /**
+   * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark
+   * of the new MBB will be independent of those of the original MBB, but their initial
+   * values will be identical to this MBB's.
+   * @return a duplicated MBB
+   */
+  @Override
+  public MultiByteBuff duplicate() {
+    ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
+    for (int i = 0; i < this.items.length; i++) {
+      itemsCopy[i] = items[i].duplicate();
+    }
+    return new MultiByteBuff(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex,
+        this.curItemIndex, this.markedItemIndex);
+  }
+
+  /**
+   * Writes a byte to this MBB at the current position and increments the position
+   * @param b
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff put(byte b) {
+    if (this.curItem.remaining() == 0) {
+      if (this.curItemIndex == this.items.length - 1) {
+        throw new BufferOverflowException();
+      }
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+    }
+    this.curItem.put(b);
+    return this;
+  }
+
+  /**
+   * Writes a byte to this MBB at the given index
+   * @param index
+   * @param b
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff put(int index, byte b) {
+    // Locate the item by the given write index, not by the limit
+    int itemIndex = getItemIndex(index);
+    ByteBuffer item = items[itemIndex];
+    item.put(index - itemBeginPos[itemIndex], b);
+    return this;
+  }
+
+  /**
+   * Copies from a src MBB to this MBB.
+   * @param offset the position in this MBB to which the copy should happen
+   * @param src the src MBB
+   * @param srcOffset the offset in the src MBB from where the elements should be read
+   * @param length the length up to which the copy should happen
+   */
+  @Override
+  public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
+    int destItemIndex = getItemIndex(offset);
+    int srcItemIndex = getItemIndex(srcOffset);
+    ByteBuffer destItem = this.items[destItemIndex];
+    offset = offset - this.itemBeginPos[destItemIndex];
+
+    ByteBuffer srcItem = getItemByteBuffer(src, srcItemIndex);
+    srcOffset = srcOffset - this.itemBeginPos[srcItemIndex];
+    int toRead, toWrite, toMove;
+    while (length > 0) {
+      toWrite = destItem.limit() - offset;
+      toRead = srcItem.limit() - srcOffset;
+      toMove = Math.min(length, Math.min(toRead, toWrite));
+      ByteBufferUtils.copyFromBufferToBuffer(srcItem, destItem, srcOffset, offset, toMove);
+      length -= toMove;
+      if (length == 0) break;
+      if (toRead < toWrite) {
+        srcItem = getItemByteBuffer(src, ++srcItemIndex);
+        srcOffset = 0;
+        offset += toMove;
+      } else if (toRead > toWrite) {
+        destItem = this.items[++destItemIndex];
+        offset = 0;
+        srcOffset += toMove;
+      } else {
+        // toRead = toWrite case
+        srcItem = getItemByteBuffer(src, ++srcItemIndex);
+        srcOffset = 0;
+        destItem = this.items[++destItemIndex];
+        offset = 0;
+      }
+    }
+    return this;
+  }
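+
+  // Editor's note: both source and destination above may be fragmented, so each pass of
+  // the loop copies the largest run that fits in the current src and dest items, then
+  // advances whichever cursor (or both) reached an item boundary.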
+
+  private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) {
+    return (buf instanceof SingleByteBuff) ? ((SingleByteBuff) buf).getEnclosingByteBuffer()
+        : ((MultiByteBuff) buf).items[index];
+  }
+
+  /**
+   * Writes an int to this MBB at its current position. Also advances the position by size of int
+   * @param val Int value to write
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff putInt(int val) {
+    if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
+      this.curItem.putInt(val);
+      return this;
+    }
+    if (this.curItemIndex == this.items.length - 1) {
+      throw new BufferOverflowException();
+    }
+    // During read, we will read as byte by byte for this case. So just write in Big endian
+    put(int3(val));
+    put(int2(val));
+    put(int1(val));
+    put(int0(val));
+    return this;
+  }
+
+  private static byte int3(int x) {
+    return (byte) (x >> 24);
+  }
+
+  private static byte int2(int x) {
+    return (byte) (x >> 16);
+  }
+
+  private static byte int1(int x) {
+    return (byte) (x >> 8);
+  }
+
+  private static byte int0(int x) {
+    return (byte) (x);
+  }
+
+  /**
+   * Copies from the given byte[] to this MBB
+   * @param src
+   * @return this MBB
+   */
+  @Override
+  public final MultiByteBuff put(byte[] src) {
+    return put(src, 0, src.length);
+  }
+
+  /**
+   * Copies from the given byte[] to this MBB
+   * @param src
+   * @param offset the position in the byte array from which the copy should be done
+   * @param length the length up to which the copy should happen
+   * @return this MBB
+   */
+  @Override
+  public MultiByteBuff put(byte[] src, int offset, int length) {
+    if (this.curItem.remaining() >= length) {
+      ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
+      return this;
+    }
+    int end = offset + length;
+    for (int i = offset; i < end; i++) {
+      this.put(src[i]);
+    }
+    return this;
+  }
+
+
+  /**
+   * Writes a long to this MBB at its current position. Also advances the position by size of long
+   * @param val Long value to write
+   * @return this object
+   */
+  @Override
+  public MultiByteBuff putLong(long val) {
+    if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
+      this.curItem.putLong(val);
+      return this;
+    }
+    if (this.curItemIndex == this.items.length - 1) {
+      throw new BufferOverflowException();
+    }
+    // During read, we will read as byte by byte for this case. So just write in Big endian
+    put(long7(val));
+    put(long6(val));
+    put(long5(val));
+    put(long4(val));
+    put(long3(val));
+    put(long2(val));
+    put(long1(val));
+    put(long0(val));
+    return this;
+  }
+
+  private static byte long7(long x) {
+    return (byte) (x >> 56);
+  }
+
+  private static byte long6(long x) {
+    return (byte) (x >> 48);
+  }
+
+  private static byte long5(long x) {
+    return (byte) (x >> 40);
+  }
+
+  private static byte long4(long x) {
+    return (byte) (x >> 32);
+  }
+
+  private static byte long3(long x) {
+    return (byte) (x >> 24);
+  }
+
+  private static byte long2(long x) {
+    return (byte) (x >> 16);
+  }
+
+  private static byte long1(long x) {
+    return (byte) (x >> 8);
+  }
+
+  private static byte long0(long x) {
+    return (byte) (x);
+  }
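+
+  // Editor's sketch: when the value does not fit in the current item, putInt()/putLong()
+  // fall back to writing the big-endian bytes one at a time via put(byte), which advances
+  // across items; the byte-by-byte read path reassembles the same value. Assuming a
+  // hypothetical MBB over two 8-byte ByteBuffers:
+  //   mbb.position(5);   // only 3 writable bytes left in items[0]
+  //   mbb.putLong(42L);  // 3 bytes land in items[0], 5 spill into items[1]
+  //   mbb.position(5);
+  //   assert mbb.getLong() == 42L;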
+
+  /**
+   * Jumps the current position of this MBB by specified length.
+   * @param length
+   */
+  @Override
+  public MultiByteBuff skip(int length) {
+    // Get available bytes from this item and remaining from next
+    int jump = 0;
+    while (true) {
+      jump = this.curItem.remaining();
+      if (jump >= length) {
+        this.curItem.position(this.curItem.position() + length);
+        break;
+      }
+      this.curItem.position(this.curItem.position() + jump);
+      length -= jump;
+      this.curItemIndex++;
+      this.curItem = this.items[this.curItemIndex];
+    }
+    return this;
+  }
+
+  /**
+   * Jumps back the current position of this MBB by specified length.
+   * @param length
+   */
+  @Override
+  public MultiByteBuff moveBack(int length) {
+    while (length != 0) {
+      if (length > curItem.position()) {
+        length -= curItem.position();
+        this.curItem.position(0);
+        this.curItemIndex--;
+        this.curItem = this.items[curItemIndex];
+      } else {
+        this.curItem.position(curItem.position() - length);
+        break;
+      }
+    }
+    return this;
+  }
+
+  /**
+   * Returns the bytes from the current position up to the specified length, as a single
+   * ByteBuffer. When all these bytes happen to be in a single ByteBuffer, which this object
+   * wraps, that ByteBuffer item as such will be returned. So users are warned not to change
+   * the position or limit of this returned ByteBuffer. The position of the returned ByteBuffer
+   * is at the beginning of the required bytes. When the required bytes happen to span across
+   * multiple ByteBuffers, this API will copy the bytes to a newly created ByteBuffer of the
+   * required size and return that.
+   *
+   * @param length number of bytes required.
+   * @return bytes from the current position up to the specified length, as a single ByteBuffer.
+   */
+  @Override
+  public ByteBuffer asSubByteBuffer(int length) {
+    if (this.curItem.remaining() >= length) {
+      return this.curItem;
+    }
+    int offset = 0;
+    byte[] dupB = new byte[length];
+    int locCurItemIndex = curItemIndex;
+    ByteBuffer locCurItem = curItem;
+    while (length > 0) {
+      int toRead = Math.min(length, locCurItem.remaining());
+      ByteBufferUtils
+          .copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead);
+      length -= toRead;
+      if (length == 0)
+        break;
+      locCurItemIndex++;
+      locCurItem = this.items[locCurItemIndex];
+      offset += toRead;
+    }
+    return ByteBuffer.wrap(dupB);
+  }
+
+  /**
+   * Provides the bytes from the given offset up to the specified length, as a single ByteBuffer.
+   * When all these bytes happen to be in a single ByteBuffer, which this object wraps, that
+   * ByteBuffer item as such will be returned (with the offset in this ByteBuffer where the bytes
+   * start). So users are
+   * warned not to change the position or limit of this returned ByteBuffer. When the required bytes
+   * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created
+   * ByteBuffer of required size and return that.
+   *
+   * @param offset the offset in this MBB from where the subBuffer should be created
+   * @param length the length of the subBuffer
+   * @param pair a pair that will have the bytes from the current position till length specified, as
+   *        a single ByteBuffer and offset in that Buffer where the bytes starts. The method would
+   *        set the values on the pair that is passed in by the caller
+   */
+  @Override
+  public void asSubByteBuffer(int offset, int length, Pair<ByteBuffer, Integer> pair) {
+    if (this.itemBeginPos[this.curItemIndex] <= offset) {
+      int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
+      if (this.curItem.limit() - relOffsetInCurItem >= length) {
+        pair.setFirst(this.curItem);
+        pair.setSecond(relOffsetInCurItem);
+        return;
+      }
+    }
+    int itemIndex = getItemIndex(offset);
+    ByteBuffer item = this.items[itemIndex];
+    offset = offset - this.itemBeginPos[itemIndex];
+    if (item.limit() - offset >= length) {
+      pair.setFirst(item);
+      pair.setSecond(offset);
+      return;
+    }
+    byte[] dst = new byte[length];
+    int destOffset = 0;
+    while (length > 0) {
+      int toRead = Math.min(length, item.limit() - offset);
+      ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead);
+      length -= toRead;
+      if (length == 0) break;
+      itemIndex++;
+      item = this.items[itemIndex];
+      destOffset += toRead;
+      offset = 0;
+    }
+    pair.setFirst(ByteBuffer.wrap(dst));
+    pair.setSecond(0);
+    return;
+  }
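+
+  // Editor's sketch: callers are expected to reuse a single Pair to avoid per-call garbage.
+  // When the requested range lies within one item no copy is made; otherwise the bytes are
+  // copied into a fresh ByteBuffer and the returned offset is 0:
+  //   Pair<ByteBuffer, Integer> pair = new Pair<ByteBuffer, Integer>();
+  //   mbb.asSubByteBuffer(off, len, pair);
+  //   byte first = pair.getFirst().get(pair.getSecond());  // first byte of the range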
+
+  /**
+   * Copies the content from this MBB to a ByteBuffer
+   * @param out the ByteBuffer to which the copy has to happen
+   * @param sourceOffset the offset in the MBB from which the elements have
+   * to be copied
+   * @param length the number of elements in the MBB to be copied
+   */
+  @Override
+  public void get(ByteBuffer out, int sourceOffset,
+      int length) {
+    // Not used from the real read path, so not going with any optimization here
+    for (int i = 0; i < length; ++i) {
+      out.put(this.get(sourceOffset + i));
+    }
+  }
+
+  /**
+   * Copy the content from this MBB to a byte[] based on the given offset and
+   * length
+   *
+   * @param offset
+   *          the position from where the copy should start
+   * @param length
+   *          the number of bytes to be copied
+   * @return byte[] with the copied contents from this MBB.
+   */
+  @Override
+  public byte[] toBytes(int offset, int length) {
+    byte[] output = new byte[length];
+    int itemIndex = getItemIndex(offset);
+    ByteBuffer item = this.items[itemIndex];
+    // Convert the absolute offset into a relative offset within the located item
+    offset = offset - this.itemBeginPos[itemIndex];
+    int toRead = item.limit() - offset;
+    int destinationOffset = 0;
+    while (length > 0) {
+      toRead = Math.min(length, toRead);
+      ByteBufferUtils.copyFromBufferToArray(output, item, offset, destinationOffset, toRead);
+      length -= toRead;
+      if (length == 0)
+        break;
+      destinationOffset += toRead;
+      offset = 0;
+      item = items[++itemIndex];
+      toRead = item.remaining();
+    }
+    return output;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (!(obj instanceof MultiByteBuff)) return false;
+    MultiByteBuff that = (MultiByteBuff) obj;
+    if (this.capacity() != that.capacity()) return false;
+    return ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(),
+        that.limit()) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 0;
+    for (ByteBuffer b : this.items) {
+      hash += b.hashCode();
+    }
+    return hash;
+  }
+}
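
Editor's note: a minimal end-to-end sketch of the new MultiByteBuff, not part of the
patch. It assumes the varargs MultiByteBuff(ByteBuffer...) constructor used elsewhere
in this commit; the sizes and values are illustrative only.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.nio.MultiByteBuff;

    public class MultiByteBuffExample {
      public static void main(String[] args) {
        // One logical 16-byte buffer backed by two 8-byte fragments
        MultiByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(8), ByteBuffer.allocate(8));

        // Write a long that spans the fragment boundary (bytes 5..12)
        mbb.position(5).putLong(0x0102030405060708L);

        // Relative read: the byte-by-byte path reassembles the value big-endian
        mbb.position(5);
        assert mbb.getLong() == 0x0102030405060708L;

        // Absolute read: fetches the same value without disturbing the position
        mbb.position(0);
        assert mbb.getLong(5) == 0x0102030405060708L && mbb.position() == 0;
      }
    }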


[2/4] hbase git commit: HBASE-12213 HFileBlock backed by Array of ByteBuffers (Ram)

Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java
----------------------------------------------------------------------
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java
index d903d79..29f4811 100644
--- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.io.WritableUtils;
 
@@ -54,7 +55,8 @@ import org.apache.hadoop.io.WritableUtils;
  * PrefixTreeDataBlockEncoder implementation of DataBlockEncoder. This is the primary entry point
  * for PrefixTree encoding and decoding. Encoding is delegated to instances of
  * {@link PrefixTreeEncoder}, and decoding is delegated to instances of
- * {@link org.apache.hadoop.hbase.codec.prefixtree.scanner.CellSearcher}. Encoder and decoder instances are
+ * {@link org.apache.hadoop.hbase.codec.prefixtree.scanner.CellSearcher}.
+ * Encoder and decoder instances are
  * created and recycled by static PtEncoderFactory and PtDecoderFactory.
  */
 @InterfaceAudience.Private
@@ -114,12 +116,14 @@ public class PrefixTreeCodec implements DataBlockEncoder {
 
 
   @Override
-  public Cell getFirstKeyCellInBlock(ByteBuffer block) {
+  public Cell getFirstKeyCellInBlock(ByteBuff block) {
     block.rewind();
     PrefixTreeArraySearcher searcher = null;
     try {
       // should i includeMemstoreTS (second argument)?  i think PrefixKeyDeltaEncoder is, so i will
-      searcher = DecoderFactory.checkOut(block, true);
+      // TODO : Change to work with BBs
+      searcher = DecoderFactory.checkOut(block.asSubByteBuffer(block.limit() - block.position()),
+          true);
       if (!searcher.positionAtFirstCell()) {
         return null;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
----------------------------------------------------------------------
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
index eefd953..93610c4 100644
--- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
@@ -57,6 +57,7 @@ public class PrefixTreeSeeker implements EncodedSeeker {
   @Override
   public void setCurrentBuffer(ByteBuffer fullBlockBuffer) {
     block = fullBlockBuffer;
+    // TODO : change to ByteBuff
     ptSearcher = DecoderFactory.checkOut(block, includeMvccVersion);
     rewind();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
index f56a921..26555ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
@@ -18,9 +18,9 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 
 /**
  * Interface for a deserializer. Throws an IOException if the serialized data is
@@ -33,7 +33,7 @@ public interface CacheableDeserializer<T extends Cacheable> {
    *
    * @return T the deserialized object.
    */
-  T deserialize(ByteBuffer b) throws IOException;
+  T deserialize(ByteBuff b) throws IOException;
 
   /**
    * 
@@ -43,7 +43,7 @@ public interface CacheableDeserializer<T extends Cacheable> {
    * @return T the deserialized object.
    * @throws IOException
    */
-  T deserialize(ByteBuffer b, boolean reuse) throws IOException;
+  T deserialize(ByteBuff b, boolean reuse) throws IOException;
 
   /**
    * Get the identifier of this deserialiser. Identifier is unique for each

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
index 11436ce..6890884 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
@@ -21,11 +21,11 @@ package org.apache.hadoop.hbase.io.hfile;
 
 import java.io.DataInput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterUtil;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -93,7 +93,7 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase
   }
 
   @Override
-  public boolean contains(byte[] key, int keyOffset, int keyLength, ByteBuffer bloom) {
+  public boolean contains(byte[] key, int keyOffset, int keyLength, ByteBuff bloom) {
     // We try to store the result in this variable so we can update stats for
     // testing, but when an error happens, we log a message and return.
 
@@ -120,7 +120,7 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase
                 + Bytes.toStringBinary(key, keyOffset, keyLength), ex);
       }
 
-      ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly();
+      ByteBuff bloomBuf = bloomBlock.getBufferReadOnly();
       result = BloomFilterUtil.contains(key, keyOffset, keyLength,
           bloomBuf, bloomBlock.headerSize(),
           bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount);
@@ -137,7 +137,7 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase
   }
 
   @Override
-  public boolean contains(Cell keyCell, ByteBuffer bloom) {
+  public boolean contains(Cell keyCell, ByteBuff bloom) {
     // We try to store the result in this variable so we can update stats for
     // testing, but when an error happens, we log a message and return.
     int block = index.rootBlockContainingKey(keyCell);

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 6653c23..71ac506 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -26,7 +26,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.SequenceInputStream;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -55,6 +54,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@@ -389,7 +389,7 @@ public class HFile {
 
     HFileScanner getScanner(boolean cacheBlocks, final boolean pread, final boolean isCompaction);
 
-    ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException;
+    ByteBuff getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException;
 
     Map<byte[], byte[]> loadFileInfo() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index f3bf0b7..8672d62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -34,14 +34,16 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -104,8 +106,9 @@ public class HFileBlock implements Cacheable {
   static final byte[] DUMMY_HEADER_NO_CHECKSUM =
      new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
 
-  public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
-      ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
+  // TODO: How do we get the estimate correctly if it is a SingleByteBuff?
+  public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
+      new MultiByteBuff(ByteBuffer.wrap(new byte[0], 0, 0)).getClass(), false);
 
   // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
   public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
@@ -118,14 +121,16 @@ public class HFileBlock implements Cacheable {
 
   static final CacheableDeserializer<Cacheable> blockDeserializer =
       new CacheableDeserializer<Cacheable>() {
-        public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
+        public HFileBlock deserialize(ByteBuff buf, boolean reuse) throws IOException{
           buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
-          ByteBuffer newByteBuffer;
+          ByteBuff newByteBuffer;
           if (reuse) {
             newByteBuffer = buf.slice();
           } else {
-           newByteBuffer = ByteBuffer.allocate(buf.limit());
-           newByteBuffer.put(buf);
+            // Used only in tests
+            int len = buf.limit();
+            newByteBuffer = new SingleByteBuff(ByteBuffer.allocate(len));
+            newByteBuffer.put(0, buf, buf.position(), len);
           }
           buf.position(buf.limit());
           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
@@ -145,7 +150,8 @@ public class HFileBlock implements Cacheable {
         }
 
         @Override
-        public HFileBlock deserialize(ByteBuffer b) throws IOException {
+        public HFileBlock deserialize(ByteBuff b) throws IOException {
+          // Used only in tests
           return deserialize(b, false);
         }
       };
@@ -174,7 +180,7 @@ public class HFileBlock implements Cacheable {
   private final int onDiskDataSizeWithHeader;
 
   /** The in-memory representation of the hfile block */
-  private ByteBuffer buf;
+  private ByteBuff buf;
 
   /** Meta data that holds meta information on the hfileblock */
   private HFileContext fileContext;
@@ -209,7 +215,7 @@ public class HFileBlock implements Cacheable {
    * @param fileContext HFile meta data
    */
   HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
-      long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
+      long prevBlockOffset, ByteBuff buf, boolean fillHeader, long offset,
       int onDiskDataSizeWithHeader, HFileContext fileContext) {
     this.blockType = blockType;
     this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
@@ -224,6 +230,13 @@ public class HFileBlock implements Cacheable {
     this.buf.rewind();
   }
 
+  HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader,
+      long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
+      int onDiskDataSizeWithHeader, HFileContext fileContext) {
+    this(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset,
+        new SingleByteBuff(buf), fillHeader, offset, onDiskDataSizeWithHeader, fileContext);
+  }
+
   /**
    * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
    */
@@ -239,6 +252,9 @@ public class HFileBlock implements Cacheable {
     this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
   }
 
+  HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
+    this(new SingleByteBuff(b), usesHBaseChecksum);
+  }
   /**
    * Creates a block from an existing buffer starting with a header. Rewinds
    * and takes ownership of the buffer. By definition of rewind, ignores the
@@ -247,7 +263,7 @@ public class HFileBlock implements Cacheable {
    * because majorNumbers indicate the format of a HFile whereas minorNumbers
    * indicate the format inside a HFileBlock.
    */
-  HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException {
+  HFileBlock(ByteBuff b, boolean usesHBaseChecksum) throws IOException {
     b.rewind();
     blockType = BlockType.read(b);
     onDiskSizeWithoutHeader = b.getInt();
@@ -334,8 +350,8 @@ public class HFileBlock implements Cacheable {
    *
    * @return the buffer with header skipped and checksum omitted.
    */
-  public ByteBuffer getBufferWithoutHeader() {
-    ByteBuffer dup = this.buf.duplicate();
+  public ByteBuff getBufferWithoutHeader() {
+    ByteBuff dup = this.buf.duplicate();
     dup.position(headerSize());
     dup.limit(buf.limit() - totalChecksumBytes());
     return dup.slice();
@@ -343,15 +359,15 @@ public class HFileBlock implements Cacheable {
 
   /**
    * Returns the buffer this block stores internally. The clients must not
-   * modify the buffer object. This method has to be public because it is
-   * used in {@link CompoundBloomFilter} to avoid object
-   *  creation on every Bloom filter lookup, but has to be used with caution.
-   *   Checksum data is not included in the returned buffer but header data is.
+   * modify the buffer object. This method has to be public because it is used
+   * in {@link CompoundBloomFilter} to avoid object creation on every Bloom
+   * filter lookup, but has to be used with caution. Checksum data is not
+   * included in the returned buffer but header data is.
    *
    * @return the buffer of this block for read-only operations
    */
-  public ByteBuffer getBufferReadOnly() {
-    ByteBuffer dup = this.buf.duplicate();
+  public ByteBuff getBufferReadOnly() {
+    ByteBuff dup = this.buf.duplicate();
     dup.limit(buf.limit() - totalChecksumBytes());
     return dup.slice();
   }
@@ -363,8 +379,8 @@ public class HFileBlock implements Cacheable {
    *
    * @return the buffer with header and checksum included for read-only operations
    */
-  public ByteBuffer getBufferReadOnlyWithHeader() {
-    ByteBuffer dup = this.buf.duplicate();
+  public ByteBuff getBufferReadOnlyWithHeader() {
+    ByteBuff dup = this.buf.duplicate();
     return dup.slice();
   }
 
@@ -374,8 +390,8 @@ public class HFileBlock implements Cacheable {
    *
    * @return the byte buffer with header and checksum included
    */
-  ByteBuffer getBufferWithHeader() {
-    ByteBuffer dupBuf = buf.duplicate();
+  ByteBuff getBufferWithHeader() {
+    ByteBuff dupBuf = buf.duplicate();
     dupBuf.rewind();
     return dupBuf;
   }
@@ -417,7 +433,8 @@ public class HFileBlock implements Cacheable {
     sanityCheckAssertion(buf.getLong(), prevBlockOffset, "prevBlocKOffset");
     if (this.fileContext.isUseHBaseChecksum()) {
       sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType");
-      sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
+      sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(),
+          "bytesPerChecksum");
       sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
     }
 
@@ -463,7 +480,7 @@ public class HFileBlock implements Cacheable {
       dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
           Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
     } else {
-      ByteBuffer bufWithoutHeader = getBufferWithoutHeader();
+      ByteBuff bufWithoutHeader = getBufferWithoutHeader();
       byte[] dataBeginBytes = new byte[Math.min(32,
           bufWithoutHeader.limit() - bufWithoutHeader.position())];
       bufWithoutHeader.get(dataBeginBytes);
@@ -489,7 +506,7 @@ public class HFileBlock implements Cacheable {
       if (buf.hasArray()) {
         dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(), Math.min(32, buf.limit()));
       } else {
-        ByteBuffer bufDup = getBufferReadOnly();
+        ByteBuff bufDup = getBufferReadOnly();
         byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())];
         bufDup.get(dataBeginBytes);
         dataBegin = Bytes.toStringBinary(dataBeginBytes);
@@ -521,7 +538,7 @@ public class HFileBlock implements Cacheable {
     HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
       reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
 
-    ByteBuffer dup = this.buf.duplicate();
+    ByteBuff dup = this.buf.duplicate();
     dup.position(this.headerSize());
     dup = dup.slice();
     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
@@ -534,16 +551,14 @@ public class HFileBlock implements Cacheable {
       // Below call to copyFromBufferToBuffer() will try positional read/write from/to buffers when
       // any of the buffer is DBB. So we change the limit on a dup buffer. No copying just create
       // new BB objects
-      ByteBuffer inDup = this.buf.duplicate();
+      ByteBuff inDup = this.buf.duplicate();
       inDup.limit(inDup.limit() + headerSize());
-      ByteBuffer outDup = unpacked.buf.duplicate();
+      ByteBuff outDup = unpacked.buf.duplicate();
       outDup.limit(outDup.limit() + unpacked.headerSize());
-      ByteBufferUtils.copyFromBufferToBuffer(
-          outDup,
-          inDup,
-          this.onDiskDataSizeWithHeader,
+      outDup.put(
           unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader
-              + unpacked.totalChecksumBytes(), unpacked.headerSize());
+              + unpacked.totalChecksumBytes(), inDup, this.onDiskDataSizeWithHeader,
+          unpacked.headerSize());
     }
     return unpacked;
   }
@@ -571,11 +586,10 @@ public class HFileBlock implements Cacheable {
 
     // Copy header bytes into newBuf.
     // newBuf is HBB so no issue in calling array()
-    ByteBuffer dup = buf.duplicate();
-    dup.position(0);
-    dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
+    buf.position(0);
+    buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize);
 
-    buf = newBuf;
+    buf = new SingleByteBuff(newBuf);
     // set limit to exclude next block's header
     buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
   }
@@ -627,16 +641,16 @@ public class HFileBlock implements Cacheable {
    * @return a byte stream reading the data + checksum of this block
    */
   public DataInputStream getByteStream() {
-    ByteBuffer dup = this.buf.duplicate();
+    ByteBuff dup = this.buf.duplicate();
     dup.position(this.headerSize());
-    return new DataInputStream(new ByteBufferInputStream(dup));
+    return new DataInputStream(new ByteBuffInputStream(dup));
   }
 
   @Override
   public long heapSize() {
     long size = ClassSize.align(
         ClassSize.OBJECT +
-        // Block type, byte buffer and meta references
+        // Block type, multi byte buffer and meta references
         3 * ClassSize.REFERENCE +
         // On-disk size, uncompressed size, and next block's on-disk size
         // bytePerChecksum and onDiskDataSize
@@ -649,7 +663,7 @@ public class HFileBlock implements Cacheable {
 
     if (buf != null) {
       // Deep overhead of the byte buffer. Needs to be aligned separately.
-      size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE);
+      size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE);
     }
 
     return ClassSize.align(size);
@@ -1724,7 +1738,7 @@ public class HFileBlock implements Cacheable {
 
   @Override
   public void serialize(ByteBuffer destination) {
-    ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength()
+    this.buf.get(destination, 0, getSerializedLength()
         - EXTRA_SERIALIZATION_SPACE);
     serializeExtraInfo(destination);
   }
@@ -1786,7 +1800,7 @@ public class HFileBlock implements Cacheable {
     if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
       return false;
     }
-    if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
+    if (ByteBuff.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0,
         castedComparison.buf.limit()) != 0) {
       return false;
     }
@@ -1876,7 +1890,7 @@ public class HFileBlock implements Cacheable {
    * This is mostly helpful for debugging. This assumes that the block
    * has minor version > 0.
    */
-  static String toStringHeader(ByteBuffer buf) throws IOException {
+  static String toStringHeader(ByteBuff buf) throws IOException {
     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
     buf.get(magicBuf);
     BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 86b5e15..85190d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -45,9 +45,10 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -342,7 +343,7 @@ public class HFileBlockIndex {
 
         // Locate the entry corresponding to the given key in the non-root
         // (leaf or intermediate-level) index block.
-        ByteBuffer buffer = block.getBufferWithoutHeader();
+        ByteBuff buffer = block.getBufferWithoutHeader();
         index = locateNonRootIndexEntry(buffer, key, comparator);
         if (index == -1) {
           // This has to be changed
@@ -396,14 +397,14 @@ public class HFileBlockIndex {
             midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
             BlockType.LEAF_INDEX, null);
 
-        ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
+        ByteBuff b = midLeafBlock.getBufferWithoutHeader();
         int numDataBlocks = b.getInt();
-        int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
-        int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
+        int keyRelOffset = b.getIntStrictlyForward(Bytes.SIZEOF_INT * (midKeyEntry + 1));
+        int keyLen = b.getIntStrictlyForward(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
             keyRelOffset;
         int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
             + SECONDARY_INDEX_ENTRY_OVERHEAD;
-        byte[] bytes = ByteBufferUtils.toBytes(b, keyOffset, keyLen);
+        byte[] bytes = b.toBytes(keyOffset, keyLen);
         targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
       } else {
         // The middle of the root-level index.
@@ -653,7 +654,7 @@ public class HFileBlockIndex {
      * @param i the ith position
      * @return The indexed key at the ith position in the nonRootIndex.
      */
-    protected byte[] getNonRootIndexedKey(ByteBuffer nonRootIndex, int i) {
+    protected byte[] getNonRootIndexedKey(ByteBuff nonRootIndex, int i) {
       int numEntries = nonRootIndex.getInt(0);
       if (i < 0 || i >= numEntries) {
         return null;
@@ -678,7 +679,7 @@ public class HFileBlockIndex {
         targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
 
       // TODO check whether we can make BB backed Cell here? So can avoid bytes copy.
-      return ByteBufferUtils.toBytes(nonRootIndex, targetKeyOffset, targetKeyLength);
+      return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength);
     }
 
     /**
@@ -697,10 +698,10 @@ public class HFileBlockIndex {
      *         -1 otherwise
      * @throws IOException
      */
-    static int binarySearchNonRootIndex(Cell key, ByteBuffer nonRootIndex,
+    static int binarySearchNonRootIndex(Cell key, ByteBuff nonRootIndex,
         CellComparator comparator) {
 
-      int numEntries = nonRootIndex.getInt(0);
+      int numEntries = nonRootIndex.getIntStrictlyForward(0);
       int low = 0;
       int high = numEntries - 1;
       int mid = 0;
@@ -713,12 +714,12 @@ public class HFileBlockIndex {
       // keys[numEntries] = Infinity, then we are maintaining an invariant that
       // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
       KeyValue.KeyOnlyKeyValue nonRootIndexKV = new KeyValue.KeyOnlyKeyValue();
+      Pair<ByteBuffer, Integer> pair = new Pair<ByteBuffer, Integer>();
       while (low <= high) {
         mid = (low + high) >>> 1;
 
         // Midkey's offset relative to the end of secondary index
-        int midKeyRelOffset = nonRootIndex.getInt(
-            Bytes.SIZEOF_INT * (mid + 1));
+      int midKeyRelOffset = nonRootIndex.getIntStrictlyForward(Bytes.SIZEOF_INT * (mid + 1));
 
         // The offset of the middle key in the blockIndex buffer
         int midKeyOffset = entriesOffset       // Skip secondary index
@@ -728,16 +729,17 @@ public class HFileBlockIndex {
         // We subtract the two consecutive secondary index elements, which
         // gives us the size of the whole (offset, onDiskSize, key) tuple. We
         // then need to subtract the overhead of offset and onDiskSize.
-        int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
+        int midLength = nonRootIndex.getIntStrictlyForward(Bytes.SIZEOF_INT * (mid + 2)) -
             midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
 
         // we have to compare in this order, because the comparator order
         // has special logic when the 'left side' is a special key.
         // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
         // done after HBASE-12224 & HBASE-12282
-        // TODO avaoid array call.
-        nonRootIndexKV.setKey(nonRootIndex.array(),
-            nonRootIndex.arrayOffset() + midKeyOffset, midLength);
+        // TODO avoid array call.
+        nonRootIndex.asSubByteBuffer(midKeyOffset, midLength, pair);
+        nonRootIndexKV.setKey(pair.getFirst().array(),
+            pair.getFirst().arrayOffset() + pair.getSecond(), midLength);
         int cmp = comparator.compareKeyIgnoresMvcc(key, nonRootIndexKV);
 
         // key lives above the midpoint
@@ -787,19 +789,20 @@ public class HFileBlockIndex {
      *         return -1 in the case the given key is before the first key.
      *
      */
-    static int locateNonRootIndexEntry(ByteBuffer nonRootBlock, Cell key,
+    static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key,
         CellComparator comparator) {
       int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);
 
       if (entryIndex != -1) {
-        int numEntries = nonRootBlock.getInt(0);
+        int numEntries = nonRootBlock.getIntStrictlyForward(0);
 
         // The end of secondary index and the beginning of entries themselves.
         int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
 
         // The offset of the entry we are interested in relative to the end of
         // the secondary index.
-        int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT * (1 + entryIndex));
+        int entryRelOffset = nonRootBlock
+            .getIntStrictlyForward(Bytes.SIZEOF_INT * (1 + entryIndex));
 
         nonRootBlock.position(entriesOffset + entryRelOffset);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 13836ae..ae2f6c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -48,11 +47,12 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
@@ -436,7 +436,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
   }
 
   protected static class HFileScannerImpl implements HFileScanner {
-    private ByteBuffer blockBuffer;
+    private ByteBuff blockBuffer;
     protected final boolean cacheBlocks;
     protected final boolean pread;
     protected final boolean isCompaction;
@@ -450,6 +450,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     private int currTagsLen;
     private KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue();
     protected HFileBlock block;
+    // A pair for reusing in blockSeek() so that we don't garbage lot of objects
+    final Pair<ByteBuffer, Integer> pair = new Pair<ByteBuffer, Integer>();
 
     /**
      * The next indexed key is to keep track of the indexed key of the next data block.
@@ -510,19 +512,21 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       // inlined and is not too big to compile. We also manage position in ByteBuffer ourselves
       // because it is faster than going via range-checked ByteBuffer methods or going through a
       // byte buffer array a byte at a time.
-      int p = blockBuffer.position() + blockBuffer.arrayOffset();
       // Get a long at a time rather than read two individual ints. In micro-benchmarking, even
       // with the extra bit-fiddling, this is order-of-magnitude faster than getting two ints.
-      long ll = Bytes.toLong(blockBuffer.array(), p);
+      // Trying to imitate what was done earlier - need to profile whether this is better,
+      // or whether the earlier way of doing mark and reset is better.
+      // Either way, ensure that we read a long instead of two ints.
+      long ll = blockBuffer.getLongStrictlyForward(blockBuffer.position());
       // Read top half as an int of key length and bottom int as value length
       this.currKeyLen = (int)(ll >> Integer.SIZE);
       this.currValueLen = (int)(Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
       checkKeyValueLen();
       // Move position past the key and value lengths and then beyond the key and value
-      p += (Bytes.SIZEOF_LONG + currKeyLen + currValueLen);
+      int p = blockBuffer.position() + (Bytes.SIZEOF_LONG + currKeyLen + currValueLen);
       if (reader.getFileContext().isIncludesTags()) {
         // Tags length is a short.
-        this.currTagsLen = Bytes.toShort(blockBuffer.array(), p);
+        this.currTagsLen = blockBuffer.getShortStrictlyForward(p);
         checkTagsLen();
         p += (Bytes.SIZEOF_SHORT + currTagsLen);
       }
@@ -560,14 +564,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       // This is Bytes#bytesToVint inlined so can save a few instructions in this hot method; i.e.
       // previous if one-byte vint, we'd redo the vint call to find int size.
       // Also the method is kept small so can be inlined.
-      byte firstByte = blockBuffer.array()[position];
+      byte firstByte = blockBuffer.getByteStrictlyForward(position);
       int len = WritableUtils.decodeVIntSize(firstByte);
       if (len == 1) {
         this.currMemstoreTS = firstByte;
       } else {
         long i = 0;
         for (int idx = 0; idx < len - 1; idx++) {
-          byte b = blockBuffer.array()[position + 1 + idx];
+          byte b = blockBuffer.get(position + 1 + idx);
           i = i << 8;
           i = i | (b & 0xFF);
         }
@@ -598,13 +602,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
      */
     protected int blockSeek(Cell key, boolean seekBefore) {
       int klen, vlen, tlen = 0;
-      long memstoreTS = 0;
-      int memstoreTSLen = 0;
       int lastKeyValueSize = -1;
+      int pos = -1;
       do {
-        blockBuffer.mark();
-        klen = blockBuffer.getInt();
-        vlen = blockBuffer.getInt();
+        pos = blockBuffer.position();
+        // Read the key and value lengths together as one long, via the ByteBuff utils
+        long ll = blockBuffer.getLongStrictlyForward(pos);
+        klen = (int)(ll >> Integer.SIZE);
+        vlen = (int)(Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
         if (klen < 0 || vlen < 0 || klen > blockBuffer.limit()
             || vlen > blockBuffer.limit()) {
           throw new IllegalStateException("Invalid klen " + klen + " or vlen "
@@ -612,77 +617,68 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
               + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
               + blockBuffer.position() + " (without header).");
         }
-        ByteBufferUtils.skip(blockBuffer, klen + vlen);
+        pos += Bytes.SIZEOF_LONG;
+        blockBuffer.asSubByteBuffer(pos, klen, pair);
+        // TODO : change here after buffer-backed Cells come in
+        keyOnlyKv.setKey(pair.getFirst().array(), pair.getFirst().arrayOffset() + pair.getSecond(),
+            klen);
+        int comp = reader.getComparator().compareKeyIgnoresMvcc(key, keyOnlyKv);
+        pos += klen + vlen;
         if (this.reader.getFileContext().isIncludesTags()) {
           // Read short as unsigned, high byte first
-          tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff);
+          tlen = ((blockBuffer.getByteStrictlyForward(pos) & 0xff) << 8)
+              ^ (blockBuffer.getByteStrictlyForward(pos + 1) & 0xff);
           if (tlen < 0 || tlen > blockBuffer.limit()) {
             throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: "
                 + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: "
                 + blockBuffer.position() + " (without header).");
           }
-          ByteBufferUtils.skip(blockBuffer, tlen);
+          // skip the tags, plus the two bytes read for the tags length.
+          pos += tlen + (Bytes.SIZEOF_SHORT);
         }
         if (this.reader.shouldIncludeMemstoreTS()) {
-          if (this.reader.isDecodeMemstoreTS()) {
-            memstoreTS = Bytes.readAsVLong(blockBuffer.array(), blockBuffer.arrayOffset()
-                + blockBuffer.position());
-            memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
-          } else {
-            memstoreTS = 0;
-            memstoreTSLen = 1;
-          }
+          // Directly read the mvcc based on current position
+          readMvccVersion(pos);
         }
-        blockBuffer.reset();
-        int keyOffset =
-          blockBuffer.arrayOffset() + blockBuffer.position() + (Bytes.SIZEOF_INT * 2);
-        keyOnlyKv.setKey(blockBuffer.array(), keyOffset, klen);
-        int comp = reader.getComparator().compareKeyIgnoresMvcc(key, keyOnlyKv);
-
         if (comp == 0) {
           if (seekBefore) {
             if (lastKeyValueSize < 0) {
               throw new IllegalStateException("blockSeek with seekBefore "
-                  + "at the first key of the block: key="
-                  + CellUtil.getCellKeyAsString(key)
+                  + "at the first key of the block: key=" + CellUtil.getCellKeyAsString(key)
                   + ", blockOffset=" + block.getOffset() + ", onDiskSize="
                   + block.getOnDiskSizeWithHeader());
             }
-            blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+            blockBuffer.moveBack(lastKeyValueSize);
             readKeyValueLen();
             return 1; // non exact match.
           }
           currKeyLen = klen;
           currValueLen = vlen;
           currTagsLen = tlen;
-          if (this.reader.shouldIncludeMemstoreTS()) {
-            currMemstoreTS = memstoreTS;
-            currMemstoreTSLen = memstoreTSLen;
-          }
           return 0; // indicate exact match
         } else if (comp < 0) {
-          if (lastKeyValueSize > 0)
-            blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+          if (lastKeyValueSize > 0) {
+            blockBuffer.moveBack(lastKeyValueSize);
+          }
           readKeyValueLen();
           if (lastKeyValueSize == -1 && blockBuffer.position() == 0) {
             return HConstants.INDEX_KEY_MAGIC;
           }
           return 1;
         }
-
         // The size of this key/value tuple, including key/value length fields.
-        lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
+        lastKeyValueSize = klen + vlen + currMemstoreTSLen + KEY_VALUE_LEN_SIZE;
         // include tag length also if tags included with KV
-        if (this.reader.getFileContext().isIncludesTags()) {
+        if (reader.getFileContext().isIncludesTags()) {
           lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT;
         }
-        blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
-      } while (blockBuffer.remaining() > 0);
+        blockBuffer.skip(lastKeyValueSize);
+      } while (blockBuffer.hasRemaining());
 
       // Seek to the last key we successfully read. This will happen if this is
       // the last key/value pair in the file, in which case the following call
       // to next() has to return false.
-      blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
+      blockBuffer.moveBack(lastKeyValueSize);
       readKeyValueLen();
       return 1; // didn't exactly find it.
     }
@@ -849,6 +845,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     @Override
     public ByteBuffer getValue() {
       assertSeeked();
+      // TODO : change here after buffer-backed Cells come in
       return ByteBuffer.wrap(
           blockBuffer.array(),
           blockBuffer.arrayOffset() + blockBuffer.position()
@@ -1030,15 +1027,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     }
 
     protected Cell getFirstKeyCellInBlock(HFileBlock curBlock) {
-      ByteBuffer buffer = curBlock.getBufferWithoutHeader();
+      ByteBuff buffer = curBlock.getBufferWithoutHeader();
       // It is safe to manipulate this buffer because we own the buffer object.
       buffer.rewind();
       int klen = buffer.getInt();
-      buffer.getInt();
-      ByteBuffer keyBuff = buffer.slice();
-      keyBuff.limit(klen);
-      keyBuff.rewind();
-      // Create a KeyOnlyKv now. 
+      buffer.skip(Bytes.SIZEOF_INT);// Skip value len part
+      ByteBuffer keyBuff = buffer.asSubByteBuffer(klen);
+      keyBuff.limit(keyBuff.position() + klen);
+      // Create a KeyOnlyKv now.
       // TODO : Will change when Buffer backed cells come
       return new KeyValue.KeyOnlyKeyValue(keyBuff.array(), keyBuff.arrayOffset()
           + keyBuff.position(), klen);
@@ -1188,7 +1184,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
    * @throws IOException
    */
   @Override
-  public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
+  public ByteBuff getMetaBlock(String metaBlockName, boolean cacheBlock)
       throws IOException {
     if (trailer.getMetaIndexCount() == 0) {
       return null; // there are no meta blocks
@@ -1457,22 +1453,22 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
           + " doesn't support data block encoding "
           + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
       }
-
-      seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
+      ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
+      // TODO : Change the DBEs to work with ByteBuffs
+      seeker.setCurrentBuffer(encodedBuffer.asSubByteBuffer(encodedBuffer.limit()));
       blockFetches++;
 
       // Reset the next indexed key
       this.nextIndexedKey = null;
     }
 
-    private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
-      ByteBuffer origBlock = newBlock.getBufferReadOnly();
-      ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
-          origBlock.arrayOffset() + newBlock.headerSize() +
-          DataBlockEncoding.ID_SIZE,
-          newBlock.getUncompressedSizeWithoutHeader() -
-          DataBlockEncoding.ID_SIZE).slice();
-      return encodedBlock;
+    private ByteBuff getEncodedBuffer(HFileBlock newBlock) {
+      ByteBuff origBlock = newBlock.getBufferReadOnly();
+      int pos = newBlock.headerSize() + DataBlockEncoding.ID_SIZE;
+      origBlock.position(pos);
+      origBlock
+          .limit(pos + newBlock.getUncompressedSizeWithoutHeader() - DataBlockEncoding.ID_SIZE);
+      return origBlock.slice();
     }
 
     @Override

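Two ByteBuff idioms recur in the HFileReaderImpl changes above: one 8-byte read split
into the key and value lengths, and asSubByteBuffer() filling the reusable Pair field
instead of allocating a fresh buffer. A minimal sketch of both, assuming (as the code
above still does) that the block is backed by an on-heap array:

  int pos = blockBuffer.position();
  long ll = blockBuffer.getLongStrictlyForward(pos); // one read instead of two getInt()s
  int klen = (int) (ll >> Integer.SIZE);             // top half: key length
  int vlen = (int) ll;                               // bottom half: value length, the same
                                                     // bits the MASK_FOR_LOWER_INT_IN_LONG
                                                     // form above extracts
  blockBuffer.asSubByteBuffer(pos + Bytes.SIZEOF_LONG, klen, pair);
  keyOnlyKv.setKey(pair.getFirst().array(),
      pair.getFirst().arrayOffset() + pair.getSecond(), klen);
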
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
index 57e7f28..f12f3b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
@@ -29,10 +29,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -255,7 +258,7 @@ public class MemcachedBlockCache implements BlockCache {
     @Override
     public HFileBlock decode(CachedData d) {
       try {
-        ByteBuffer buf = ByteBuffer.wrap(d.getData());
+        ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
         return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true);
       } catch (IOException e) {
         LOG.warn("Error deserializing data from memcached",e);

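The pattern here is the bridge for callers still holding plain bytes: wrap the
ByteBuffer in a SingleByteBuff to satisfy the now ByteBuff-based deserializer
(BucketCache below does the same with the buffer it reads from the IOEngine). A
minimal sketch:

  byte[] data = d.getData();                         // raw bytes from memcached
  ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(data));
  HFileBlock block =
      (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true);
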
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index dfada87..f05a255 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ConcurrentIndex;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -416,9 +418,12 @@ public class BucketCache implements BlockCache, HeapSize {
         // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
         // existence here.
         if (bucketEntry.equals(backingMap.get(key))) {
+          // TODO : change this area - should be removed after server cells and
+          // HBASE-12295 are available
           int len = bucketEntry.getLength();
-          ByteBuffer bb = ByteBuffer.allocate(len);
-          int lenRead = ioEngine.read(bb, bucketEntry.offset());
+          ByteBuffer buf = ByteBuffer.allocate(len);
+          int lenRead = ioEngine.read(buf, bucketEntry.offset());
+          ByteBuff bb = new SingleByteBuff(buf);
           if (lenRead != len) {
             throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
           }
@@ -1269,7 +1274,7 @@ public class BucketCache implements BlockCache, HeapSize {
       try {
         if (data instanceof HFileBlock) {
           HFileBlock block = (HFileBlock) data;
-          ByteBuffer sliceBuf = block.getBufferReadOnlyWithHeader();
+          ByteBuff sliceBuf = block.getBufferReadOnlyWithHeader();
           sliceBuf.rewind();
           assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE ||
             len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE;

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index de10667..03c65de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferArray;
 
 /**
@@ -78,6 +79,11 @@ public class ByteBufferIOEngine implements IOEngine {
         dstBuffer.arrayOffset());
   }
 
+  @Override
+  public ByteBuff read(long offset, int len) throws IOException {
+    return bufferArray.asSubByteBuff(offset, len);
+  }
+
   /**
    * Transfers data from the given byte buffer to the buffer array
    * @param srcBuffer the given byte buffer from which bytes are to be read
@@ -92,6 +98,14 @@ public class ByteBufferIOEngine implements IOEngine {
         srcBuffer.arrayOffset());
   }
 
+  @Override
+  public void write(ByteBuff srcBuffer, long offset) throws IOException {
+    // When caching a block into the BucketCache there will be a single buffer backing
+    // this HFileBlock. This works for now, but may not hold once the DFS hands us DBBs.
+    assert srcBuffer.hasArray();
+    bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
+        srcBuffer.arrayOffset());
+  }
   /**
    * No operation for the sync in the memory IO engine
    */

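Note the asymmetry between the two engines: ByteBufferIOEngine#read(long, int) hands
back a ByteBuff view over its internal buffer array (no copy, going by asSubByteBuff),
while FileIOEngine below must allocate a heap buffer, fill it from the channel, and
wrap it. A hedged usage sketch, assuming the usual (capacity, direct) constructor of
this class:

  ByteBufferIOEngine engine = new ByteBufferIOEngine(8 * 1024 * 1024, false);
  ByteBuff src = new SingleByteBuff(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }));
  engine.write(src, 0L);              // copies the four bytes into the buffer array
  ByteBuff view = engine.read(0L, 4); // a view over the underlying buckets
  assert view.get(0) == 1;
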
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 7b6b25f..b1960c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -26,6 +26,8 @@ import java.nio.channels.FileChannel;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -125,4 +127,20 @@ public class FileIOEngine implements IOEngine {
       LOG.error("Can't shutdown cleanly", ex);
     }
   }
+
+  @Override
+  public ByteBuff read(long offset, int len) throws IOException {
+    ByteBuffer dstBuffer = ByteBuffer.allocate(len);
+    int read = read(dstBuffer, offset);
+    dstBuffer.limit(read);
+    return new SingleByteBuff(dstBuffer);
+  }
+
+  @Override
+  public void write(ByteBuff srcBuffer, long offset) throws IOException {
+    // When caching a block into the BucketCache there will be a single buffer backing this HFileBlock.
+    assert srcBuffer.hasArray();
+    fileChannel.write(
+        ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(), srcBuffer.remaining()), offset);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
index 430c5af..862042f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 
 /**
  * A class implementing IOEngine interface supports data services for
@@ -40,10 +41,22 @@ public interface IOEngine {
    * @param offset The offset in the IO engine where the first byte to be read
    * @return number of bytes read
    * @throws IOException
+   * @throws RuntimeException when the length of the ByteBuff read is less than 'len'
    */
   int read(ByteBuffer dstBuffer, long offset) throws IOException;
 
   /**
+   * Transfers data from the IOEngine at the given offset into a ByteBuff
+   * @param offset the offset from which the underlying buckets should be read
+   * @param len the length up to which the buckets should be read
+   * @return the ByteBuff formed from the underlying ByteBuffers that make up the
+   * buckets
+   * @throws IOException
+   * @throws RuntimeException when the length of the ByteBuff read is less than 'len'
+   */
+  ByteBuff read(long offset, int len) throws IOException;
+
+  /**
    * Transfers data from the given byte buffer to IOEngine
    * @param srcBuffer the given byte buffer from which bytes are to be read
    * @param offset The offset in the IO engine where the first byte to be
@@ -53,6 +66,14 @@ public interface IOEngine {
   void write(ByteBuffer srcBuffer, long offset) throws IOException;
 
   /**
+   * Transfers the data from the given ByteBuff to the IOEngine
+   * @param srcBuffer the given ByteBuff from which bytes are to be read
+   * @param offset the offset in the IO engine where the first byte is to be written
+   * @throws IOException
+   */
+  void write(ByteBuff srcBuffer, long offset) throws IOException;
+
+  /**
    * Sync the data to IOEngine after writing
    * @throws IOException
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 50da338..9b5e222 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -38,8 +38,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 891a59d..eb76440 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.DataInput;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -51,6 +50,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
@@ -1285,7 +1285,7 @@ public class StoreFile {
 
       try {
         boolean shouldCheckBloom;
-        ByteBuffer bloom;
+        ByteBuff bloom;
         if (bloomFilter.supportsAutoLoading()) {
           bloom = null;
           shouldCheckBloom = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 95e0d8e..37573c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Scan;

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java
index 315ed97..197ff12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java
@@ -18,10 +18,10 @@
  */
 package org.apache.hadoop.hbase.util;
 
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 
 /**
  *
@@ -83,7 +83,7 @@ public interface BloomFilter extends BloomFilterBase {
    *        is supported.
    * @return true if matched by bloom, false if not
    */
-  boolean contains(Cell keyCell, ByteBuffer bloom);
+  boolean contains(Cell keyCell, ByteBuff bloom);
 
   /**
    * Check if the specified key is contained in the bloom filter.
@@ -95,7 +95,7 @@ public interface BloomFilter extends BloomFilterBase {
    *        is supported.
    * @return true if matched by bloom, false if not
    */
-  boolean contains(byte[] buf, int offset, int length, ByteBuffer bloom);
+  boolean contains(byte[] buf, int offset, int length, ByteBuff bloom);
 
   /**
    * @return true if this Bloom filter can automatically load its data

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java
index 9fff872..1e77984 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java
@@ -26,8 +26,6 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * The basic building block for the {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilter}
  */
@@ -183,35 +181,6 @@ public class BloomFilterChunk implements BloomFilterBase {
     ++this.keyCount;
   }
 
-  @VisibleForTesting
-  boolean contains(byte [] buf) {
-    return contains(buf, 0, buf.length, this.bloom);
-  }
-
-  @VisibleForTesting
-  boolean contains(byte [] buf, int offset, int length) {
-    return contains(buf, offset, length, bloom);
-  }
-
-  @VisibleForTesting
-  boolean contains(byte[] buf, ByteBuffer bloom) {
-    return contains(buf, 0, buf.length, bloom);
-  }
-
-  public boolean contains(byte[] buf, int offset, int length, ByteBuffer theBloom) {
-    if (theBloom == null) {
-      theBloom = bloom;
-    }
-
-    if (theBloom.limit() != byteSize) {
-      throw new IllegalArgumentException("Bloom does not match expected size:"
-          + " theBloom.limit()=" + theBloom.limit() + ", byteSize=" + byteSize);
-    }
-
-    return BloomFilterUtil.contains(buf, offset, length, theBloom, 0, (int) byteSize, hash,
-        hashCount);
-  }
-
   //---------------------------------------------------------------------------
   /** Private helpers */
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java
index dd90b2b..fd30710 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java
@@ -22,9 +22,10 @@ import java.text.NumberFormat;
 import java.util.Random;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 
 /**
- * Utility methods related to BloomFilters 
+ * Utility methods related to BloomFilters
  */
 @InterfaceAudience.Private
 public final class BloomFilterUtil {
@@ -193,7 +194,7 @@ public final class BloomFilterUtil {
   }
 
   public static boolean contains(byte[] buf, int offset, int length,
-      ByteBuffer bloomBuf, int bloomOffset, int bloomSize, Hash hash,
+      ByteBuff bloomBuf, int bloomOffset, int bloomSize, Hash hash,
       int hashCount) {
 
     int hash1 = hash.hash(buf, offset, length, 0);
@@ -206,7 +207,7 @@ public final class BloomFilterUtil {
       for (int i = 0; i < hashCount; i++) {
         int hashLoc = Math.abs(compositeHash % bloomBitSize);
         compositeHash += hash2;
-        if (!get(hashLoc, bloomBuf, bloomOffset)) {
+        if (!checkBit(hashLoc, bloomBuf, bloomOffset)) {
           return false;
         }
       }
@@ -214,29 +215,28 @@ public final class BloomFilterUtil {
       // Test mode with "fake lookups" to estimate "ideal false positive rate".
       for (int i = 0; i < hashCount; i++) {
         int hashLoc = randomGeneratorForTest.nextInt(bloomBitSize);
-        if (!get(hashLoc, bloomBuf, bloomOffset)){
+        if (!checkBit(hashLoc, bloomBuf, bloomOffset)){
           return false;
         }
       }
     }
     return true;
   }
-  
+
   /**
    * Check if bit at specified index is 1.
    *
    * @param pos index of bit
    * @return true if bit at specified index is 1, false if 0.
    */
-  public static boolean get(int pos, ByteBuffer bloomBuf, int bloomOffset) {
+  static boolean checkBit(int pos, ByteBuff bloomBuf, int bloomOffset) {
     int bytePos = pos >> 3; //pos / 8
     int bitPos = pos & 0x7; //pos % 8
-    // TODO access this via Util API which can do Unsafe access if possible(?)
     byte curByte = bloomBuf.get(bloomOffset + bytePos);
     curByte &= bitvals[bitPos];
     return (curByte != 0);
   }
-  
+
   /**
    * A human-readable string with statistics for the given Bloom filter.
    *

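The contains() loop above probes hashCount bit positions derived from two hashes
(compositeHash starts at hash1 and advances by hash2 each round), and checkBit() maps
each bit index to a byte plus a bit within it. A worked example of that arithmetic,
with bitvals assumed to be the existing table of single-bit masks it indexes:

  int pos = 21;
  int bytePos = pos >> 3;   // 21 / 8 = 2: the bit lives in the third byte
  int bitPos  = pos & 0x7;  // 21 % 8 = 5: at bit five within that byte
  byte curByte = bloomBuf.get(bloomOffset + bytePos);
  boolean isSet = (curByte & bitvals[bitPos]) != 0;
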
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/TableLockChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/TableLockChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/TableLockChecker.java
index 49594bc..92e432c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/TableLockChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/TableLockChecker.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 0dd8bea..5ab7424 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.filter.RegexStringComparator;
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@@ -182,6 +183,7 @@ public class TestFromClientSide {
      final byte[] T3 = Bytes.toBytes("T3");
      HColumnDescriptor hcd = new HColumnDescriptor(FAMILY)
          .setKeepDeletedCells(KeepDeletedCells.TRUE)
+         .setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE)
          .setMaxVersions(3);
 
      HTableDescriptor desc = new HTableDescriptor(TABLENAME);

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
index 48cc5b9..f4160db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -303,7 +304,8 @@ public class TestDataBlockEncoders {
       DataBlockEncoder encoder = encoding.getEncoder();
       ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
           getEncodingContext(Compression.Algorithm.NONE, encoding));
-      Cell key = encoder.getFirstKeyCellInBlock(encodedBuffer);
+      Cell key = encoder.getFirstKeyCellInBlock(new MultiByteBuff(
+          encodedBuffer));
       KeyValue keyBuffer = null;
       if(encoding == DataBlockEncoding.PREFIX_TREE) {
         // This is not an actual case. So the Prefix tree block is not loaded in case of Prefix_tree

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index b0a2ba2..122b7fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -39,6 +39,9 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ChecksumType;
 
 public class CacheTestUtils {
@@ -123,7 +126,7 @@ public class CacheTestUtils {
   public static void testCacheSimple(BlockCache toBeTested, int blockSize,
       int numBlocks) throws Exception {
 
-    HFileBlockPair[] blocks = generateHFileBlocks(numBlocks, blockSize);
+    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);
     // Confirm empty
     for (HFileBlockPair block : blocks) {
       assertNull(toBeTested.getBlock(block.blockName, true, false, true));
@@ -253,7 +256,7 @@ public class CacheTestUtils {
       new CacheableDeserializer<Cacheable>() {
 
       @Override
-      public Cacheable deserialize(ByteBuffer b) throws IOException {
+      public Cacheable deserialize(ByteBuff b) throws IOException {
         int len = b.getInt();
         Thread.yield();
         byte buf[] = new byte[len];
@@ -267,7 +270,7 @@ public class CacheTestUtils {
       }
 
       @Override
-      public Cacheable deserialize(ByteBuffer b, boolean reuse)
+      public Cacheable deserialize(ByteBuff b, boolean reuse)
           throws IOException {
         return deserialize(b);
       }
@@ -326,8 +329,8 @@ public class CacheTestUtils {
       // declare our data size to be smaller than it by the serialization space
       // required.
 
-      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize
-          - HFileBlock.EXTRA_SERIALIZATION_SPACE);
+      SingleByteBuff cachedBuffer = new SingleByteBuff(ByteBuffer.allocate(blockSize
+          - HFileBlock.EXTRA_SERIALIZATION_SPACE));
       rand.nextBytes(cachedBuffer.array());
       cachedBuffer.rewind();
       int onDiskSizeWithoutHeader = blockSize

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
index ce78a37..110d92b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
 import org.junit.Before;
@@ -71,13 +73,13 @@ public class TestCacheConfig {
     }
 
     @Override
-    public Cacheable deserialize(ByteBuffer b, boolean reuse) throws IOException {
+    public Cacheable deserialize(ByteBuff b, boolean reuse) throws IOException {
       LOG.info("Deserialized " + b + ", reuse=" + reuse);
       return cacheable;
     }
 
     @Override
-    public Cacheable deserialize(ByteBuffer b) throws IOException {
+    public Cacheable deserialize(ByteBuff b) throws IOException {
       LOG.info("Deserialized " + b);
       return cacheable;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index de8d3b9..ca62bf5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.junit.Before;
 import org.junit.Test;
@@ -126,7 +128,7 @@ public class TestChecksum {
       HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
           is, totalSize, (HFileSystem) fs, path, meta);
       HFileBlock b = hbr.readBlockData(0, -1, -1, false);
-      ByteBuffer data = b.getBufferWithoutHeader();
+      ByteBuff data = b.getBufferWithoutHeader();
       for (int i = 0; i < 1000; i++) {
         assertEquals(i, data.getInt());
       }
@@ -194,7 +196,7 @@ public class TestChecksum {
         assertEquals(algo == GZ ? 2173 : 4936, 
                      b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
         // read data back from the hfile, exclude header and checksum
-        ByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data
+        ByteBuff bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data
         DataInputStream in = new DataInputStream(
                                new ByteArrayInputStream(
                                  bb.array(), bb.arrayOffset(), bb.limit()));

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index af8a6cc..3258991 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -326,11 +328,14 @@ public class TestHFile extends HBaseTestCase {
 
   private void readNumMetablocks(Reader reader, int n) throws IOException {
     for (int i = 0; i < n; i++) {
-      ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false);
+      ByteBuff actual = reader.getMetaBlock("HFileMeta" + i, false);
       ByteBuffer expected =
         ByteBuffer.wrap(("something to test" + i).getBytes());
-      assertEquals("failed to match metadata",
-        Bytes.toStringBinary(expected), Bytes.toStringBinary(actual));
+      assertEquals(
+          "failed to match metadata",
+          Bytes.toStringBinary(expected),
+          Bytes.toStringBinary(actual.array(), actual.arrayOffset() + actual.position(),
+              actual.capacity()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index dfc5569..c6aba43 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -57,6 +56,9 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -437,7 +439,7 @@ public class TestHFileBlock {
               assertTrue("Packed heapSize should be < unpacked heapSize",
                 packedHeapsize < blockUnpacked.heapSize());
             }
-            ByteBuffer actualBuffer = blockUnpacked.getBufferWithoutHeader();
+            ByteBuff actualBuffer = blockUnpacked.getBufferWithoutHeader();
             if (encoding != DataBlockEncoding.NONE) {
               // We expect a two-byte big-endian encoding id.
               assertEquals(
@@ -454,14 +456,15 @@ public class TestHFileBlock {
             expectedBuffer.rewind();
 
             // test if content matches, produce nice message
-            assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread);
+            assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer, algo, encoding,
+                pread);
 
             // test serialized blocks
             for (boolean reuseBuffer : new boolean[] { false, true }) {
               ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
               blockFromHFile.serialize(serialized);
-              HFileBlock deserialized =
-                (HFileBlock) blockFromHFile.getDeserializer().deserialize(serialized, reuseBuffer);
+              HFileBlock deserialized = (HFileBlock) blockFromHFile.getDeserializer().deserialize(
+                  new SingleByteBuff(serialized), reuseBuffer);
               assertEquals(
                 "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
                 blockFromHFile, deserialized);
@@ -483,8 +486,8 @@ public class TestHFileBlock {
     return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
   }
 
-  static void assertBuffersEqual(ByteBuffer expectedBuffer,
-      ByteBuffer actualBuffer, Compression.Algorithm compression,
+  static void assertBuffersEqual(ByteBuff expectedBuffer,
+      ByteBuff actualBuffer, Compression.Algorithm compression,
       DataBlockEncoding encoding, boolean pread) {
     if (!actualBuffer.equals(expectedBuffer)) {
       int prefix = 0;
@@ -506,7 +509,7 @@ public class TestHFileBlock {
    * Convert a few next bytes in the given buffer at the given position to
    * string. Used for error messages.
    */
-  private static String nextBytesToStr(ByteBuffer buf, int pos) {
+  private static String nextBytesToStr(ByteBuff buf, int pos) {
     int maxBytes = buf.limit() - pos;
     int numBytes = Math.min(16, maxBytes);
     return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
@@ -595,7 +598,7 @@ public class TestHFileBlock {
               b = b.unpack(meta, hbr);
               // b's buffer has header + data + checksum while
               // expectedContents have header + data only
-              ByteBuffer bufRead = b.getBufferWithHeader();
+              ByteBuff bufRead = b.getBufferWithHeader();
               ByteBuffer bufExpected = expectedContents.get(i);
               boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
                   bufRead.arrayOffset(),
@@ -617,7 +620,7 @@ public class TestHFileBlock {
                   bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
                 if (detailedLogging) {
                   LOG.warn("expected header" +
-                           HFileBlock.toStringHeader(bufExpected) +
+                           HFileBlock.toStringHeader(new SingleByteBuff(bufExpected)) +
                            "\nfound    header" +
                            HFileBlock.toStringHeader(bufRead));
                   LOG.warn("bufread offset " + bufRead.arrayOffset() +
@@ -821,9 +824,9 @@ public class TestHFileBlock {
 
   protected void testBlockHeapSizeInternals() {
     if (ClassSize.is32BitJVM()) {
-      assertTrue(HFileBlock.BYTE_BUFFER_HEAP_SIZE == 64);
+      assertTrue(HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE == 64);
     } else {
-      assertTrue(HFileBlock.BYTE_BUFFER_HEAP_SIZE == 80);
+      assertTrue(HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE == 104);
     }
 
     for (int size : new int[] { 100, 256, 12345 }) {
@@ -839,9 +842,9 @@ public class TestHFileBlock {
       HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
           HFileBlock.FILL_HEADER, -1,
           0, meta);
-      long byteBufferExpectedSize =
-          ClassSize.align(ClassSize.estimateBase(buf.getClass(), true)
-              + HConstants.HFILEBLOCK_HEADER_SIZE + size);
+      long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
+          new MultiByteBuff(buf).getClass(), true)
+          + HConstants.HFILEBLOCK_HEADER_SIZE + size);
       long hfileMetaSize =  ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
       long hfileBlockExpectedSize =
           ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
index ebe35b3..9efb5fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
@@ -48,6 +48,9 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -310,7 +313,7 @@ public class TestHFileBlockCompatibility {
 
             assertEquals((int) encodedSizes.get(blockId),
                 b.getUncompressedSizeWithoutHeader());
-            ByteBuffer actualBuffer = b.getBufferWithoutHeader();
+            ByteBuff actualBuffer = b.getBufferWithoutHeader();
             if (encoding != DataBlockEncoding.NONE) {
               // We expect a two-byte big-endian encoding id.
               assertEquals(0, actualBuffer.get(0));
@@ -323,7 +326,7 @@ public class TestHFileBlockCompatibility {
             expectedBuffer.rewind();
 
             // test if content matches, produce nice message
-            TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer,
+            TestHFileBlock.assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer,
               algo, encoding, pread);
           }
           is.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 279c4ea..bc82aee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -52,6 +52,8 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexChunk;
 import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -407,7 +409,7 @@ public class TestHFileBlockIndex {
       KeyValue.KeyOnlyKeyValue cell = new KeyValue.KeyOnlyKeyValue(
           arrayHoldingKey, searchKey.length / 2, searchKey.length);
       int searchResult = BlockIndexReader.binarySearchNonRootIndex(cell,
-          nonRootIndex, CellComparator.COMPARATOR);
+          new MultiByteBuff(nonRootIndex), CellComparator.COMPARATOR);
       String lookupFailureMsg = "Failed to look up key #" + i + " ("
           + Bytes.toStringBinary(searchKey) + ")";
 
@@ -432,7 +434,7 @@ public class TestHFileBlockIndex {
       // Now test we can get the offset and the on-disk-size using a
       // higher-level API function.s
       boolean locateBlockResult =
-          (BlockIndexReader.locateNonRootIndexEntry(nonRootIndex, cell,
+          (BlockIndexReader.locateNonRootIndexEntry(new MultiByteBuff(nonRootIndex), cell,
           CellComparator.COMPARATOR) != -1);
 
       if (i == 0) {
@@ -605,15 +607,15 @@ public class TestHFileBlockIndex {
       while ((block = iter.nextBlock()) != null) {
         if (block.getBlockType() != BlockType.LEAF_INDEX)
           return;
-        ByteBuffer b = block.getBufferReadOnly();
+        ByteBuff b = block.getBufferReadOnly();
         int n = b.getInt();
         // One int for the number of items, and n + 1 for the secondary index.
         int entriesOffset = Bytes.SIZEOF_INT * (n + 2);
 
         // Get all the keys from the leaf index block. S
         for (int i = 0; i < n; ++i) {
-          int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (i + 1));
-          int nextKeyRelOffset = b.getInt(Bytes.SIZEOF_INT * (i + 2));
+          int keyRelOffset = b.getIntStrictlyForward(Bytes.SIZEOF_INT * (i + 1));
+          int nextKeyRelOffset = b.getIntStrictlyForward(Bytes.SIZEOF_INT * (i + 2));
           int keyLen = nextKeyRelOffset - keyRelOffset;
           int keyOffset = b.arrayOffset() + entriesOffset + keyRelOffset +
               HFileBlockIndex.SECONDARY_INDEX_ENTRY_OVERHEAD;