You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2016/11/14 17:35:26 UTC

[2/2] hbase git commit: HBASE-15788 Use Offheap ByteBuffers from BufferPool to read RPC requests.

HBASE-15788 Use Offheap ByteBuffers from BufferPool to read RPC requests.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c3685760
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c3685760
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c3685760

Branch: refs/heads/master
Commit: c3685760f004450667920144f926383eb307de53
Parents: 9250bf8
Author: anoopsamjohn <an...@gmail.com>
Authored: Mon Nov 14 23:05:05 2016 +0530
Committer: anoopsamjohn <an...@gmail.com>
Committed: Mon Nov 14 23:05:05 2016 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Put.java     |   5 +-
 .../hadoop/hbase/ipc/CellBlockBuilder.java      |  38 ++-
 .../hadoop/hbase/ipc/TestCellBlockBuilder.java  |   4 +-
 .../apache/hadoop/hbase/OffheapKeyValue.java    |  29 ++-
 .../apache/hadoop/hbase/codec/CellCodec.java    |   8 +-
 .../hadoop/hbase/codec/CellCodecWithTags.java   |   8 +-
 .../org/apache/hadoop/hbase/codec/Codec.java    |   4 +-
 .../hadoop/hbase/codec/KeyValueCodec.java       |  56 ++++-
 .../hbase/codec/KeyValueCodecWithTags.java      |  16 +-
 .../hadoop/hbase/io/ByteArrayOutputStream.java  |   2 +-
 .../hadoop/hbase/io/ByteBufferOutputStream.java |   2 +-
 .../apache/hadoop/hbase/io/ByteBufferPool.java  |   4 +-
 .../io/ByteBufferSupportDataOutputStream.java   |  44 ----
 .../hbase/io/ByteBufferSupportOutputStream.java |  51 ----
 .../hadoop/hbase/io/ByteBufferWriter.java       |  53 ++++
 .../io/ByteBufferWriterDataOutputStream.java    |  44 ++++
 .../hbase/io/ByteBufferWriterOutputStream.java  |  90 +++++++
 .../org/apache/hadoop/hbase/nio/ByteBuff.java   |  45 ++++
 .../apache/hadoop/hbase/nio/MultiByteBuff.java  |  34 +++
 .../apache/hadoop/hbase/nio/SingleByteBuff.java |  12 +-
 .../hadoop/hbase/util/ByteBufferUtils.java      |  10 +-
 .../hbase/io/TestTagCompressionContext.java     |   4 +-
 .../apache/hadoop/hbase/codec/MessageCodec.java |   8 +-
 .../hadoop/hbase/io/hfile/HFileBlock.java       |   4 +-
 .../org/apache/hadoop/hbase/ipc/CallRunner.java |   5 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 239 ++++++++++++++++---
 .../hadoop/hbase/regionserver/HRegion.java      |   4 +-
 .../wal/AsyncProtobufLogWriter.java             |   4 +-
 .../hbase/regionserver/wal/WALCellCodec.java    |  18 +-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       |   4 +-
 .../apache/hadoop/hbase/ipc/TestRpcServer.java  | 140 +++++++++++
 31 files changed, 782 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
index dbaf3a7..61a71f7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
@@ -29,6 +29,7 @@ import java.util.TreeMap;
 import java.util.UUID;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -319,9 +320,7 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
     byte [] family = CellUtil.cloneFamily(kv);
     List<Cell> list = getCellList(family);
     //Checking that the row of the kv is the same as the put
