Posted to commits@hbase.apache.org by an...@apache.org on 2016/06/14 13:34:18 UTC

hbase git commit: HBASE-15525 OutOfMemory could occur when using BoundedByteBufferPool during RPC bursts.

Repository: hbase
Updated Branches:
  refs/heads/master e486d274c -> 17bcf14fe


HBASE-15525 OutOfMemory could occur when using BoundedByteBufferPool during RPC bursts.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/17bcf14f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/17bcf14f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/17bcf14f

Branch: refs/heads/master
Commit: 17bcf14fea2637fe0e5ca23bb0008c1cca208c98
Parents: e486d27
Author: anoopsjohn <an...@gmail.com>
Authored: Tue Jun 14 19:03:54 2016 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Tue Jun 14 19:03:54 2016 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/IPCUtil.java    | 107 ++++++------
 .../hbase/io/ByteBufferListOutputStream.java    | 173 +++++++++++++++++++
 .../hadoop/hbase/io/ByteBufferOutputStream.java |  46 ++---
 .../apache/hadoop/hbase/io/ByteBufferPool.java  | 154 +++++++++++++++++
 .../io/TestByteBufferListOutputStream.java      |  77 +++++++++
 .../hadoop/hbase/io/TestByteBufferPool.java     |  60 +++++++
 .../apache/hadoop/hbase/ipc/BufferChain.java    |   7 +
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  99 +++++++----
 8 files changed, 609 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 74466b5..74f934c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -34,10 +34,10 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -90,30 +90,7 @@ public class IPCUtil {
    */
   @SuppressWarnings("resource")
   public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
-    final CellScanner cellScanner)
-  throws IOException {
-    return buildCellBlock(codec, compressor, cellScanner, null);
-  }
-
-  /**
-   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
-   * <code>compressor</code>.
-   * @param codec to use for encoding
-   * @param compressor to use for encoding
-   * @param cellScanner to encode
-   * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
-   *   our own ByteBuffer.
-   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
-   *   passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
-   *   flipped and is ready for reading.  Use limit to find total size. If <code>pool</code> was not
-   *   null, then this returned ByteBuffer came from there and should be returned to the pool when
-   *   done.
-   * @throws IOException if encoding the cells fail
-   */
-  @SuppressWarnings("resource")
-  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
-    final CellScanner cellScanner, final BoundedByteBufferPool pool)
-  throws IOException {
+      final CellScanner cellScanner) throws IOException {
     if (cellScanner == null) {
       return null;
     }
@@ -121,25 +98,25 @@ public class IPCUtil {
       throw new CellScannerButNoCodecException();
     }
     int bufferSize = this.cellBlockBuildingInitialBufferSize;
-    ByteBufferOutputStream baos;
-    if (pool != null) {
-      ByteBuffer bb = pool.getBuffer();
-      bufferSize = bb.capacity();
-      baos = new ByteBufferOutputStream(bb);
-    } else {
-      // Then we need to make our own to return.
-      if (cellScanner instanceof HeapSize) {
-        long longSize = ((HeapSize)cellScanner).heapSize();
-        // Just make sure we don't have a size bigger than an int.
-        if (longSize > Integer.MAX_VALUE) {
-          throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
-        }
-        bufferSize = ClassSize.align((int)longSize);
+    ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
+    encodeCellsTo(baos, cellScanner, codec, compressor);
+    if (LOG.isTraceEnabled()) {
+      if (bufferSize < baos.size()) {
+        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size()
+            + "; up hbase.ipc.cellblock.building.initial.buffersize?");
       }
-      baos = new ByteBufferOutputStream(bufferSize);
     }
+    ByteBuffer bb = baos.getByteBuffer();
+    // If no cells, don't mess around. Just return null (could be a bunch of existence checking
+    // gets or something -- stuff that does not return a cell).
+    if (!bb.hasRemaining()) return null;
+    return bb;
+  }
+
+  private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec,
+      CompressionCodec compressor) throws IOException {
+    OutputStream os = bbos;
     Compressor poolCompressor = null;
-    OutputStream os = baos;
     try  {
       if (compressor != null) {
         if (compressor instanceof Configurable) {
@@ -149,33 +126,51 @@ public class IPCUtil {
         os = compressor.createOutputStream(os, poolCompressor);
       }
       Codec.Encoder encoder = codec.getEncoder(os);
-      int count = 0;
       while (cellScanner.advance()) {
         encoder.write(cellScanner.current());
-        count++;
       }
       encoder.flush();
-      // If no cells, don't mess around.  Just return null (could be a bunch of existence checking
-      // gets or something -- stuff that does not return a cell).
-      if (count == 0) {
-        return null;
-      }
     } catch (BufferOverflowException e) {
       throw new DoNotRetryIOException(e);
     } finally {
       os.close();
-
       if (poolCompressor != null) {
         CodecPool.returnCompressor(poolCompressor);
       }
     }
-    if (LOG.isTraceEnabled()) {
-      if (bufferSize < baos.size()) {
-        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
-          "; up hbase.ipc.cellblock.building.initial.buffersize?");
-      }
+  }
+
+  /**
+   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+   * <code>compressor</code>.
+   * @param codec to use for encoding
+   * @param compressor to use for encoding
+   * @param cellScanner to encode
+   * @param pool Pool of ByteBuffers to make use of.
+   * @return Null or a ByteBufferListOutputStream filled with a cellblock holding the passed-in
+   *   Cells encoded using the passed in <code>codec</code> and/or <code>compressor</code>; the
+   *   stream's buffers are flipped and ready for reading. Use size() to find the total size. The
+   *   backing ByteBuffers came from <code>pool</code> and must be returned to it via
+   *   releaseResources() when done.
+   * @throws IOException if encoding the cells fails
+   */
+  @SuppressWarnings("resource")
+  public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
+      CellScanner cellScanner, ByteBufferPool pool) throws IOException {
+    if (cellScanner == null) {
+      return null;
+    }
+    if (codec == null) {
+      throw new CellScannerButNoCodecException();
+    }
+    assert pool != null;
+    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+    encodeCellsTo(bbos, cellScanner, codec, compressor);
+    if (bbos.size() == 0) {
+      bbos.releaseResources();
+      return null;
     }
-    return baos.getByteBuffer();
+    return bbos;
   }
 
   /**

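For context, a minimal sketch of how a caller might drive the new pooled path above alongside the surviving single-buffer path. The CellBlockSketch harness, variable names, and the choice to pass a null compressor are illustrative assumptions, not code from this commit; only IPCUtil, ByteBufferPool, and ByteBufferListOutputStream come from the patch.

  import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.util.List;
  import org.apache.hadoop.hbase.CellScanner;
  import org.apache.hadoop.hbase.codec.Codec;
  import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
  import org.apache.hadoop.hbase.io.ByteBufferPool;
  import org.apache.hadoop.hbase.ipc.IPCUtil;

  class CellBlockSketch {
    void build(IPCUtil ipcUtil, Codec codec, CellScanner cells, ByteBufferPool reservoir)
        throws IOException {
      // Pooled path: the stream borrows buffers from the reservoir.
      ByteBufferListOutputStream bbos =
          ipcUtil.buildCellBlockStream(codec, null, cells, reservoir);
      if (bbos != null) {
        try {
          List<ByteBuffer> cellBlock = bbos.getByteBuffers(); // flipped, ready to read
          // ... hand the buffers to the socket write ...
        } finally {
          bbos.releaseResources(); // must run, or pooled buffers leak
        }
      }
      // Unpooled path, used when no reservoir is configured: one growable heap buffer.
      ByteBuffer bb = ipcUtil.buildCellBlock(codec, null, cells);
    }
  }
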
http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
new file mode 100644
index 0000000..b4c00c6
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+/**
+ * An OutputStream which writes data into ByteBuffers. It obtains ByteBuffers, as and when
+ * needed, from the passed pool. When the pool cannot supply one, it creates one on heap.
+ * Make sure to call the {@link #releaseResources()} method once the stream usage is over and
+ * the data has been transferred to the wanted destination.
+ * Not thread safe!
+ */
+@InterfaceAudience.Private
+public class ByteBufferListOutputStream extends ByteBufferOutputStream {
+  private static final Log LOG = LogFactory.getLog(ByteBufferListOutputStream.class);
+
+  private ByteBufferPool pool;
+  // Keep track of the BBs that bytes are written to. We first try to get a BB from the pool; if
+  // none is available, we make a new one on heap and keep writing to that. We track the BBs that
+  // came from the pool separately, so that releaseResources() can make sure to return all of
+  // them back to the pool.
+  protected List<ByteBuffer> allBufs = new ArrayList<ByteBuffer>();
+  protected List<ByteBuffer> bufsFromPool = new ArrayList<ByteBuffer>();
+
+  private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already
+
+  public ByteBufferListOutputStream(ByteBufferPool pool) {
+    this.pool = pool;
+    allocateNewBuffer();
+  }
+
+  private void allocateNewBuffer() {
+    if (this.curBuf != null) {
+      this.curBuf.flip();// On the current buf set limit = pos and pos = 0.
+    }
+    // Get an initial BB to work with from the pool
+    this.curBuf = this.pool.getBuffer();
+    if (this.curBuf == null) {
+      // No free BB at this moment. Make a new one. The pool returns off heap BBs; don't make off
+      // heap BBs on demand, as it is difficult to account for them all when sizing the max
+      // direct memory. See HBASE-15525 for more details.
+      // Make the BB the same size as the pool's buffer size.
+      this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize());
+    } else {
+      this.bufsFromPool.add(this.curBuf);
+    }
+    this.allBufs.add(this.curBuf);
+  }
+
+  @Override
+  public int size() {
+    int s = 0;
+    for (int i = 0; i < this.allBufs.size() - 1; i++) {
+      s += this.allBufs.get(i).remaining();
+    }
+    // On the last BB, it might not be flipped yet if getByteBuffers is not yet called
+    if (this.lastBufFlipped) {
+      s += this.curBuf.remaining();
+    } else {
+      s += this.curBuf.position();
+    }
+    return s;
+  }
+
+  @Override
+  public ByteBuffer getByteBuffer() {
+    throw new UnsupportedOperationException("This stream is not backed by a single ByteBuffer");
+  }
+
+  @Override
+  protected void checkSizeAndGrow(int extra) {
+    long capacityNeeded = curBuf.position() + (long) extra;
+    if (capacityNeeded > curBuf.limit()) {
+      allocateNewBuffer();
+    }
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    // No usage of this API in code. Mark it as an unsupported operation for now.
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Release the resources this stream uses (the ByteBuffers obtained from the pool). Call this
+   * only when all the data has been fully used, and always at the end of usage, or else we will
+   * leak ByteBuffers from the pool.
+   */
+  public void releaseResources() {
+    try {
+      close();
+    } catch (IOException e) {
+      LOG.debug(e);
+    }
+    // Return back all the BBs to pool
+    if (this.bufsFromPool != null) {
+      for (int i = 0; i < this.bufsFromPool.size(); i++) {
+        this.pool.putbackBuffer(this.bufsFromPool.get(i));
+      }
+      this.bufsFromPool = null;
+    }
+    this.allBufs = null;
+    this.curBuf = null;
+  }
+
+  @Override
+  public byte[] toByteArray(int offset, int length) {
+    // No usage of this API in code. Mark it as an unsupported operation for now.
+    throw new UnsupportedOperationException();
+  }
+
+  public List<ByteBuffer> getByteBuffers() {
+    if (!this.lastBufFlipped) {
+      this.lastBufFlipped = true;
+      // All the other BBs are already flipped while moving to the new BB.
+      curBuf.flip();
+    }
+    return this.allBufs;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    int toWrite = 0;
+    while (len > 0) {
+      toWrite = Math.min(len, this.curBuf.remaining());
+      ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite);
+      off += toWrite;
+      len -= toWrite;
+      if (len > 0) {
+        allocateNewBuffer();// The curBuf is over. Let us move to the next one
+      }
+    }
+  }
+
+  @Override
+  public void write(ByteBuffer b, int off, int len) throws IOException {
+    int toWrite = 0;
+    while (len > 0) {
+      toWrite = Math.min(len, this.curBuf.remaining());
+      ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite);
+      off += toWrite;
+      len -= toWrite;
+      if (len > 0) {
+        allocateNewBuffer();// The curBuf is over. Let us move to the next one
+      }
+    }
+  }
+}
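
A minimal usage sketch of the stream above, using only APIs defined in this patch (the pool sizes and the StreamSketch wrapper are illustrative assumptions); TestByteBufferListOutputStream below exercises the same contract in detail.

  import java.nio.ByteBuffer;
  import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
  import org.apache.hadoop.hbase.io.ByteBufferPool;
  import org.apache.hadoop.hbase.util.Bytes;

  class StreamSketch {
    public static void main(String[] args) throws Exception {
      ByteBufferPool pool = new ByteBufferPool(1024, 2); // 1 KB buffers, at most 2 pooled
      ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
      bbos.write(Bytes.toBytes("some cell block bytes"));
      for (ByteBuffer bb : bbos.getByteBuffers()) { // buffers come back flipped
        // drain bb to its destination, e.g. a socket channel
      }
      bbos.releaseResources(); // returns pooled buffers; the stream is unusable afterwards
    }
  }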

http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index d4bda18..f77092d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -44,7 +44,11 @@ public class ByteBufferOutputStream extends OutputStream
   // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 
-  protected ByteBuffer buf;
+  protected ByteBuffer curBuf = null;
+
+  ByteBufferOutputStream() {
+
+  }
 
   public ByteBufferOutputStream(int capacity) {
     this(capacity, false);
@@ -66,12 +70,12 @@ public class ByteBufferOutputStream extends OutputStream
    */
   public ByteBufferOutputStream(final ByteBuffer bb) {
     assert bb.order() == ByteOrder.BIG_ENDIAN;
-    this.buf = bb;
-    this.buf.clear();
+    this.curBuf = bb;
+    this.curBuf.clear();
   }
 
   public int size() {
-    return buf.position();
+    return curBuf.position();
   }
 
   private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
@@ -86,25 +90,25 @@ public class ByteBufferOutputStream extends OutputStream
    * @return ByteBuffer
    */
   public ByteBuffer getByteBuffer() {
-    buf.flip();
-    return buf;
+    curBuf.flip();
+    return curBuf;
   }
 
-  private void checkSizeAndGrow(int extra) {
-    long capacityNeeded = buf.position() + (long) extra;
-    if (capacityNeeded > buf.limit()) {
+  protected void checkSizeAndGrow(int extra) {
+    long capacityNeeded = curBuf.position() + (long) extra;
+    if (capacityNeeded > curBuf.limit()) {
       // guarantee it's possible to fit
       if (capacityNeeded > MAX_ARRAY_SIZE) {
         throw new BufferOverflowException();
       }
       // double until hit the cap
-      long nextCapacity = Math.min(buf.capacity() * 2L, MAX_ARRAY_SIZE);
+      long nextCapacity = Math.min(curBuf.capacity() * 2L, MAX_ARRAY_SIZE);
       // but make sure there is enough if twice the existing capacity is still too small
       nextCapacity = Math.max(nextCapacity, capacityNeeded);
-      ByteBuffer newBuf = allocate((int) nextCapacity, buf.isDirect());
-      buf.flip();
-      ByteBufferUtils.copyFromBufferToBuffer(buf, newBuf);
-      buf = newBuf;
+      ByteBuffer newBuf = allocate((int) nextCapacity, curBuf.isDirect());
+      curBuf.flip();
+      ByteBufferUtils.copyFromBufferToBuffer(curBuf, newBuf);
+      curBuf = newBuf;
     }
   }
 
@@ -112,7 +116,7 @@ public class ByteBufferOutputStream extends OutputStream
   @Override
   public void write(int b) throws IOException {
     checkSizeAndGrow(Bytes.SIZEOF_BYTE);
-    buf.put((byte)b);
+    curBuf.put((byte)b);
   }
 
  /**
@@ -122,9 +126,9 @@ public class ByteBufferOutputStream extends OutputStream
   * @param      out   the output stream to which to write the data.
   * @exception  IOException  if an I/O error occurs.
   */
-  public synchronized void writeTo(OutputStream out) throws IOException {
+  public void writeTo(OutputStream out) throws IOException {
     WritableByteChannel channel = Channels.newChannel(out);
-    ByteBuffer bb = buf.duplicate();
+    ByteBuffer bb = curBuf.duplicate();
     bb.flip();
     channel.write(bb);
   }
@@ -137,12 +141,12 @@ public class ByteBufferOutputStream extends OutputStream
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
     checkSizeAndGrow(len);
-    ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len);
+    ByteBufferUtils.copyFromArrayToBuffer(curBuf, b, off, len);
   }
 
   public void write(ByteBuffer b, int off, int len) throws IOException {
     checkSizeAndGrow(len);
-    ByteBufferUtils.copyFromBufferToBuffer(b, buf, off, len);
+    ByteBufferUtils.copyFromBufferToBuffer(b, curBuf, off, len);
   }
 
   /**
@@ -153,7 +157,7 @@ public class ByteBufferOutputStream extends OutputStream
    */
   public void writeInt(int i) throws IOException {
     checkSizeAndGrow(Bytes.SIZEOF_INT);
-    ByteBufferUtils.putInt(this.buf, i);
+    ByteBufferUtils.putInt(this.curBuf, i);
   }
 
   @Override
@@ -167,7 +171,7 @@ public class ByteBufferOutputStream extends OutputStream
   }
 
   public byte[] toByteArray(int offset, int length) {
-    ByteBuffer bb = buf.duplicate();
+    ByteBuffer bb = curBuf.duplicate();
     bb.flip();
 
     byte[] chunk = new byte[length];

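The refactor above is mostly mechanical (buf renamed to curBuf, checkSizeAndGrow made protected so ByteBufferListOutputStream can hook buffer growth); the base single-buffer contract is unchanged. A small assumed sketch of that contract:

  import java.nio.ByteBuffer;
  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;

  class BaseStreamSketch {
    public static void main(String[] args) throws Exception {
      ByteBufferOutputStream bbos = new ByteBufferOutputStream(4); // grows as needed
      bbos.writeInt(42);
      bbos.writeInt(43);                    // forces checkSizeAndGrow to reallocate
      ByteBuffer bb = bbos.getByteBuffer(); // flipped; limit holds the total size
      assert bb.getInt() == 42 && bb.getInt() == 43;
    }
  }
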
http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
new file mode 100644
index 0000000..e528f02
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Like Hadoop's ByteBufferPool, except you do not specify a desired size when getting a
+ * ByteBuffer. This pool keeps an upper bound on the count of ByteBuffers in the pool and a fixed
+ * size for the ByteBuffers it creates. When requested, if a free ByteBuffer is already present,
+ * it will return that. When no free ByteBuffer is available and we are below the max count, it
+ * will create a new one and return that.
+ *
+ * <p>
+ * Note: This pool returns off heap ByteBuffers by default. If on heap ByteBuffers are to be
+ * pooled, pass 'directByteBuffer' as false when constructing the pool.
+ * <p>
+ * This class is thread safe.
+ *
+ * @see ByteBufferListOutputStream
+ */
+@InterfaceAudience.Private
+public class ByteBufferPool {
+  private static final Log LOG = LogFactory.getLog(ByteBufferPool.class);
+  // TODO better config names?
+  // hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count
+  // hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size
+  public static final String MAX_POOL_SIZE_KEY = "hbase.ipc.server.reservoir.initial.max";
+  public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";
+  public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;// 64 KB. Same as the chunk size that
+                                                          // we will write/read to/from the
+                                                          // socket channel.
+  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<ByteBuffer>();
+
+  private final int bufferSize;
+  private final int maxPoolSize;
+  private AtomicInteger count; // Count of the BBs created already for this pool.
+  private final boolean directByteBuffer; //Whether this pool should return DirectByteBuffers
+  private boolean maxPoolSizeInfoLevelLogged = false;
+
+  /**
+   * @param bufferSize Size of each buffer created by this pool.
+   * @param maxPoolSize Max number of buffers to keep in this pool.
+   */
+  public ByteBufferPool(int bufferSize, int maxPoolSize) {
+    this(bufferSize, maxPoolSize, true);
+  }
+
+  /**
+   * @param bufferSize Size of each buffer created by this pool.
+   * @param maxPoolSize Max number of buffers to keep in this pool.
+   * @param directByteBuffer Whether to create direct ByteBuffer or on heap ByteBuffer.
+   */
+  public ByteBufferPool(int bufferSize, int maxPoolSize, boolean directByteBuffer) {
+    this.bufferSize = bufferSize;
+    this.maxPoolSize = maxPoolSize;
+    this.directByteBuffer = directByteBuffer;
+    // TODO can add initialPoolSize config also and make those many BBs ready for use.
+    LOG.info("Created ByteBufferPool with bufferSize : " + bufferSize + " and maxPoolSize : "
+        + maxPoolSize);
+    this.count = new AtomicInteger(0);
+  }
+
+  /**
+   * @return One free ByteBuffer from the pool. If no ByteBuffer is free and we have not reached
+   *         the maximum pool size, it will create a new one and return it. If the max pool size
+   *         has also been reached, it will return null. When the pool returns a ByteBuffer, make
+   *         sure to return it back to the pool after use.
+   * @see #putbackBuffer(ByteBuffer)
+   */
+  public ByteBuffer getBuffer() {
+    ByteBuffer bb = buffers.poll();
+    if (bb != null) {
+      // Clear sets limit == capacity. Position == 0.
+      bb.clear();
+      return bb;
+    }
+    while (true) {
+      int c = this.count.intValue();
+      if (c >= this.maxPoolSize) {
+        if (maxPoolSizeInfoLevelLogged) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Pool already reached its max capacity : " + this.maxPoolSize
+                + " and no free buffers now. Consider increasing the value for '"
+                + MAX_POOL_SIZE_KEY + "' ?");
+          }
+        } else {
+          LOG.info("Pool already reached its max capacity : " + this.maxPoolSize
+              + " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY
+              + "' ?");
+          maxPoolSizeInfoLevelLogged = true;
+        }
+        return null;
+      }
+      if (!this.count.compareAndSet(c, c + 1)) {
+        continue;
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize);
+      }
+      return this.directByteBuffer ? ByteBuffer.allocateDirect(this.bufferSize)
+          : ByteBuffer.allocate(this.bufferSize);
+    }
+  }
+
+  /**
+   * Return back a ByteBuffer after its use. Do not try to put back a ByteBuffer that was not
+   * obtained from this pool.
+   * @param buf ByteBuffer to return.
+   */
+  public void putbackBuffer(ByteBuffer buf) {
+    if (buf.capacity() != this.bufferSize || (this.directByteBuffer ^ buf.isDirect())) {
+      LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
+      return;
+    }
+    buffers.offer(buf);
+  }
+
+  int getBufferSize() {
+    return this.bufferSize;
+  }
+
+  /**
+   * @return Number of free buffers
+   */
+  @VisibleForTesting
+  int getQueueSize() {
+    return buffers.size();
+  }
+}
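
The get/putback contract above, including the capped-pool case, in a short sketch; the PoolSketch wrapper and the sizes are illustrative assumptions, the calls are the patch's own API.

  import java.nio.ByteBuffer;
  import org.apache.hadoop.hbase.io.ByteBufferPool;

  class PoolSketch {
    public static void main(String[] args) {
      ByteBufferPool pool = new ByteBufferPool(64 * 1024, 2); // direct BBs by default
      ByteBuffer a = pool.getBuffer(); // created on demand
      ByteBuffer b = pool.getBuffer(); // second and last the pool will create
      ByteBuffer c = pool.getBuffer(); // null: max count reached and none free
      pool.putbackBuffer(a);           // a becomes reusable
      ByteBuffer d = pool.getBuffer(); // returns the recycled 'a', cleared
    }
  }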

http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
new file mode 100644
index 0000000..e1d1e04
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestByteBufferListOutputStream {
+
+  @Test
+  public void testWrites() throws Exception {
+    ByteBufferPool pool = new ByteBufferPool(10, 3);
+    ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+    bbos.write(2);// Write a byte
+    bbos.writeInt(100);// Write an int
+    byte[] b = Bytes.toBytes("row123");// 6 bytes
+    bbos.write(b);
+    // Just use up the 3rd BB from pool so that bbos, on request, won't get one
+    ByteBuffer bb1 = pool.getBuffer();
+    ByteBuffer bb = ByteBuffer.wrap(Bytes.toBytes("row123_cf1_q1"));// 13 bytes
+    bbos.write(bb, 0, bb.capacity());
+    pool.putbackBuffer(bb1);
+    bbos.writeInt(123);
+    bbos.writeInt(124);
+    assertEquals(0, pool.getQueueSize());
+    List<ByteBuffer> allBufs = bbos.getByteBuffers();
+    assertEquals(4, allBufs.size());
+    assertEquals(3, bbos.bufsFromPool.size());
+    ByteBuffer b1 = allBufs.get(0);
+    assertEquals(10, b1.remaining());
+    assertEquals(2, b1.get());
+    assertEquals(100, b1.getInt());
+    byte[] bActual = new byte[b.length];
+    b1.get(bActual, 0, 5);//5 bytes in 1st BB
+    ByteBuffer b2 = allBufs.get(1);
+    assertEquals(10, b2.remaining());
+    b2.get(bActual, 5, 1);// Remaining 1 byte in 2nd BB
+    assertTrue(Bytes.equals(b, bActual));
+    bActual = new byte[bb.capacity()];
+    b2.get(bActual, 0, 9);
+    ByteBuffer b3 = allBufs.get(2);
+    assertEquals(8, b3.remaining());
+    b3.get(bActual, 9, 4);
+    assertTrue(ByteBufferUtils.equals(bb, 0, bb.capacity(), bActual, 0, bActual.length));
+    assertEquals(123, b3.getInt());
+    ByteBuffer b4 = allBufs.get(3);
+    assertEquals(4, b4.remaining());
+    assertEquals(124, b4.getInt());
+    bbos.releaseResources();
+    assertEquals(3, pool.getQueueSize());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
new file mode 100644
index 0000000..64a4103
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertEquals;
+@Category({ IOTests.class, SmallTests.class })
+public class TestByteBufferPool {
+
+  @Test
+  public void testOffheapBBPool() throws Exception {
+    boolean directByteBuffer = true;
+    testBBPool(10, 100, directByteBuffer);
+  }
+
+  @Test
+  public void testOnheapBBPool() throws Exception {
+    boolean directByteBuffer = false;
+    testBBPool(10, 100, directByteBuffer);
+  }
+
+  private void testBBPool(int maxPoolSize, int bufferSize, boolean directByteBuffer) {
+    ByteBufferPool pool = new ByteBufferPool(bufferSize, maxPoolSize, directByteBuffer);
+    for (int i = 0; i < maxPoolSize; i++) {
+      ByteBuffer buffer = pool.getBuffer();
+      assertEquals(0, buffer.position());
+      assertEquals(bufferSize, buffer.limit());
+      assertEquals(directByteBuffer, buffer.isDirect());
+    }
+    assertEquals(0, pool.getQueueSize());
+    ByteBuffer bb = directByteBuffer ? ByteBuffer.allocate(bufferSize)
+        : ByteBuffer.allocateDirect(bufferSize);
+    pool.putbackBuffer(bb);
+    assertEquals(0, pool.getQueueSize());
+    bb = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize + 1)
+        : ByteBuffer.allocate(bufferSize + 1);
+    pool.putbackBuffer(bb);
+    assertEquals(0, pool.getQueueSize());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
index babd2f8..39efa40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
@@ -46,6 +46,13 @@ class BufferChain {
     this.buffers = bbs.toArray(new ByteBuffer[bbs.size()]);
   }
 
+  BufferChain(List<ByteBuffer> buffers) {
+    for (ByteBuffer b : buffers) {
+      this.remaining += b.remaining();
+    }
+    this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]);
+  }
+
   /**
    * Expensive.  Makes a new buffer to hold a copy of what is in contained ByteBuffers.  This
    * call drains this instance; it cannot be used subsequent to the call.

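A small assumed sketch of the new list-based constructor in use, mirroring the RpcServer change below (header buffer first, then the cell-block buffers); BufferChain is package-private, so this only compiles inside the ipc package, and the names are illustrative.

  import java.nio.ByteBuffer;
  import java.util.ArrayList;
  import java.util.List;

  class ChainSketch {
    BufferChain chain(byte[] headerAndMessage, List<ByteBuffer> cellBlock) {
      List<ByteBuffer> bufs = new ArrayList<ByteBuffer>(cellBlock.size() + 1);
      bufs.add(ByteBuffer.wrap(headerAndMessage)); // header + message bytes first
      bufs.addAll(cellBlock);                      // then the pooled cell-block buffers
      return new BufferChain(bufs);                // sums remaining() across all buffers
    }
  }
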
http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 1087c42..c7f5a10 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -85,9 +85,10 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
-import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -289,7 +290,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
   private UserProvider userProvider;
 
-  private final BoundedByteBufferPool reservoir;
+  private final ByteBufferPool reservoir;
 
   private volatile boolean allowFallbackToSimpleAuth;
 
@@ -320,7 +321,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     protected long size;                          // size of current call
     protected boolean isError;
     protected TraceInfo tinfo;
-    private ByteBuffer cellBlock = null;
+    private ByteBufferListOutputStream cellBlockStream = null;
 
     private User user;
     private InetAddress remoteAddress;
@@ -362,10 +363,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
         justification="Presume the lock on processing request held by caller is protection enough")
     void done() {
-      if (this.cellBlock != null && reservoir != null) {
-        // Return buffer to reservoir now we are done with it.
-        reservoir.putBuffer(this.cellBlock);
-        this.cellBlock = null;
+      if (this.cellBlockStream != null) {
+        this.cellBlockStream.releaseResources();// This will return back the BBs which we
+                                                // got from pool.
+        this.cellBlockStream = null;
       }
       this.connection.decRpcCount();  // Say that we're done with this call.
     }
@@ -425,38 +426,43 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         // Call id.
         headerBuilder.setCallId(this.id);
         if (t != null) {
-          ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
-          exceptionBuilder.setExceptionClassName(t.getClass().getName());
-          exceptionBuilder.setStackTrace(errorMsg);
-          exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
-          if (t instanceof RegionMovedException) {
-            // Special casing for this exception.  This is only one carrying a payload.
-            // Do this instead of build a generic system for allowing exceptions carry
-            // any kind of payload.
-            RegionMovedException rme = (RegionMovedException)t;
-            exceptionBuilder.setHostname(rme.getHostname());
-            exceptionBuilder.setPort(rme.getPort());
-          }
-          // Set the exception as the result of the method invocation.
-          headerBuilder.setException(exceptionBuilder.build());
+          setExceptionResponse(t, errorMsg, headerBuilder);
         }
        // Pass reservoir to buildCellBlock. Keep reference to returned so can add it back to the
         // reservoir when finished. This is hacky and the hack is not contained but benefits are
         // high when we can avoid a big buffer allocation on each rpc.
-        this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
-          this.connection.compressionCodec, cells, reservoir);
-        if (this.cellBlock != null) {
+        List<ByteBuffer> cellBlock = null;
+        int cellBlockSize = 0;
+        if (reservoir != null) {
+          this.cellBlockStream = ipcUtil.buildCellBlockStream(this.connection.codec,
+              this.connection.compressionCodec, cells, reservoir);
+          if (this.cellBlockStream != null) {
+            cellBlock = this.cellBlockStream.getByteBuffers();
+            cellBlockSize = this.cellBlockStream.size();
+          }
+        } else {
+          ByteBuffer b = ipcUtil.buildCellBlock(this.connection.codec,
+              this.connection.compressionCodec, cells);
+          if (b != null) {
+            cellBlockSize = b.remaining();
+            cellBlock = new ArrayList<ByteBuffer>(1);
+            cellBlock.add(b);
+          }
+        }
+
+        if (cellBlockSize > 0) {
           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
-          cellBlockBuilder.setLength(this.cellBlock.limit());
+          cellBlockBuilder.setLength(cellBlockSize);
           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
         }
         Message header = headerBuilder.build();
-
-        byte[] b = createHeaderAndMessageBytes(result, header);
-
-        bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
-
+        byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize);
+        List<ByteBuffer> responseBufs = new ArrayList<ByteBuffer>(
+            (cellBlock == null ? 1 : cellBlock.size()) + 1);
+        responseBufs.add(ByteBuffer.wrap(b));
+        if (cellBlock != null) responseBufs.addAll(cellBlock);
+        bc = new BufferChain(responseBufs);
         if (connection.useWrap) {
           bc = wrapWithSasl(bc);
         }
@@ -476,7 +482,25 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       }
     }
 
-    private byte[] createHeaderAndMessageBytes(Message result, Message header)
+    private void setExceptionResponse(Throwable t, String errorMsg,
+        ResponseHeader.Builder headerBuilder) {
+      ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
+      exceptionBuilder.setExceptionClassName(t.getClass().getName());
+      exceptionBuilder.setStackTrace(errorMsg);
+      exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
+      if (t instanceof RegionMovedException) {
+        // Special casing for this exception.  This is only one carrying a payload.
+        // Do this instead of build a generic system for allowing exceptions carry
+        // any kind of payload.
+        RegionMovedException rme = (RegionMovedException)t;
+        exceptionBuilder.setHostname(rme.getHostname());
+        exceptionBuilder.setPort(rme.getPort());
+      }
+      // Set the exception as the result of the method invocation.
+      headerBuilder.setException(exceptionBuilder.build());
+    }
+
+    private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize)
         throws IOException {
       // Organize the response as a set of bytebuffers rather than collect it all together inside
       // one big byte array; save on allocations.
@@ -493,7 +517,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       // calculate the total size
       int totalSize = headerSerializedSize + headerVintSize
           + (resultSerializedSize + resultVintSize)
-          + (this.cellBlock == null ? 0 : this.cellBlock.limit());
+          + cellBlockSize;
       // The byte[] should also hold the totalSize of the header, message and the cellblock
       byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
           + resultVintSize + Bytes.SIZEOF_INT];
@@ -1084,6 +1108,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       } finally {
         if (error) {
           LOG.debug(getName() + call.toShortString() + ": output error -- closing");
+          // We will be closing this connection itself. Mark this call as done so that all the
+          // buffer(s) it got from the pool can be released
+          call.done();
           closeConnection(call.connection);
         }
       }
@@ -1998,11 +2025,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       RpcScheduler scheduler)
       throws IOException {
     if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
-      this.reservoir = new BoundedByteBufferPool(
-          conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
-          conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
-          // Make the max twice the number of handlers to be safe.
-          conf.getInt("hbase.ipc.server.reservoir.initial.max",
+      this.reservoir = new ByteBufferPool(
+          conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY, ByteBufferPool.DEFAULT_BUFFER_SIZE),
+          conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
               conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
                   HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
     } else {
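
Operationally, the reservoir is now sized by a fixed buffer size and a buffer count instead of a max buffer size. A hedged configuration sketch (the values and the ReservoirConfigSketch wrapper are illustrative; the keys and defaults are from the patch):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.io.ByteBufferPool;

  class ReservoirConfigSketch {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      conf.setBoolean("hbase.ipc.server.reservoir.enabled", true);
      // Fixed size of each pooled direct buffer; defaults to 64 KB.
      conf.setInt(ByteBufferPool.BUFFER_SIZE_KEY, 64 * 1024);
      // Cap on the pooled buffer count; defaults to 2x the handler count.
      conf.setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 200);
    }
  }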