You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2016/06/14 13:34:18 UTC
hbase git commit: HBASE-15525 OutOfMemory could occur when using
BoundedByteBufferPool during RPC bursts.
Repository: hbase
Updated Branches:
refs/heads/master e486d274c -> 17bcf14fe
HBASE-15525 OutOfMemory could occur when using BoundedByteBufferPool during RPC bursts.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/17bcf14f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/17bcf14f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/17bcf14f
Branch: refs/heads/master
Commit: 17bcf14fea2637fe0e5ca23bb0008c1cca208c98
Parents: e486d27
Author: anoopsjohn <an...@gmail.com>
Authored: Tue Jun 14 19:03:54 2016 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Tue Jun 14 19:03:54 2016 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/ipc/IPCUtil.java | 107 ++++++------
.../hbase/io/ByteBufferListOutputStream.java | 173 +++++++++++++++++++
.../hadoop/hbase/io/ByteBufferOutputStream.java | 46 ++---
.../apache/hadoop/hbase/io/ByteBufferPool.java | 154 +++++++++++++++++
.../io/TestByteBufferListOutputStream.java | 77 +++++++++
.../hadoop/hbase/io/TestByteBufferPool.java | 60 +++++++
.../apache/hadoop/hbase/ipc/BufferChain.java | 7 +
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 99 +++++++----
8 files changed, 609 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 74466b5..74f934c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -34,10 +34,10 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.compress.CodecPool;
@@ -90,30 +90,7 @@ public class IPCUtil {
*/
@SuppressWarnings("resource")
public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
- final CellScanner cellScanner)
- throws IOException {
- return buildCellBlock(codec, compressor, cellScanner, null);
- }
-
- /**
- * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
- * <code>compressor</code>.
- * @param codec to use for encoding
- * @param compressor to use for encoding
- * @param cellScanner to encode
- * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
- * our own ByteBuffer.
- * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
- * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
- * flipped and is ready for reading. Use limit to find total size. If <code>pool</code> was not
- * null, then this returned ByteBuffer came from there and should be returned to the pool when
- * done.
- * @throws IOException if encoding the cells fail
- */
- @SuppressWarnings("resource")
- public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
- final CellScanner cellScanner, final BoundedByteBufferPool pool)
- throws IOException {
+ final CellScanner cellScanner) throws IOException {
if (cellScanner == null) {
return null;
}
@@ -121,25 +98,25 @@ public class IPCUtil {
throw new CellScannerButNoCodecException();
}
int bufferSize = this.cellBlockBuildingInitialBufferSize;
- ByteBufferOutputStream baos;
- if (pool != null) {
- ByteBuffer bb = pool.getBuffer();
- bufferSize = bb.capacity();
- baos = new ByteBufferOutputStream(bb);
- } else {
- // Then we need to make our own to return.
- if (cellScanner instanceof HeapSize) {
- long longSize = ((HeapSize)cellScanner).heapSize();
- // Just make sure we don't have a size bigger than an int.
- if (longSize > Integer.MAX_VALUE) {
- throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
- }
- bufferSize = ClassSize.align((int)longSize);
+ ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
+ encodeCellsTo(baos, cellScanner, codec, compressor);
+ if (LOG.isTraceEnabled()) {
+ if (bufferSize < baos.size()) {
+ LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size()
+ + "; up hbase.ipc.cellblock.building.initial.buffersize?");
}
- baos = new ByteBufferOutputStream(bufferSize);
}
+ ByteBuffer bb = baos.getByteBuffer();
+ // If no cells, don't mess around. Just return null (could be a bunch of existence checking
+ // gets or something -- stuff that does not return a cell).
+ if (!bb.hasRemaining()) return null;
+ return bb;
+ }
+
+ private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec,
+ CompressionCodec compressor) throws IOException {
+ OutputStream os = bbos;
Compressor poolCompressor = null;
- OutputStream os = baos;
try {
if (compressor != null) {
if (compressor instanceof Configurable) {
@@ -149,33 +126,51 @@ public class IPCUtil {
os = compressor.createOutputStream(os, poolCompressor);
}
Codec.Encoder encoder = codec.getEncoder(os);
- int count = 0;
while (cellScanner.advance()) {
encoder.write(cellScanner.current());
- count++;
}
encoder.flush();
- // If no cells, don't mess around. Just return null (could be a bunch of existence checking
- // gets or something -- stuff that does not return a cell).
- if (count == 0) {
- return null;
- }
} catch (BufferOverflowException e) {
throw new DoNotRetryIOException(e);
} finally {
os.close();
-
if (poolCompressor != null) {
CodecPool.returnCompressor(poolCompressor);
}
}
- if (LOG.isTraceEnabled()) {
- if (bufferSize < baos.size()) {
- LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
- "; up hbase.ipc.cellblock.building.initial.buffersize?");
- }
+ }
+
+ /**
+ * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+ * <code>compressor</code>.
+ * @param codec to use for encoding
+ * @param compressor to use for encoding
+ * @param cellScanner to encode
+ * @param pool Pool of ByteBuffers to make use of.
+ * @return Null, or a ByteBufferListOutputStream holding a cellblock filled with passed-in Cells
+ * encoded using passed in <code>codec</code> and/or <code>compressor</code>. Null is returned
+ * when <code>cellScanner</code> is null or when nothing got written. Buffers backing the stream
+ * may have come from <code>pool</code>; call releaseResources() on the returned stream when
+ * done so that they are returned to the pool.
+ * @throws IOException if encoding the cells fail
+ */
+ @SuppressWarnings("resource")
+ public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
+ CellScanner cellScanner, ByteBufferPool pool) throws IOException {
+ if (cellScanner == null) {
+ return null;
+ }
+ if (codec == null) {
+ throw new CellScannerButNoCodecException();
+ }
+ assert pool != null;
+ ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+ encodeCellsTo(bbos, cellScanner, codec, compressor);
+ if (bbos.size() == 0) {
+ bbos.releaseResources();
+ return null;
}
- return baos.getByteBuffer();
+ return bbos;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
new file mode 100644
index 0000000..b4c00c6
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+/**
+ * An OutputStream which writes data into ByteBuffers. It will try to get ByteBuffer, as and when
+ * needed, from the passed pool. When pool is not giving a ByteBuffer it will create one on heap.
+ * Make sure to call {@link #releaseResources()} method once the Stream usage is over and
+ * data is transferred to the wanted destination.
+ * Not thread safe!
+ */
+@InterfaceAudience.Private
+public class ByteBufferListOutputStream extends ByteBufferOutputStream {
+ private static final Log LOG = LogFactory.getLog(ByteBufferListOutputStream.class);
+
+ private ByteBufferPool pool;
+ // Keep track of the BBs that bytes are written to. We first try to get a BB from the pool; if
+ // none is available we make a new one of our own and keep writing to that. We separately track
+ // all the BBs that we got from the pool so that releaseResources() can make sure to return
+ // every one of them to the pool.
+ protected List<ByteBuffer> allBufs = new ArrayList<ByteBuffer>();
+ protected List<ByteBuffer> bufsFromPool = new ArrayList<ByteBuffer>();
+
+ private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already
+
+ public ByteBufferListOutputStream(ByteBufferPool pool) {
+ this.pool = pool;
+ allocateNewBuffer();
+ }
+
+ private void allocateNewBuffer() {
+ if (this.curBuf != null) {
+ this.curBuf.flip();// On the current buf set limit = pos and pos = 0.
+ }
+ // Get an initial BB to work with from the pool
+ this.curBuf = this.pool.getBuffer();
+ if (this.curBuf == null) {
+ // No free BB at this moment, so make a new one. The pool returns off heap BBs by default, but
+ // we don't make off heap BBs on demand: such allocations are difficult to account for, which
+ // makes proper sizing of the max direct memory size hard. See HBASE-15525 for more details.
+ // Make an on heap BB with the same size as the pool's buffer size.
+ this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize());
+ } else {
+ this.bufsFromPool.add(this.curBuf);
+ }
+ this.allBufs.add(this.curBuf);
+ }
+
+ @Override
+ public int size() {
+ int s = 0;
+ for (int i = 0; i < this.allBufs.size() - 1; i++) {
+ s += this.allBufs.get(i).remaining();
+ }
+ // On the last BB, it might not be flipped yet if getByteBuffers is not yet called
+ if (this.lastBufFlipped) {
+ s += this.curBuf.remaining();
+ } else {
+ s += this.curBuf.position();
+ }
+ return s;
+ }
+
+ @Override
+ public ByteBuffer getByteBuffer() {
+ throw new UnsupportedOperationException("This stream is not backed by a single ByteBuffer");
+ }
+
+ @Override
+ protected void checkSizeAndGrow(int extra) {
+ long capacityNeeded = curBuf.position() + (long) extra;
+ if (capacityNeeded > curBuf.limit()) {
+ allocateNewBuffer();
+ }
+ }
+
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ // No usage of this API in code. Just making it as an Unsupported operation as of now
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Release the resources it uses (The ByteBuffers) which are obtained from pool. Call this only
+ * when all the data is fully used. And it must be called at the end of usage else we will leak
+ * ByteBuffers from pool.
+ */
+ public void releaseResources() {
+ try {
+ close();
+ } catch (IOException e) {
+ LOG.debug(e);
+ }
+ // Return back all the BBs to pool
+ if (this.bufsFromPool != null) {
+ for (int i = 0; i < this.bufsFromPool.size(); i++) {
+ this.pool.putbackBuffer(this.bufsFromPool.get(i));
+ }
+ this.bufsFromPool = null;
+ }
+ this.allBufs = null;
+ this.curBuf = null;
+ }
+
+ @Override
+ public byte[] toByteArray(int offset, int length) {
+ // No usage of this API in code. Just making it as an Unsupported operation as of now
+ throw new UnsupportedOperationException();
+ }
+
+ public List<ByteBuffer> getByteBuffers() {
+ if (!this.lastBufFlipped) {
+ this.lastBufFlipped = true;
+ // All the other BBs are already flipped while moving to the new BB.
+ curBuf.flip();
+ }
+ return this.allBufs;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ int toWrite = 0;
+ while (len > 0) {
+ toWrite = Math.min(len, this.curBuf.remaining());
+ ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite);
+ off += toWrite;
+ len -= toWrite;
+ if (len > 0) {
+ allocateNewBuffer();// The curBuf is over. Let us move to the next one
+ }
+ }
+ }
+
+ @Override
+ public void write(ByteBuffer b, int off, int len) throws IOException {
+ int toWrite = 0;
+ while (len > 0) {
+ toWrite = Math.min(len, this.curBuf.remaining());
+ ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite);
+ off += toWrite;
+ len -= toWrite;
+ if (len > 0) {
+ allocateNewBuffer();// The curBuf is over. Let us move to the next one
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index d4bda18..f77092d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -44,7 +44,11 @@ public class ByteBufferOutputStream extends OutputStream
// http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
- protected ByteBuffer buf;
+ protected ByteBuffer curBuf = null;
+
+ ByteBufferOutputStream() {
+
+ }
public ByteBufferOutputStream(int capacity) {
this(capacity, false);
@@ -66,12 +70,12 @@ public class ByteBufferOutputStream extends OutputStream
*/
public ByteBufferOutputStream(final ByteBuffer bb) {
assert bb.order() == ByteOrder.BIG_ENDIAN;
- this.buf = bb;
- this.buf.clear();
+ this.curBuf = bb;
+ this.curBuf.clear();
}
public int size() {
- return buf.position();
+ return curBuf.position();
}
private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
@@ -86,25 +90,25 @@ public class ByteBufferOutputStream extends OutputStream
* @return ByteBuffer
*/
public ByteBuffer getByteBuffer() {
- buf.flip();
- return buf;
+ curBuf.flip();
+ return curBuf;
}
- private void checkSizeAndGrow(int extra) {
- long capacityNeeded = buf.position() + (long) extra;
- if (capacityNeeded > buf.limit()) {
+ protected void checkSizeAndGrow(int extra) {
+ long capacityNeeded = curBuf.position() + (long) extra;
+ if (capacityNeeded > curBuf.limit()) {
// guarantee it's possible to fit
if (capacityNeeded > MAX_ARRAY_SIZE) {
throw new BufferOverflowException();
}
// double until hit the cap
- long nextCapacity = Math.min(buf.capacity() * 2L, MAX_ARRAY_SIZE);
+ long nextCapacity = Math.min(curBuf.capacity() * 2L, MAX_ARRAY_SIZE);
// but make sure there is enough if twice the existing capacity is still too small
nextCapacity = Math.max(nextCapacity, capacityNeeded);
- ByteBuffer newBuf = allocate((int) nextCapacity, buf.isDirect());
- buf.flip();
- ByteBufferUtils.copyFromBufferToBuffer(buf, newBuf);
- buf = newBuf;
+ ByteBuffer newBuf = allocate((int) nextCapacity, curBuf.isDirect());
+ curBuf.flip();
+ ByteBufferUtils.copyFromBufferToBuffer(curBuf, newBuf);
+ curBuf = newBuf;
}
}
@@ -112,7 +116,7 @@ public class ByteBufferOutputStream extends OutputStream
@Override
public void write(int b) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_BYTE);
- buf.put((byte)b);
+ curBuf.put((byte)b);
}
/**
@@ -122,9 +126,9 @@ public class ByteBufferOutputStream extends OutputStream
* @param out the output stream to which to write the data.
* @exception IOException if an I/O error occurs.
*/
- public synchronized void writeTo(OutputStream out) throws IOException {
+ public void writeTo(OutputStream out) throws IOException {
WritableByteChannel channel = Channels.newChannel(out);
- ByteBuffer bb = buf.duplicate();
+ ByteBuffer bb = curBuf.duplicate();
bb.flip();
channel.write(bb);
}
@@ -137,12 +141,12 @@ public class ByteBufferOutputStream extends OutputStream
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkSizeAndGrow(len);
- ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len);
+ ByteBufferUtils.copyFromArrayToBuffer(curBuf, b, off, len);
}
public void write(ByteBuffer b, int off, int len) throws IOException {
checkSizeAndGrow(len);
- ByteBufferUtils.copyFromBufferToBuffer(b, buf, off, len);
+ ByteBufferUtils.copyFromBufferToBuffer(b, curBuf, off, len);
}
/**
@@ -153,7 +157,7 @@ public class ByteBufferOutputStream extends OutputStream
*/
public void writeInt(int i) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_INT);
- ByteBufferUtils.putInt(this.buf, i);
+ ByteBufferUtils.putInt(this.curBuf, i);
}
@Override
@@ -167,7 +171,7 @@ public class ByteBufferOutputStream extends OutputStream
}
public byte[] toByteArray(int offset, int length) {
- ByteBuffer bb = buf.duplicate();
+ ByteBuffer bb = curBuf.duplicate();
bb.flip();
byte[] chunk = new byte[length];
http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
new file mode 100644
index 0000000..e528f02
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Like Hadoop's ByteBufferPool only you do not specify desired size when getting a ByteBuffer. This
+ * pool keeps an upper bound on the count of ByteBuffers in the pool and a fixed size of ByteBuffer
+ * that it will create. When requested, if a free ByteBuffer is already present, it will return
+ * that. And when no free ByteBuffer available and we are below the max count, it will create a new
+ * one and return that.
+ *
+ * <p>
+ * Note: This pool returns off heap ByteBuffers by default. If on heap ByteBuffers to be pooled,
+ * pass 'directByteBuffer' as false while construction of the pool.
+ * <p>
+ * This class is thread safe.
+ *
+ * @see ByteBufferListOutputStream
+ */
+@InterfaceAudience.Private
+public class ByteBufferPool {
+ private static final Log LOG = LogFactory.getLog(ByteBufferPool.class);
+ // TODO better config names?
+ // hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count
+ // hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size
+ public static final String MAX_POOL_SIZE_KEY = "hbase.ipc.server.reservoir.initial.max";
+ public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";
+ public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;// 64 KB. Making it same as the chunk size
+ // what we will write/read to/from the
+ // socket channel.
+ private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<ByteBuffer>();
+
+ private final int bufferSize;
+ private final int maxPoolSize;
+ private AtomicInteger count; // Count of the BBs created already for this pool.
+ private final boolean directByteBuffer; //Whether this pool should return DirectByteBuffers
+ private boolean maxPoolSizeInfoLevelLogged = false;
+
+ /**
+ * @param bufferSize Size of each buffer created by this pool.
+ * @param maxPoolSize Max number of buffers to keep in this pool.
+ */
+ public ByteBufferPool(int bufferSize, int maxPoolSize) {
+ this(bufferSize, maxPoolSize, true);
+ }
+
+ /**
+ * @param bufferSize Size of each buffer created by this pool.
+ * @param maxPoolSize Max number of buffers to keep in this pool.
+ * @param directByteBuffer Whether to create direct ByteBuffer or on heap ByteBuffer.
+ */
+ public ByteBufferPool(int bufferSize, int maxPoolSize, boolean directByteBuffer) {
+ this.bufferSize = bufferSize;
+ this.maxPoolSize = maxPoolSize;
+ this.directByteBuffer = directByteBuffer;
+ // TODO can add initialPoolSize config also and make those many BBs ready for use.
+ LOG.info("Created ByteBufferPool with bufferSize : " + bufferSize + " and maxPoolSize : "
+ + maxPoolSize);
+ this.count = new AtomicInteger(0);
+ }
+
+ /**
+ * @return One free ByteBuffer from the pool. If no free ByteBuffer and we have not reached the
+ * maximum pool size, it will create a new one and return. In case of max pool size also
+ * reached, will return null. When pool returned a ByteBuffer, make sure to return it back
+ * to pool after use.
+ * @see #putbackBuffer(ByteBuffer)
+ */
+ public ByteBuffer getBuffer() {
+ ByteBuffer bb = buffers.poll();
+ if (bb != null) {
+ // Clear sets limit == capacity. Position == 0.
+ bb.clear();
+ return bb;
+ }
+ while (true) {
+ int c = this.count.intValue();
+ if (c >= this.maxPoolSize) {
+ if (maxPoolSizeInfoLevelLogged) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Pool already reached its max capacity : " + this.maxPoolSize
+ + " and no free buffers now. Consider increasing the value for '"
+ + MAX_POOL_SIZE_KEY + "' ?");
+ }
+ } else {
+ LOG.info("Pool already reached its max capacity : " + this.maxPoolSize
+ + " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY
+ + "' ?");
+ maxPoolSizeInfoLevelLogged = true;
+ }
+ return null;
+ }
+ if (!this.count.compareAndSet(c, c + 1)) {
+ continue;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize);
+ }
+ return this.directByteBuffer ? ByteBuffer.allocateDirect(this.bufferSize)
+ : ByteBuffer.allocate(this.bufferSize);
+ }
+ }
+
+ /**
+ * Return a ByteBuffer to the pool after its use. Do not try to put back a ByteBuffer that was
+ * not obtained from this pool.
+ * @param buf ByteBuffer to return.
+ */
+ public void putbackBuffer(ByteBuffer buf) {
+ if (buf.capacity() != this.bufferSize || (this.directByteBuffer ^ buf.isDirect())) {
+ LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
+ return;
+ }
+ buffers.offer(buf);
+ }
+
+ int getBufferSize() {
+ return this.bufferSize;
+ }
+
+ /**
+ * @return Number of free buffers
+ */
+ @VisibleForTesting
+ int getQueueSize() {
+ return buffers.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
new file mode 100644
index 0000000..e1d1e04
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestByteBufferListOutputStream {
+
+ @Test
+ public void testWrites() throws Exception {
+ ByteBufferPool pool = new ByteBufferPool(10, 3);
+ ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+ bbos.write(2);// Write a byte
+ bbos.writeInt(100);// Write an int
+ byte[] b = Bytes.toBytes("row123");// 6 bytes
+ bbos.write(b);
+ // Just use the 3rd BB from pool so that bbos, on request, won't get one
+ ByteBuffer bb1 = pool.getBuffer();
+ ByteBuffer bb = ByteBuffer.wrap(Bytes.toBytes("row123_cf1_q1"));// 13 bytes
+ bbos.write(bb, 0, bb.capacity());
+ pool.putbackBuffer(bb1);
+ bbos.writeInt(123);
+ bbos.writeInt(124);
+ assertEquals(0, pool.getQueueSize());
+ List<ByteBuffer> allBufs = bbos.getByteBuffers();
+ assertEquals(4, allBufs.size());
+ assertEquals(3, bbos.bufsFromPool.size());
+ ByteBuffer b1 = allBufs.get(0);
+ assertEquals(10, b1.remaining());
+ assertEquals(2, b1.get());
+ assertEquals(100, b1.getInt());
+ byte[] bActual = new byte[b.length];
+ b1.get(bActual, 0, 5);//5 bytes in 1st BB
+ ByteBuffer b2 = allBufs.get(1);
+ assertEquals(10, b2.remaining());
+ b2.get(bActual, 5, 1);// Remaining 1 byte in 2nd BB
+ assertTrue(Bytes.equals(b, bActual));
+ bActual = new byte[bb.capacity()];
+ b2.get(bActual, 0, 9);
+ ByteBuffer b3 = allBufs.get(2);
+ assertEquals(8, b3.remaining());
+ b3.get(bActual, 9, 4);
+ assertTrue(ByteBufferUtils.equals(bb, 0, bb.capacity(), bActual, 0, bActual.length));
+ assertEquals(123, b3.getInt());
+ ByteBuffer b4 = allBufs.get(3);
+ assertEquals(4, b4.remaining());
+ assertEquals(124, b4.getInt());
+ bbos.releaseResources();
+ assertEquals(3, pool.getQueueSize());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
new file mode 100644
index 0000000..64a4103
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertEquals;
+@Category({ IOTests.class, SmallTests.class })
+public class TestByteBufferPool {
+
+ @Test
+ public void testOffheapBBPool() throws Exception {
+ boolean directByteBuffer = true;
+ testBBPool(10, 100, directByteBuffer);
+ }
+
+ @Test
+ public void testOnheapBBPool() throws Exception {
+ boolean directByteBuffer = false;
+ testBBPool(10, 100, directByteBuffer);
+ }
+
+ private void testBBPool(int maxPoolSize, int bufferSize, boolean directByteBuffer) {
+ ByteBufferPool pool = new ByteBufferPool(bufferSize, maxPoolSize, directByteBuffer);
+ for (int i = 0; i < maxPoolSize; i++) {
+ ByteBuffer buffer = pool.getBuffer();
+ assertEquals(0, buffer.position());
+ assertEquals(bufferSize, buffer.limit());
+ assertEquals(directByteBuffer, buffer.isDirect());
+ }
+ assertEquals(0, pool.getQueueSize());
+ ByteBuffer bb = directByteBuffer ? ByteBuffer.allocate(bufferSize)
+ : ByteBuffer.allocateDirect(bufferSize);
+ pool.putbackBuffer(bb);
+ assertEquals(0, pool.getQueueSize());
+ bb = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize + 1)
+ : ByteBuffer.allocate(bufferSize + 1);
+ pool.putbackBuffer(bb);
+ assertEquals(0, pool.getQueueSize());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
index babd2f8..39efa40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
@@ -46,6 +46,13 @@ class BufferChain {
this.buffers = bbs.toArray(new ByteBuffer[bbs.size()]);
}
+ BufferChain(List<ByteBuffer> buffers) { // Builds a chain from a list of buffers, in list order.
+ for (ByteBuffer b : buffers) {
+ this.remaining += b.remaining(); // Add each buffer's unread byte count to the chain total.
+ }
+ this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]); // Copy the list into an array.
+ }
+
/**
* Expensive. Makes a new buffer to hold a copy of what is in contained ByteBuffers. This
* call drains this instance; it cannot be used subsequent to the call.
http://git-wip-us.apache.org/repos/asf/hbase/blob/17bcf14f/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 1087c42..c7f5a10 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -85,9 +85,10 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
-import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -289,7 +290,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private UserProvider userProvider;
- private final BoundedByteBufferPool reservoir;
+ private final ByteBufferPool reservoir;
private volatile boolean allowFallbackToSimpleAuth;
@@ -320,7 +321,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected long size; // size of current call
protected boolean isError;
protected TraceInfo tinfo;
- private ByteBuffer cellBlock = null;
+ private ByteBufferListOutputStream cellBlockStream = null;
private User user;
private InetAddress remoteAddress;
@@ -362,10 +363,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
justification="Presume the lock on processing request held by caller is protection enough")
void done() {
- if (this.cellBlock != null && reservoir != null) {
- // Return buffer to reservoir now we are done with it.
- reservoir.putBuffer(this.cellBlock);
- this.cellBlock = null;
+ if (this.cellBlockStream != null) {
+ this.cellBlockStream.releaseResources();// This will return back the BBs which we
+ // got from pool.
+ this.cellBlockStream = null;
}
this.connection.decRpcCount(); // Say that we're done with this call.
}
@@ -425,38 +426,43 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Call id.
headerBuilder.setCallId(this.id);
if (t != null) {
- ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
- exceptionBuilder.setExceptionClassName(t.getClass().getName());
- exceptionBuilder.setStackTrace(errorMsg);
- exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
- if (t instanceof RegionMovedException) {
- // Special casing for this exception. This is only one carrying a payload.
- // Do this instead of build a generic system for allowing exceptions carry
- // any kind of payload.
- RegionMovedException rme = (RegionMovedException)t;
- exceptionBuilder.setHostname(rme.getHostname());
- exceptionBuilder.setPort(rme.getPort());
- }
- // Set the exception as the result of the method invocation.
- headerBuilder.setException(exceptionBuilder.build());
+ setExceptionResponse(t, errorMsg, headerBuilder);
}
// Pass reservoir to buildCellBlock. Keep a reference to what is returned so we can add it back to the
// reservoir when finished. This is hacky and the hack is not contained but benefits are
// high when we can avoid a big buffer allocation on each rpc.
- this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
- this.connection.compressionCodec, cells, reservoir);
- if (this.cellBlock != null) {
+ List<ByteBuffer> cellBlock = null;
+ int cellBlockSize = 0;
+ if (reservoir != null) {
+ this.cellBlockStream = ipcUtil.buildCellBlockStream(this.connection.codec,
+ this.connection.compressionCodec, cells, reservoir);
+ if (this.cellBlockStream != null) {
+ cellBlock = this.cellBlockStream.getByteBuffers();
+ cellBlockSize = this.cellBlockStream.size();
+ }
+ } else {
+ ByteBuffer b = ipcUtil.buildCellBlock(this.connection.codec,
+ this.connection.compressionCodec, cells);
+ if (b != null) {
+ cellBlockSize = b.remaining();
+ cellBlock = new ArrayList<ByteBuffer>(1);
+ cellBlock.add(b);
+ }
+ }
+
+ if (cellBlockSize > 0) {
CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
// Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
- cellBlockBuilder.setLength(this.cellBlock.limit());
+ cellBlockBuilder.setLength(cellBlockSize);
headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
Message header = headerBuilder.build();
-
- byte[] b = createHeaderAndMessageBytes(result, header);
-
- bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
-
+ byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize);
+ List<ByteBuffer> responseBufs = new ArrayList<ByteBuffer>(
+ (cellBlock == null ? 1 : cellBlock.size()) + 1);
+ responseBufs.add(ByteBuffer.wrap(b));
+ if (cellBlock != null) responseBufs.addAll(cellBlock);
+ bc = new BufferChain(responseBufs);
if (connection.useWrap) {
bc = wrapWithSasl(bc);
}
@@ -476,7 +482,25 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
- private byte[] createHeaderAndMessageBytes(Message result, Message header)
+ private void setExceptionResponse(Throwable t, String errorMsg,
+ ResponseHeader.Builder headerBuilder) { // Builds an ExceptionResponse from t and sets it on headerBuilder.
+ ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
+ exceptionBuilder.setExceptionClassName(t.getClass().getName());
+ exceptionBuilder.setStackTrace(errorMsg); // errorMsg is what gets sent in the stack trace field.
+ exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
+ if (t instanceof RegionMovedException) {
+ // Special case for this exception: it is the only one carrying a payload.
+ // Do this instead of building a generic system for allowing exceptions to carry
+ // any kind of payload.
+ RegionMovedException rme = (RegionMovedException)t;
+ exceptionBuilder.setHostname(rme.getHostname());
+ exceptionBuilder.setPort(rme.getPort());
+ }
+ // Set the exception as the result of the method invocation.
+ headerBuilder.setException(exceptionBuilder.build());
+ }
+
+ private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize)
throws IOException {
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
@@ -493,7 +517,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// calculate the total size
int totalSize = headerSerializedSize + headerVintSize
+ (resultSerializedSize + resultVintSize)
- + (this.cellBlock == null ? 0 : this.cellBlock.limit());
+ + cellBlockSize;
// The byte[] should also hold the totalSize of the header, message and the cellblock
byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
+ resultVintSize + Bytes.SIZEOF_INT];
@@ -1084,6 +1108,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} finally {
if (error) {
LOG.debug(getName() + call.toShortString() + ": output error -- closing");
+ // We will be closing this connection itself. Mark this call as done so that all the
+ // buffer(s) it got from pool can get released
+ call.done();
closeConnection(call.connection);
}
}
@@ -1998,11 +2025,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
RpcScheduler scheduler)
throws IOException {
if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
- this.reservoir = new BoundedByteBufferPool(
- conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
- conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
- // Make the max twice the number of handlers to be safe.
- conf.getInt("hbase.ipc.server.reservoir.initial.max",
+ this.reservoir = new ByteBufferPool(
+ conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY, ByteBufferPool.DEFAULT_BUFFER_SIZE),
+ conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
} else {