-    int res = Bytes.compareTo(this.row, 0, row.length,
-        kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
-    if (res != 0) {
+    if (!CellUtil.matchingRow(kv, this.row)) {
       throw new WrongRowIOException("The row in " + kv.toString() +
         " doesn't match the original one " +  Bytes.toStringBinary(this.row));
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
index fb2cafa..d00490b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
@@ -21,7 +21,9 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufOutputStream;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
@@ -35,10 +37,13 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -238,15 +243,15 @@ class CellBlockBuilder {
   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
       final byte[] cellBlock) throws IOException {
     // Use this method from Client side to create the CellScanner
-    ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
     if (compressor != null) {
-      cellBlockBuf = decompress(compressor, cellBlockBuf);
+      ByteBuffer cellBlockBuf = decompress(compressor, cellBlock);
+      return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
     }
     // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
     // make Cells directly over the passed BB. This method is called at client side and we don't
     // want the Cells to share the same byte[] where the RPC response is being read. Caching of any
     // of the Cells at user's app level will make it not possible to GC the response byte[]
-    return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
+    return codec.getDecoder(new ByteArrayInputStream(cellBlock));
   }
 
   /**
@@ -258,7 +263,7 @@ class CellBlockBuilder {
    * @throws IOException if cell encoding fails
    */
   public CellScanner createCellScannerReusingBuffers(final Codec codec,
-      final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
+      final CompressionCodec compressor, ByteBuff cellBlock) throws IOException {
     // Use this method from HRS to create the CellScanner
     // If compressed, decompress it first before passing it on else we will leak compression
     // resources if the stream is not closed properly after we let it out.
@@ -268,27 +273,38 @@ class CellBlockBuilder {
     return codec.getDecoder(cellBlock);
   }
 
-  private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
+  private ByteBuffer decompress(CompressionCodec compressor, byte[] compressedCellBlock)
       throws IOException {
+    ByteBuffer cellBlock = decompress(compressor, new ByteArrayInputStream(compressedCellBlock),
+        compressedCellBlock.length * this.cellBlockDecompressionMultiplier);
+    return cellBlock;
+  }
+
+  private ByteBuff decompress(CompressionCodec compressor, ByteBuff compressedCellBlock)
+      throws IOException {
+    ByteBuffer cellBlock = decompress(compressor, new ByteBuffInputStream(compressedCellBlock),
+        compressedCellBlock.remaining() * this.cellBlockDecompressionMultiplier);
+    return new SingleByteBuff(cellBlock);
+  }
+
+  private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream,
+      int osInitialSize) throws IOException {
     // GZIPCodec fails w/ NPE if no configuration.
     if (compressor instanceof Configurable) {
       ((Configurable) compressor).setConf(this.conf);
     }
     Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
-    CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
-      poolDecompressor);
+    CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor);
     ByteBufferOutputStream bbos;
     try {
       // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
       // TODO: Reuse buffers.
-      bbos = new ByteBufferOutputStream(
-          cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
+      bbos = new ByteBufferOutputStream(osInitialSize);
       IOUtils.copy(cis, bbos);
       bbos.close();
-      cellBlock = bbos.getByteBuffer();
+      return bbos.getByteBuffer();
     } finally {
       CodecPool.returnDecompressor(poolDecompressor);
     }
-    return cellBlock;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
index ccabe66..9addaa5 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -78,7 +79,8 @@ public class TestCellBlockBuilder {
     CellScanner cellScanner = sized ? getSizedCellScanner(cells)
         : CellUtil.createCellScanner(Arrays.asList(cells).iterator());
     ByteBuffer bb = builder.buildCellBlock(codec, compressor, cellScanner);
-    cellScanner = builder.createCellScannerReusingBuffers(codec, compressor, bb);
+    cellScanner = builder.createCellScannerReusingBuffers(codec, compressor,
+        new SingleByteBuff(bb));
     int i = 0;
     while (cellScanner.advance()) {
       i++;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
index 2165362..06a0ed6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
@@ -36,10 +36,10 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
   protected final ByteBuffer buf;
   protected final int offset;
   protected final int length;
+  protected final boolean hasTags;
   private final short rowLen;
   private final int keyLen;
   private long seqId = 0;
-  private final boolean hasTags;
   // TODO : See if famLen can be cached or not?
 
   private static final int FIXED_OVERHEAD = ClassSize.OBJECT + ClassSize.REFERENCE
@@ -57,6 +57,18 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
     this.seqId = seqId;
   }
 
+  public OffheapKeyValue(ByteBuffer buf, int offset, int length) {
+    assert buf.isDirect();
+    this.buf = buf;
+    this.offset = offset;
+    this.length = length;
+    rowLen = ByteBufferUtils.toShort(this.buf, this.offset + KeyValue.ROW_OFFSET);
+    keyLen = ByteBufferUtils.toInt(this.buf, this.offset);
+    int tagsLen = this.length
+        - (this.keyLen + getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
+    this.hasTags = tagsLen > 0;
+  }
+
   @Override
   public byte[] getRowArray() {
     return CellUtil.cloneRow(this);
@@ -265,16 +277,19 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
 
   @Override
   public void setTimestamp(long ts) throws IOException {
-    // This Cell implementation is not yet used in write path.
-    // TODO when doing HBASE-15179
-    throw new UnsupportedOperationException();
+    ByteBufferUtils.copyFromArrayToBuffer(this.buf, this.getTimestampOffset(), Bytes.toBytes(ts), 0,
+        Bytes.SIZEOF_LONG);
+  }
+
+  private int getTimestampOffset() {
+    return this.offset + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + this.keyLen
+        - KeyValue.TIMESTAMP_TYPE_SIZE;
   }
 
   @Override
   public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
-    // This Cell implementation is not yet used in write path.
-    // TODO when doing HBASE-15179
-    throw new UnsupportedOperationException();
+    ByteBufferUtils.copyFromArrayToBuffer(this.buf, this.getTimestampOffset(), ts, tsOffset,
+        Bytes.SIZEOF_LONG);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
index d6b64f6..ca2e3e8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
@@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.codec;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -118,8 +118,8 @@ public class CellCodec implements Codec {
   }
 
   @Override
-  public Decoder getDecoder(ByteBuffer buf) {
-    return getDecoder(new ByteBufferInputStream(buf));
+  public Decoder getDecoder(ByteBuff buf) {
+    return getDecoder(new ByteBuffInputStream(buf));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
index 7326884..2dca10a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
@@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.codec;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -119,8 +119,8 @@ public class CellCodecWithTags implements Codec {
   }
 
   @Override
-  public Decoder getDecoder(ByteBuffer buf) {
-    return getDecoder(new ByteBufferInputStream(buf));
+  public Decoder getDecoder(ByteBuff buf) {
+    return getDecoder(new ByteBuffInputStream(buf));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
index c8a4cdc..d1463ee 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
@@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.codec;
 
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.CellOutputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 
 /**
  * Encoder/Decoder for Cell.
@@ -51,6 +51,6 @@ public interface Codec {
   interface Decoder extends CellScanner {};
 
   Decoder getDecoder(InputStream is);
-  Decoder getDecoder(ByteBuffer buf);
+  Decoder getDecoder(ByteBuff buf);
   Encoder getEncoder(OutputStream os);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
index 2609398..00ce023 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
@@ -27,8 +27,10 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.NoTagsKeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
 import org.apache.hadoop.hbase.ShareableMemory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -76,24 +78,28 @@ public class KeyValueCodec implements Codec {
     }
   }
 
-  public static class ByteBufferedKeyValueDecoder implements Codec.Decoder {
+  public static class ByteBuffKeyValueDecoder implements Codec.Decoder {
 
-    protected final ByteBuffer buf;
+    protected final ByteBuff buf;
     protected Cell current = null;
 
-    public ByteBufferedKeyValueDecoder(ByteBuffer buf) {
+    public ByteBuffKeyValueDecoder(ByteBuff buf) {
       this.buf = buf;
     }
 
     @Override
     public boolean advance() throws IOException {
-      if (this.buf.remaining() <= 0) {
+      if (!this.buf.hasRemaining()) {
         return false;
       }
-      int len = ByteBufferUtils.toInt(buf);
-      assert buf.hasArray();
-      this.current = createCell(buf.array(), buf.arrayOffset() + buf.position(), len);
-      buf.position(buf.position() + len);
+      int len = buf.getInt();
+      ByteBuffer bb = buf.asSubByteBuffer(len);
+      if (bb.isDirect()) {
+        this.current = createCell(bb, bb.position(), len);
+      } else {
+        this.current = createCell(bb.array(), bb.arrayOffset() + bb.position(), len);
+      }
+      buf.skip(len);
       return true;
     }
 
@@ -106,6 +112,11 @@ public class KeyValueCodec implements Codec {
       return new ShareableMemoryNoTagsKeyValue(buf, offset, len);
     }
 
+    protected Cell createCell(ByteBuffer bb, int pos, int len) {
+      // We know there are not going to be any tags.
+      return new ShareableMemoryOffheapKeyValue(bb, pos, len, false, 0);
+    }
+
     static class ShareableMemoryKeyValue extends KeyValue implements ShareableMemory {
       public ShareableMemoryKeyValue(byte[] bytes, int offset, int length) {
         super(bytes, offset, length);
@@ -133,6 +144,31 @@ public class KeyValueCodec implements Codec {
         return kv;
       }
     }
+
+    static class ShareableMemoryOffheapKeyValue extends OffheapKeyValue implements ShareableMemory {
+      public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length) {
+        super(buf, offset, length);
+      }
+
+      public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length, boolean hasTags,
+          long seqId) {
+        super(buf, offset, length, hasTags, seqId);
+      }
+
+      @Override
+      public Cell cloneToCell() {
+        byte[] copy = new byte[this.length];
+        ByteBufferUtils.copyFromBufferToArray(copy, this.buf, this.offset, 0, this.length);
+        KeyValue kv;
+        if (this.hasTags) {
+          kv = new KeyValue(copy, 0, copy.length);
+        } else {
+          kv = new NoTagsKeyValue(copy, 0, copy.length);
+        }
+        kv.setSequenceId(this.getSequenceId());
+        return kv;
+      }
+    }
   }
 
   /**
@@ -144,8 +180,8 @@ public class KeyValueCodec implements Codec {
   }
 
   @Override
-  public Decoder getDecoder(ByteBuffer buf) {
-    return new ByteBufferedKeyValueDecoder(buf);
+  public Decoder getDecoder(ByteBuff buf) {
+    return new ByteBuffKeyValueDecoder(buf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
index 63c02e8..84c4840 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 
 /**
@@ -78,16 +79,21 @@ public class KeyValueCodecWithTags implements Codec {
     }
   }
 
-  public static class ByteBufferedKeyValueDecoder
-      extends KeyValueCodec.ByteBufferedKeyValueDecoder {
+  public static class ByteBuffKeyValueDecoder extends KeyValueCodec.ByteBuffKeyValueDecoder {
 
-    public ByteBufferedKeyValueDecoder(ByteBuffer buf) {
+    public ByteBuffKeyValueDecoder(ByteBuff buf) {
       super(buf);
     }
 
+    @Override
     protected Cell createCell(byte[] buf, int offset, int len) {
       return new ShareableMemoryKeyValue(buf, offset, len);
     }
+
+    @Override
+    protected Cell createCell(ByteBuffer bb, int pos, int len) {
+      return new ShareableMemoryOffheapKeyValue(bb, pos, len);
+    }
   }
 
   /**
@@ -104,7 +110,7 @@ public class KeyValueCodecWithTags implements Codec {
   }
 
   @Override
-  public Decoder getDecoder(ByteBuffer buf) {
-    return new ByteBufferedKeyValueDecoder(buf);
+  public Decoder getDecoder(ByteBuff buf) {
+    return new ByteBuffKeyValueDecoder(buf);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
index 93121df..22eb156 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  * supports writing ByteBuffer directly to it.
  */
 @InterfaceAudience.Private
-public class ByteArrayOutputStream extends OutputStream implements ByteBufferSupportOutputStream {
+public class ByteArrayOutputStream extends OutputStream implements ByteBufferWriter {
 
   // Borrowed from openJDK:
   // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index f77092d..f6f7def 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class ByteBufferOutputStream extends OutputStream
-    implements ByteBufferSupportOutputStream {
+    implements ByteBufferWriter {
   
   // Borrowed from openJDK:
   // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
index e528f02..115671d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
@@ -140,7 +140,7 @@ public class ByteBufferPool {
     buffers.offer(buf);
   }
 
-  int getBufferSize() {
+  public int getBufferSize() {
     return this.bufferSize;
   }
 
@@ -148,7 +148,7 @@ public class ByteBufferPool {
    * @return Number of free buffers
    */
   @VisibleForTesting
-  int getQueueSize() {
+  public int getQueueSize() {
     return buffers.size();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java
deleted file mode 100644
index 3a52e63..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportDataOutputStream.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
-
-/**
- * Our extension of DataOutputStream which implements ByteBufferSupportOutputStream
- */
-@InterfaceAudience.Private
-public class ByteBufferSupportDataOutputStream extends DataOutputStream
-    implements ByteBufferSupportOutputStream {
-
-  public ByteBufferSupportDataOutputStream(OutputStream out) {
-    super(out);
-  }
-
-  @Override
-  public void write(ByteBuffer b, int off, int len) throws IOException {
-    ByteBufferUtils.copyBufferToStream(out, b, off, len);
-    written += len;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java
deleted file mode 100644
index ccb5c81..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStream.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Interface adds support for writing {@link ByteBuffer} into OutputStream.
- */
-@InterfaceAudience.Private
-public interface ByteBufferSupportOutputStream {
-
-  /**
-   * Writes <code>len</code> bytes from the specified ByteBuffer starting at offset <code>off</code>
-   * to this output stream.
-   *
-   * @param b the data.
-   * @param off the start offset in the data.
-   * @param len the number of bytes to write.
-   * @exception IOException
-   *              if an I/O error occurs. In particular, an <code>IOException</code> is thrown if
-   *              the output stream is closed.
-   */
-  void write(ByteBuffer b, int off, int len) throws IOException;
-
-  /**
-   * Writes an <code>int</code> to the underlying output stream as four
-   * bytes, high byte first.
-   * @param i the <code>int</code> to write
-   * @throws IOException if an I/O error occurs.
-   */
-  void writeInt(int i) throws IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java
new file mode 100644
index 0000000..012080c
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Marks a class as one that supports having {@link ByteBuffer}s written into it.
+ * @see ByteArrayOutputStream
+ * @see ByteBufferOutputStream
+ */
+@InterfaceAudience.Private
+public interface ByteBufferWriter {
+
+  /**
+   * Writes <code>len</code> bytes from the specified ByteBuffer starting at offset <code>off</code>
+   * into this writer.
+   * @param b the data.
+   * @param off the start offset in the data.
+   * @param len the number of bytes to write.
+   * @exception IOException if an I/O error occurs.
+   */
+  void write(ByteBuffer b, int off, int len) throws IOException;
+
+  /**
+   * Writes an <code>int</code> to the underlying output stream as four bytes, high byte first.
+   * @param i the <code>int</code> to write
+   * @throws IOException if an I/O error occurs.
+   */
+  // This is a purely performance-oriented API. It has nothing to do with ByteBuffer and so does
+  // not fully belong here. It allows an int to be written in one go instead of as 4 separate
+  // single-byte writes.
+  // TODO remove it from here?
+  void writeInt(int i) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java
new file mode 100644
index 0000000..7ddb8a9
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+/**
+ * Extension of {@link DataOutputStream} that also implements {@link ByteBufferWriter}.
+ */
+@InterfaceAudience.Private
+public class ByteBufferWriterDataOutputStream extends DataOutputStream
+    implements ByteBufferWriter {
+
+  public ByteBufferWriterDataOutputStream(OutputStream out) {
+    super(out);
+  }
+
+  @Override
+  public void write(ByteBuffer b, int off, int len) throws IOException {
+    ByteBufferUtils.copyBufferToStream(out, b, off, len); // 'out' is the wrapped stream
+    written += len; // NOTE(review): plain add; the JDK's own counter clamps at Integer.MAX_VALUE
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java
new file mode 100644
index 0000000..56c6956
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+/**
+ * Wrap an OutputStream which is not of ByteBufferWriter type with this class when offheap
+ * ByteBuffer (DBB) data has to be written into it. This class keeps a temp byte array into which
+ * the DBB data is copied before being written to the wrapped OS.
+ * <br>
+ * This is used while writing Cell data to WAL. In case of AsyncWAL, the OS created there is
+ * ByteBufferWriter. But in case of FSHLog, the OS passed by DFS client, is not of type
+ * ByteBufferWriter. We will need this temp solution until DFS client supports writing ByteBuffer
+ * directly to the OS it creates.
+ * <br>
+ * Note: This class is not thread safe.
+ */
+@InterfaceAudience.Private
+public class ByteBufferWriterOutputStream extends OutputStream
+    implements ByteBufferWriter {
+
+  private static final int TEMP_BUF_LENGTH = 4 * 1024;
+  private final OutputStream os;
+  private byte[] tempBuf = null;
+
+  public ByteBufferWriterOutputStream(OutputStream os) {
+    this.os = os;
+  }
+
+  @Override
+  public void write(ByteBuffer b, int off, int len) throws IOException {
+    byte[] buf = null;
+    if (len > TEMP_BUF_LENGTH) {
+      buf = new byte[len]; // too large for the reusable temp buffer; use a one-off array
+    } else {
+      if (this.tempBuf == null) {
+        this.tempBuf = new byte[TEMP_BUF_LENGTH]; // allocated lazily on first small write
+      }
+      buf = this.tempBuf;
+    }
+    ByteBufferUtils.copyFromBufferToArray(buf, b, off, 0, len); // presumably b[off..] -> buf[0..]
+    this.os.write(buf, 0, len);
+  }
+
+  @Override
+  public void writeInt(int i) throws IOException {
+    StreamUtils.writeInt(this.os, i);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    this.os.write(b);
+  }
+
+  public void write(byte b[], int off, int len) throws IOException { // overrides OutputStream's
+    this.os.write(b, off, len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    this.os.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.os.close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 183a031..60202a0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -34,7 +36,10 @@ import org.apache.hadoop.io.WritableUtils;
  * helps us in the read path.
  */
 @InterfaceAudience.Private
+// TODO to have another name. This can easily get confused with netty's ByteBuf
 public abstract class ByteBuff {
+  private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
+
   /**
    * @return this ByteBuff's current position
    */
@@ -356,6 +361,14 @@ public abstract class ByteBuff {
   public abstract long getLongAfterPosition(int offset);
 
   /**
+   * Copy the content from this ByteBuff to a byte[].
+   * @return byte[] with the copied contents from this ByteBuff.
+   */
+  public byte[] toBytes() {
+    return toBytes(0, this.limit());
+  }
+
+  /**
    * Copy the content from this ByteBuff to a byte[] based on the given offset and
    * length
    *
@@ -389,7 +402,39 @@ public abstract class ByteBuff {
    */
   public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length);
 
+  /**
+   * Reads bytes from the given channel into this ByteBuff.
+   * @param channel the channel to read from
+   * @return The number of bytes read from the channel
+   * @throws IOException if reading from the channel fails
+   */
+  public abstract int read(ReadableByteChannel channel) throws IOException;
+
   // static helper methods
+  public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
+    if (buf.remaining() <= NIO_BUFFER_LIMIT) {
+      return channel.read(buf); // small enough: a single read, result passed through as is
+    }
+    int originalLimit = buf.limit();
+    int initialRemaining = buf.remaining();
+    int ret = 0;
+    // Read in slices of at most NIO_BUFFER_LIMIT bytes; stop once a read comes up short.
+    while (buf.remaining() > 0) {
+      try {
+        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+        buf.limit(buf.position() + ioSize); // temporarily narrow the window to one slice
+        ret = channel.read(buf);
+        if (ret < ioSize) {
+          break; // slice not filled (this also covers a zero or negative result)
+        }
+      } finally {
+        buf.limit(originalLimit); // always restore the caller's limit
+      }
+    }
+    int nBytes = initialRemaining - buf.remaining(); // total consumed across all slices
+    return (nBytes > 0) ? nBytes : ret; // nothing consumed: surface the last read's result
+  }
+
   /**
    * Read integer from ByteBuff coded in 7 bits and increment position.
    * @return Read integer.

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
index 107bb3f..948321d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -17,16 +17,20 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.InvalidMarkException;
+import java.nio.channels.ReadableByteChannel;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ObjectIntPair;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger
  * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int,
@@ -1071,6 +1075,28 @@ public class MultiByteBuff extends ByteBuff {
   }
 
   @Override
+  public int read(ReadableByteChannel channel) throws IOException {
+    int total = 0;
+    while (true) {
+      // Read max possible into the current BB
+      int len = channelRead(channel, this.curItem);
+      if (len > 0) // zero or negative results are not accumulated
+        total += len;
+      if (this.curItem.hasRemaining()) {
+        // We were not able to read enough to fill even the current BB, so there is no point in
+        // doing more reads from the channel. Only this much is available for now.
+        break;
+      } else {
+        if (this.curItemIndex >= this.limitedItemIndex) // presumably no usable BB left - verify
+          break;
+        this.curItemIndex++;
+        this.curItem = this.items[this.curItemIndex];
+      }
+    }
+    return total; // NOTE(review): a -1 (EOF) from the first read surfaces as 0 - confirm callers
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (!(obj instanceof MultiByteBuff)) return false;
     if (this == obj) return true;
@@ -1091,4 +1117,12 @@ public class MultiByteBuff extends ByteBuff {
     }
     return hash;
   }
+
+  /**
+   * @return the ByteBuffers which this wraps; the internal array itself, not a copy.
+   */
+  @VisibleForTesting
+  public ByteBuffer[] getEnclosingByteBuffers() {
+    return this.items;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 9ad28dc..0e45410 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -25,6 +27,8 @@ import org.apache.hadoop.hbase.util.ObjectIntPair;
 import org.apache.hadoop.hbase.util.UnsafeAccess;
 import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import sun.nio.ch.DirectBuffer;
 
 /**
@@ -313,6 +317,11 @@ public class SingleByteBuff extends ByteBuff {
   }
 
   @Override
+  public int read(ReadableByteChannel channel) throws IOException {
+    return channelRead(channel, buf); // delegates to the static chunked-read helper in ByteBuff
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if(!(obj instanceof SingleByteBuff)) return false;
     return this.buf.equals(((SingleByteBuff)obj).buf);
@@ -326,7 +335,8 @@ public class SingleByteBuff extends ByteBuff {
   /**
    * @return the ByteBuffer which this wraps.
    */
-  ByteBuffer getEnclosingByteBuffer() {
+  @VisibleForTesting
+  public ByteBuffer getEnclosingByteBuffer() {
     return this.buf;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index 0653d1a..c9a19ff 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -29,7 +29,7 @@ import java.util.Arrays;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferWriter;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
@@ -144,8 +144,8 @@ public final class ByteBufferUtils {
      // We have writeInt in ByteBufferOutputStream so that it can directly write
      // int to underlying
      // ByteBuffer in one step.
-     if (out instanceof ByteBufferSupportOutputStream) {
-       ((ByteBufferSupportOutputStream) out).writeInt(value);
+     if (out instanceof ByteBufferWriter) {
+       ((ByteBufferWriter) out).writeInt(value);
      } else {
        StreamUtils.writeInt(out, value);
      }
@@ -182,8 +182,8 @@ public final class ByteBufferUtils {
    */
   public static void copyBufferToStream(OutputStream out, ByteBuffer in,
       int offset, int length) throws IOException {
-    if (out instanceof ByteBufferSupportOutputStream) {
-      ((ByteBufferSupportOutputStream) out).write(in, offset, length);
+    if (out instanceof ByteBufferWriter) {
+      ((ByteBufferWriter) out).write(in, offset, length);
     } else if (in.hasArray()) {
       out.write(in.array(), in.arrayOffset() + offset, length);
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
index a332a63..67434a0 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
@@ -78,7 +78,7 @@ public class TestTagCompressionContext {
   @Test
   public void testCompressUncompressTagsWithOffheapKeyValue1() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+    DataOutputStream daos = new ByteBufferWriterDataOutputStream(baos);
     TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
     ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(2);
     int tagsLength1 = kv1.getTagsLength();
@@ -127,7 +127,7 @@ public class TestTagCompressionContext {
   @Test
   public void testCompressUncompressTagsWithOffheapKeyValue2() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+    DataOutputStream daos = new ByteBufferWriterDataOutputStream(baos);
     TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
     ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(1);
     int tagsLength1 = kv1.getTagsLength();

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
index ea162fc..41dc387 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.codec;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -83,8 +83,8 @@ public class MessageCodec implements Codec {
   }
 
   @Override
-  public Decoder getDecoder(ByteBuffer buf) {
-    return getDecoder(new ByteBufferInputStream(buf));
+  public Decoder getDecoder(ByteBuff buf) {
+    return getDecoder(new ByteBuffInputStream(buf));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 13b501a..59b8884 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.ByteBuffInputStream;
-import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
@@ -962,7 +962,7 @@ public class HFileBlock implements Cacheable {
       state = State.WRITING;
 
       // We will compress it later in finishBlock()
-      userDataStream = new ByteBufferSupportDataOutputStream(baosInMemory);
+      userDataStream = new ByteBufferWriterDataOutputStream(baosInMemory);
       if (newBlockType == BlockType.DATA) {
         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 34140a9..d570b17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.ipc;
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CallDroppedException;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 @InterfaceStability.Evolving
 public class CallRunner {
-  private static final Log LOG = LogFactory.getLog(CallRunner.class);
 
   private static final CallDroppedException CALL_DROPPED_EXCEPTION
     = new CallDroppedException();
@@ -143,6 +140,8 @@ public class CallRunner {
           sucessful = true;
         }
       }
+      // The call is done with the RPC request by now, so the BB it was read into can be returned.
+      call.cleanup();
       // Set the response
       Message param = resultPair != null ? resultPair.getFirst() : null;
       CellScanner cells = resultPair != null ? resultPair.getSecond() : null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 1c2d51f..6eefaac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.ByteArrayInputStream;
@@ -99,6 +100,9 @@ import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -146,6 +150,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
@@ -304,6 +309,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   private UserProvider userProvider;
 
   private final ByteBufferPool reservoir;
+  // The requests and response will use buffers from ByteBufferPool, when the size of the
+  // request/response is at least this size.
+  // We make this to be 1/6th of the pool buffer size.
+  private final int minSizeForReservoirUse;
 
   private volatile boolean allowFallbackToSimpleAuth;
 
@@ -344,10 +353,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     protected boolean isError;
     protected TraceInfo tinfo;
     private ByteBufferListOutputStream cellBlockStream = null;
+    private CallCleanup reqCleanup = null;
 
     private User user;
     private InetAddress remoteAddress;
-    private RpcCallback callback;
+    private RpcCallback rpcCallback;
 
     private long responseCellSize = 0;
     private long responseBlockSize = 0;
@@ -357,7 +367,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         justification="Can't figure why this complaint is happening... see below")
     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
          Message param, CellScanner cellScanner, Connection connection, Responder responder,
-         long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) {
+         long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
+         CallCleanup reqCleanup) {
       this.id = id;
       this.service = service;
       this.md = md;
@@ -377,6 +388,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           connection == null? null: connection.retryImmediatelySupported;
       this.timeout = timeout;
       this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE;
+      this.reqCleanup = reqCleanup;
     }
 
     /**
@@ -391,9 +403,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
                                                 // got from pool.
         this.cellBlockStream = null;
       }
+      cleanup();// If the call was run successfully, we might have already returned the
+                             // BB back to pool. No worries..Then reqCleanup will be null
       this.connection.decRpcCount();  // Say that we're done with this call.
     }
 
+    protected void cleanup() { // safe to call repeatedly; only the first call does anything
+      if (this.reqCleanup != null) {
+        this.reqCleanup.run();
+        this.reqCleanup = null; // drop the reference so the cleanup action runs at most once
+      }
+    }
+
     @Override
     public String toString() {
       return toShortString() + " param: " +
@@ -515,9 +536,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       this.response = bc;
       // Once a response message is created and set to this.response, this Call can be treated as
       // done. The Responder thread will do the n/w write of this message back to client.
-      if (this.callback != null) {
+      if (this.rpcCallback != null) {
         try {
-          this.callback.run();
+          this.rpcCallback.run();
         } catch (Exception e) {
           // Don't allow any exception here to kill this handler thread.
           LOG.warn("Exception while running the Rpc Callback.", e);
@@ -722,7 +743,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
     @Override
     public synchronized void setCallBack(RpcCallback callback) {
-      this.callback = callback;
+      this.rpcCallback = callback;
     }
 
     @Override
@@ -731,6 +752,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     }
   }
 
+  @FunctionalInterface
+  static interface CallCleanup {
+    void run(); // Call.cleanup() invokes this and then drops its reference to it
+  }
+
   /** Listens on the socket. Creates jobs for the handler threads*/
   private class Listener extends Thread {
 
@@ -1289,7 +1315,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     // If the connection header has been read or not.
     private boolean connectionHeaderRead = false;
     protected SocketChannel channel;
-    private ByteBuffer data;
+    private ByteBuff data;
+    private CallCleanup callCleanup;
     private ByteBuffer dataLengthBuffer;
     protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
     private final Lock responseWriteLock = new ReentrantLock();
@@ -1327,17 +1354,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     // Fake 'call' for failed authorization response
     private static final int AUTHORIZATION_FAILED_CALLID = -1;
     private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
-        null, null, this, null, 0, null, null, 0);
+        null, null, this, null, 0, null, null, 0, null);
     private ByteArrayOutputStream authFailedResponse =
         new ByteArrayOutputStream();
     // Fake 'call' for SASL context setup
     private static final int SASL_CALLID = -33;
     private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
-        0, null, null, 0);
+        0, null, null, 0, null);
     // Fake 'call' for connection header response
     private static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
     private final Call setConnectionHeaderResponseCall = new Call(CONNECTION_HEADER_RESPONSE_CALLID,
-        null, null, null, null, null, this, null, 0, null, null, 0);
+        null, null, null, null, null, this, null, 0, null, null, 0, null);
 
     // was authentication allowed with a fallback to simple auth
     private boolean authenticatedWithFallback;
@@ -1352,6 +1379,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
+      this.callCleanup = null;
       this.dataLengthBuffer = ByteBuffer.allocate(4);
       this.socket = channel.socket();
       this.addr = socket.getInetAddress();
@@ -1437,7 +1465,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return authorizedUgi;
     }
 
-    private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
+    private void saslReadAndProcess(ByteBuff saslToken) throws IOException,
         InterruptedException {
       if (saslContextEstablished) {
         if (LOG.isTraceEnabled())
@@ -1447,13 +1475,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         if (!useWrap) {
           processOneRpc(saslToken);
         } else {
-          byte[] b = saslToken.array();
+          byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
           byte [] plaintextData;
           if (useCryptoAesWrap) {
             // unwrap with CryptoAES
-            plaintextData = cryptoAES.unwrap(b, saslToken.position(), saslToken.limit());
+            plaintextData = cryptoAES.unwrap(b, 0, b.length);
           } else {
-            plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
+            plaintextData = saslServer.unwrap(b, 0, b.length);
           }
           processUnwrappedData(plaintextData);
         }
@@ -1506,7 +1534,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
             LOG.debug("Have read input token of size " + saslToken.limit()
                 + " for processing by saslServer.evaluateResponse()");
           }
-          replyToken = saslServer.evaluateResponse(saslToken.array());
+          replyToken = saslServer
+              .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
         } catch (IOException e) {
           IOException sendToClient = e;
           Throwable cause = e;
@@ -1759,7 +1788,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
             // Notify the client about the offending request
             Call reqTooBig = new Call(header.getCallId(), this.service, null, null, null,
-                null, this, responder, 0, null, this.addr,0);
+                null, this, responder, 0, null, this.addr, 0, null);
             metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
             // Make sure the client recognizes the underlying exception
             // Otherwise, throw a DoNotRetryIOException.
@@ -1779,7 +1808,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           return -1;
         }
 
-        data = ByteBuffer.allocate(dataLength);
+        // Initialize this.data with a ByteBuff.
+        // This call allocates a ByteBuff to read the request into and assigns it to this.data.
+        // When buffer(s) from the pool are used, it also creates a CallCleanup instance and
+        // assigns it to this.callCleanup.
+        initByteBuffToReadInto(dataLength);
 
         // Increment the rpc count. This counter will be decreased when we write
         //  the response.  If we want the connection to be detected as idle properly, we
@@ -1787,7 +1820,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         incRpcCount();
       }
 
-      count = channelRead(channel, data);
+      count = channelDataRead(channel, data);
 
       if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
         process();
@@ -1796,11 +1829,41 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return count;
     }
 
+    // Creates the ByteBuff and, if needed, the CallCleanup, and assigns them to this Connection.
+    private void initByteBuffToReadInto(int length) {
+      // We allocate a one-off on-heap buffer (not from the pool) and read into it when
+      // 1. ByteBufferPool is not there.
+      // 2. The size of the req is very small. Using a large sized (64 KB) buffer from the pool
+      // is a waste then. Also if all the reqs are of this size, we would create larger sized
+      // buffers and pool them permanently. This includes Scan/Get requests and DDL kind of
+      // reqs like RegionOpen.
+      // 3. It is an initial handshake signal or initial connection request. In that case
+      // condition 2 will match anyway.
+      // 4. SASL use is ON.
+      if (reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead || useSasl
+          || length < minSizeForReservoirUse) {
+        this.data = new SingleByteBuff(ByteBuffer.allocate(length));
+      } else {
+        Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(reservoir,
+            minSizeForReservoirUse, length);
+        this.data = pair.getFirst();
+        this.callCleanup = pair.getSecond();
+      }
+    }
+
+    protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
+      int count = buf.read(channel);
+      if (count > 0) {
+        metrics.receivedBytes(count);
+      }
+      return count;
+    }
+
     /**
      * Process the data buffer and clean the connection state for the next call.
      */
     private void process() throws IOException, InterruptedException {
-      data.flip();
+      data.rewind();
       try {
         if (skipInitialSaslHandshake) {
           skipInitialSaslHandshake = false;
@@ -1816,6 +1879,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       } finally {
         dataLengthBuffer.clear(); // Clean for the next call
         data = null; // For the GC
+        this.callCleanup = null;
       }
     }
 
@@ -1831,7 +1895,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
       LOG.warn(msg);
-      Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0);
+      Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null, 0,
+          null);
       setupResponse(null, fakeCall, e, msg);
       responder.doRespond(fakeCall);
       // Returning -1 closes out the connection.
@@ -1839,9 +1904,15 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     }
 
     // Reads the connection header following version
-    private void processConnectionHeader(ByteBuffer buf) throws IOException {
-      this.connectionHeader = ConnectionHeader.parseFrom(
-        new ByteBufferInputStream(buf));
+    private void processConnectionHeader(ByteBuff buf) throws IOException {
+      if (buf.hasArray()) {
+        this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
+      } else {
+        CodedInputStream cis = CodedInputStream
+            .newInstance(new ByteBuffByteInput(buf, 0, buf.limit()), true);
+        cis.enableAliasing(true);
+        this.connectionHeader = ConnectionHeader.parseFrom(cis);
+      }
       String serviceName = connectionHeader.getServiceName();
       if (serviceName == null) throw new EmptyServiceNameException();
       this.service = getService(services, serviceName);
@@ -2043,13 +2114,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         if (unwrappedData.remaining() == 0) {
           unwrappedDataLengthBuffer.clear();
           unwrappedData.flip();
-          processOneRpc(unwrappedData);
+          processOneRpc(new SingleByteBuff(unwrappedData));
           unwrappedData = null;
         }
       }
     }
 
-    private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
+    private void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
       if (connectionHeaderRead) {
         processRequest(buf);
       } else {
@@ -2071,12 +2142,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
      * @throws IOException
      * @throws InterruptedException
      */
-    protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
+    protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
       long totalRequestSize = buf.limit();
       int offset = 0;
       // Here we read in the header.  We avoid having pb
       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
-      CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
+      CodedInputStream cis;
+      if (buf.hasArray()) {
+        cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
+      } else {
+        cis = CodedInputStream.newInstance(new ByteBuffByteInput(buf, 0, buf.limit()), true);
+        cis.enableAliasing(true);
+      }
       int headerSize = cis.readRawVarint32();
       offset = cis.getTotalBytesRead();
       Message.Builder builder = RequestHeader.newBuilder();
@@ -2093,7 +2170,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
         final Call callTooBig =
           new Call(id, this.service, null, null, null, null, this,
-            responder, totalRequestSize, null, null, 0);
+            responder, totalRequestSize, null, null, 0, null);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
         setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
@@ -2127,7 +2204,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         }
         if (header.hasCellBlockMeta()) {
           buf.position(offset);
-          cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
+          ByteBuff dup = buf.duplicate();
+          dup.limit(offset + header.getCellBlockMeta().getLength());
+          cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
+              this.compressionCodec, dup);
         }
       } catch (Throwable t) {
         InetSocketAddress address = getListenerAddress();
@@ -2148,7 +2228,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
         final Call readParamsFailedCall =
           new Call(id, this.service, null, null, null, null, this,
-            responder, totalRequestSize, null, null, 0);
+            responder, totalRequestSize, null, null, 0, null);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
         setupResponse(responseBuffer, readParamsFailedCall, t,
           msg + "; " + t.getMessage());
@@ -2164,7 +2244,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         timeout = Math.max(minClientRequestTimeout, header.getTimeout());
       }
       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
-              totalRequestSize, traceInfo, this.addr, timeout);
+          totalRequestSize, traceInfo, this.addr, timeout, this.callCleanup);
 
       if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
         callQueueSizeInBytes.add(-1 * call.getSize());
@@ -2211,6 +2291,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     protected synchronized void close() {
       disposeSasl();
       data = null;
+      callCleanup = null;
       if (!channel.isOpen())
         return;
       try {socket.shutdownOutput();} catch(Exception ignored) {
@@ -2301,8 +2382,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
               conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
                   HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
+      this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir);
     } else {
       reservoir = null;
+      this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place.
     }
     this.server = server;
     this.services = services;
@@ -2347,6 +2430,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     this.scheduler.init(new RpcSchedulerContext(this));
   }
 
+  @VisibleForTesting
+  static int getMinSizeForReservoirUse(ByteBufferPool pool) {
+    return pool.getBufferSize() / 6;
+  }
+
   @Override
   public void onConfigurationChange(Configuration newConf) {
     initReconfigurable(newConf);
@@ -2755,6 +2843,55 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   }
 
   /**
+   * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool
+   * as much as possible.
+   *
+   * @param pool The ByteBufferPool to use
+   * @param minSizeForPoolUse Pooled buffers are taken only while the size still needed is at
+   *           least this value; a smaller remainder gets an on-heap ByteBuffer.
+   * @param reqLen Bytes count in request
+   */
+  @VisibleForTesting
+  static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool,
+      int minSizeForPoolUse, int reqLen) {
+    ByteBuff resultBuf;
+    List<ByteBuffer> bbs = new ArrayList<ByteBuffer>((reqLen / pool.getBufferSize()) + 1);
+    int remain = reqLen;
+    ByteBuffer buf = null;
+    while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
+      bbs.add(buf);
+      remain -= pool.getBufferSize();
+    }
+    ByteBuffer[] bufsFromPool = null;
+    if (bbs.size() > 0) {
+      bufsFromPool = new ByteBuffer[bbs.size()];
+      bbs.toArray(bufsFromPool);
+    }
+    if (remain > 0) {
+      bbs.add(ByteBuffer.allocate(remain));
+    }
+    if (bbs.size() > 1) {
+      ByteBuffer[] items = new ByteBuffer[bbs.size()];
+      bbs.toArray(items);
+      resultBuf = new MultiByteBuff(items);
+    } else {
+      // We are backed by single BB
+      resultBuf = new SingleByteBuff(bbs.get(0));
+    }
+    resultBuf.limit(reqLen);
+    if (bufsFromPool != null) {
+      final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
+      return new Pair<ByteBuff, RpcServer.CallCleanup>(resultBuf, () -> {
+        // Return back all the BBs to pool
+        for (int i = 0; i < bufsFromPoolFinal.length; i++) {
+          pool.putbackBuffer(bufsFromPoolFinal[i]);
+        }
+      });
+    }
+    return new Pair<ByteBuff, RpcServer.CallCleanup>(resultBuf, null);
+  }
+
+  /**
    * Needed for features such as delayed calls.  We need to be able to store the current call
    * so that we can complete it later or ask questions of what is supported by the current ongoing
    * call.
@@ -3054,4 +3191,44 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       idleScanTimer.schedule(idleScanTask, idleScanInterval);
     }
   }
-}
+
+  private static class ByteBuffByteInput extends ByteInput {
+
+    private ByteBuff buf;
+    private int offset;
+    private int length;
+
+    ByteBuffByteInput(ByteBuff buf, int offset, int length) {
+      this.buf = buf;
+      this.offset = offset;
+      this.length = length;
+    }
+
+    @Override
+    public byte read(int offset) {
+      return this.buf.get(getAbsoluteOffset(offset));
+    }
+
+    private int getAbsoluteOffset(int offset) {
+      return this.offset + offset;
+    }
+
+    @Override
+    public int read(int offset, byte[] out, int outOffset, int len) {
+      this.buf.get(getAbsoluteOffset(offset), out, outOffset, len);
+      return len;
+    }
+
+    @Override
+    public int read(int offset, ByteBuffer out) {
+      int len = out.remaining();
+      this.buf.get(out, getAbsoluteOffset(offset), len);
+      return len;
+    }
+
+    @Override
+    public int size() {
+      return this.length;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b444a1c..831627b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5201,9 +5201,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   private HStore getHStore(Cell cell) {
     for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
-      if (Bytes.equals(
-          cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
-          famStore.getKey(), 0, famStore.getKey().length)) {
+      if (CellUtil.matchingFamily(cell, famStore.getKey(), 0, famStore.getKey().length)) {
         return (HStore) famStore.getValue();
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index 314bef0..a0ac8a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferWriter;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
@@ -57,7 +57,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
   private AsyncFSOutput output;
 
   private static final class OutputStreamWrapper extends OutputStream
-      implements ByteBufferSupportOutputStream {
+      implements ByteBufferWriter {
 
     private final AsyncFSOutput out;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 52dfae0..1a18087 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -33,9 +32,12 @@ import org.apache.hadoop.hbase.codec.BaseDecoder;
 import org.apache.hadoop.hbase.codec.BaseEncoder;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferWriter;
+import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
 import org.apache.hadoop.hbase.io.util.Dictionary;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -356,14 +358,18 @@ public class WALCellCodec implements Codec {
   }
 
   @Override
-  public Decoder getDecoder(ByteBuffer buf) {
-    return getDecoder(new ByteBufferInputStream(buf));
+  public Decoder getDecoder(ByteBuff buf) {
+    return getDecoder(new ByteBuffInputStream(buf));
   }
 
   @Override
   public Encoder getEncoder(OutputStream os) {
-    return (compression == null)
-        ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression);
+    if (compression == null) {
+      os = (os instanceof ByteBufferWriter) ? os
+          : new ByteBufferWriterOutputStream(os);
+      return new EnsureKvEncoder(os);
+    }
+    return new CompressedKvEncoder(os, compression);
   }
 
   public ByteStringCompressor getByteStringCompressor() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3685760/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 2211e8f..5a9178a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -33,7 +33,6 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.List;
@@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -315,7 +315,7 @@ public abstract class AbstractTestIPC {
       }
 
       @Override
-      protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
+      protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
         // this will throw exception after the connection header is read, and an RPC is sent
         // from client
         throw new DoNotRetryIOException("Failing for test");