You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/06/18 12:32:03 UTC
[hbase] 01/22: HBASE-21916 Abstract an ByteBuffAllocator to
allocate/free ByteBuffer in ByteBufferPool
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 568d12933b4503a91c5d3cd08b4aef4b7f9c0824
Author: huzheng <op...@gmail.com>
AuthorDate: Sat Feb 16 17:16:09 2019 +0800
HBASE-21916 Abstract an ByteBuffAllocator to allocate/free ByteBuffer in ByteBufferPool
---
.../apache/hadoop/hbase/ipc/CellBlockBuilder.java | 9 +-
hbase-common/pom.xml | 4 +
.../apache/hadoop/hbase/io/ByteBuffAllocator.java | 282 +++++++++++++++++++
.../hbase/io/ByteBufferListOutputStream.java | 40 ++-
.../org/apache/hadoop/hbase/io/ByteBufferPool.java | 155 -----------
.../hbase/io/encoding/CopyKeyDataBlockEncoder.java | 2 +-
.../hbase/io/encoding/DiffKeyDeltaEncoder.java | 2 +-
.../hbase/io/encoding/FastDiffDeltaEncoder.java | 2 +-
.../hbase/io/encoding/PrefixKeyDeltaEncoder.java | 2 +-
.../hadoop/hbase/io/encoding/RowIndexSeekerV1.java | 2 +-
.../java/org/apache/hadoop/hbase/nio/ByteBuff.java | 146 +++++-----
.../org/apache/hadoop/hbase/nio/MultiByteBuff.java | 98 +++++--
.../java/org/apache/hadoop/hbase/nio/RefCnt.java | 49 ++++
.../apache/hadoop/hbase/nio/SingleByteBuff.java | 92 ++++--
.../apache/hadoop/hbase/util/ByteBufferArray.java | 10 +-
.../apache/hadoop/hbase/util/ByteBufferUtils.java | 31 ++-
.../hadoop/hbase/io/TestByteBuffAllocator.java | 309 +++++++++++++++++++++
.../hbase/io/TestByteBufferListOutputStream.java | 18 +-
.../apache/hadoop/hbase/io/TestByteBufferPool.java | 67 -----
.../apache/hadoop/hbase/nio/TestMultiByteBuff.java | 4 +-
.../apache/hadoop/hbase/io/hfile/Cacheable.java | 7 +-
.../hadoop/hbase/ipc/NettyRpcFrameDecoder.java | 2 +-
.../apache/hadoop/hbase/ipc/NettyRpcServer.java | 2 +-
.../apache/hadoop/hbase/ipc/NettyServerCall.java | 12 +-
.../hadoop/hbase/ipc/NettyServerRpcConnection.java | 9 +-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 96 +------
.../org/apache/hadoop/hbase/ipc/ServerCall.java | 18 +-
.../apache/hadoop/hbase/ipc/SimpleRpcServer.java | 2 +-
.../apache/hadoop/hbase/ipc/SimpleServerCall.java | 15 +-
.../hbase/ipc/SimpleServerRpcConnection.java | 26 +-
.../client/TestAsyncTableGetMultiThreaded.java | 4 +-
.../hbase/client/TestServerLoadDurability.java | 8 +-
.../hadoop/hbase/io/hfile/TestHFileBlock.java | 2 +-
.../org/apache/hadoop/hbase/ipc/TestRpcServer.java | 144 ----------
34 files changed, 974 insertions(+), 697 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
index 8d68e87..111f768 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +42,6 @@ import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -208,7 +208,7 @@ class CellBlockBuilder {
* @param codec to use for encoding
* @param compressor to use for encoding
* @param cellScanner to encode
- * @param pool Pool of ByteBuffers to make use of.
+ * @param allocator to allocate the {@link ByteBuff}.
* @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
* been flipped and is ready for reading. Use limit to find total size. If
@@ -217,15 +217,14 @@ class CellBlockBuilder {
* @throws IOException if encoding the cells fail
*/
public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
- CellScanner cellScanner, ByteBufferPool pool) throws IOException {
+ CellScanner cellScanner, ByteBuffAllocator allocator) throws IOException {
if (cellScanner == null) {
return null;
}
if (codec == null) {
throw new CellScannerButNoCodecException();
}
- assert pool != null;
- ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+ ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(allocator);
encodeCellsTo(bbos, cellScanner, codec, compressor);
if (bbos.size() == 0) {
bbos.releaseResources();
diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index 7d7dea2..c23b9d4 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -151,6 +151,10 @@
<artifactId>hbase-shaded-miscellaneous</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.hbase.thirdparty</groupId>
+ <artifactId>hbase-shaded-netty</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
new file mode 100644
index 0000000..1833462
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * ByteBuffAllocator is used for allocating/freeing the ByteBuffers from/to NIO ByteBuffer pool, and
+ * it provide high-level interfaces for upstream. when allocating desired memory size, it will
+ * return {@link ByteBuff}, if we are sure that those ByteBuffers have reached the end of life
+ * cycle, we must do the {@link ByteBuff#release()} to return back the buffers to the pool,
+ * otherwise ByteBuffers leak will happen, and the NIO ByteBuffer pool may be exhausted. there's
+ * possible that the desired memory size is large than ByteBufferPool has, we'll downgrade to
+ * allocate ByteBuffers from heap which meaning the GC pressure may increase again. Of course, an
+ * better way is increasing the ByteBufferPool size if we detected this case. <br/>
+ * <br/>
+ * On the other hand, for better memory utilization, we have set an lower bound named
+ * minSizeForReservoirUse in this allocator, and if the desired size is less than
+ * minSizeForReservoirUse, the allocator will just allocate the ByteBuffer from heap and let the JVM
+ * free its memory, because it's too wasting to allocate a single fixed-size ByteBuffer for some
+ * small objects. <br/>
+ * <br/>
+ * We recommend to use this class to allocate/free {@link ByteBuff} in the RPC layer or the entire
+ * read/write path, because it hide the details of memory management and its APIs are more friendly
+ * to the upper layer.
+ */
+@InterfaceAudience.Private
+public class ByteBuffAllocator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ByteBuffAllocator.class);
+
+ public static final String MAX_BUFFER_COUNT_KEY = "hbase.ipc.server.allocator.max.buffer.count";
+
+ public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.allocator.buffer.size";
+ // 64 KB. Making it same as the chunk size what we will write/read to/from the socket channel.
+ public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+
+ public static final String MIN_ALLOCATE_SIZE_KEY =
+ "hbase.ipc.server.reservoir.minimal.allocating.size";
+
+ public static final Recycler NONE = () -> {
+ };
+
+ public interface Recycler {
+ void free();
+ }
+
+ private final boolean reservoirEnabled;
+ private final int bufSize;
+ private final int maxBufCount;
+ private final AtomicInteger usedBufCount = new AtomicInteger(0);
+
+ private boolean maxPoolSizeInfoLevelLogged = false;
+
+ // If the desired size is at least this size, it'll allocated from ByteBufferPool, otherwise it'll
+ // allocated from heap for better utilization. We make this to be 1/6th of the pool buffer size.
+ private final int minSizeForReservoirUse;
+
+ private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
+
+ /**
+ * Initialize an {@link ByteBuffAllocator} which will try to allocate ByteBuffers from off-heap if
+ * reservoir is enabled and the reservoir has enough buffers, otherwise the allocator will just
+ * allocate the insufficient buffers from on-heap to meet the requirement.
+ * @param conf which get the arguments to initialize the allocator.
+ * @param reservoirEnabled indicate whether the reservoir is enabled or disabled.
+ * @return ByteBuffAllocator to manage the byte buffers.
+ */
+ public static ByteBuffAllocator create(Configuration conf, boolean reservoirEnabled) {
+ int poolBufSize = conf.getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
+ if (reservoirEnabled) {
+ // The max number of buffers to be pooled in the ByteBufferPool. The default value been
+ // selected based on the #handlers configured. When it is read request, 2 MB is the max size
+ // at which we will send back one RPC request. Means max we need 2 MB for creating the
+ // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
+ // include the heap size overhead of each cells also.) Considering 2 MB, we will need
+ // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
+ // is by default 64 KB.
+ // In case of read request, at the end of the handler process, we will make the response
+ // cellblock and add the Call to connection's response Q and a single Responder thread takes
+ // connections and responses from that one by one and do the socket write. So there is chances
+ // that by the time a handler originated response is actually done writing to socket and so
+ // released the BBs it used, the handler might have processed one more read req. On an avg 2x
+ // we consider and consider that also for the max buffers to pool
+ int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
+ int maxBuffCount =
+ conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
+ int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6);
+ return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
+ } else {
+ return new ByteBuffAllocator(false, 0, poolBufSize, Integer.MAX_VALUE);
+ }
+ }
+
+ /**
+ * Initialize an {@link ByteBuffAllocator} which only allocate ByteBuffer from on-heap, it's
+ * designed for testing purpose or disabled reservoir case.
+ * @return allocator to allocate on-heap ByteBuffer.
+ */
+ public static ByteBuffAllocator createOnHeap() {
+ return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
+ }
+
+ @VisibleForTesting
+ ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
+ int minSizeForReservoirUse) {
+ this.reservoirEnabled = reservoirEnabled;
+ this.maxBufCount = maxBufCount;
+ this.bufSize = bufSize;
+ this.minSizeForReservoirUse = minSizeForReservoirUse;
+ }
+
+ public boolean isReservoirEnabled() {
+ return reservoirEnabled;
+ }
+
+ @VisibleForTesting
+ public int getQueueSize() {
+ return this.buffers.size();
+ }
+
+ /**
+ * Allocate an buffer with buffer size from ByteBuffAllocator, Note to call the
+ * {@link ByteBuff#release()} if no need any more, otherwise the memory leak happen in NIO
+ * ByteBuffer pool.
+ * @return an ByteBuff with the buffer size.
+ */
+ public SingleByteBuff allocateOneBuffer() {
+ if (isReservoirEnabled()) {
+ ByteBuffer bb = getBuffer();
+ if (bb != null) {
+ return new SingleByteBuff(() -> putbackBuffer(bb), bb);
+ }
+ }
+ // Allocated from heap, let the JVM free its memory.
+ return new SingleByteBuff(NONE, ByteBuffer.allocate(this.bufSize));
+ }
+
+ /**
+ * Allocate size bytes from the ByteBufAllocator, Note to call the {@link ByteBuff#release()} if
+ * no need any more, otherwise the memory leak happen in NIO ByteBuffer pool.
+ * @param size to allocate
+ * @return an ByteBuff with the desired size.
+ */
+ public ByteBuff allocate(int size) {
+ if (size < 0) {
+ throw new IllegalArgumentException("size to allocate should >=0");
+ }
+ // If disabled the reservoir, just allocate it from on-heap.
+ if (!isReservoirEnabled() || size == 0) {
+ return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
+ }
+ int reminder = size % bufSize;
+ int len = size / bufSize + (reminder > 0 ? 1 : 0);
+ List<ByteBuffer> bbs = new ArrayList<>(len);
+ // Allocate from ByteBufferPool until the remaining is less than minSizeForReservoirUse or
+ // reservoir is exhausted.
+ int remain = size;
+ while (remain >= minSizeForReservoirUse) {
+ ByteBuffer bb = this.getBuffer();
+ if (bb == null) {
+ break;
+ }
+ bbs.add(bb);
+ remain -= bufSize;
+ }
+ int lenFromReservoir = bbs.size();
+ if (remain > 0) {
+ // If the last ByteBuffer is too small or the reservoir can not provide more ByteBuffers, we
+ // just allocate the ByteBuffer from on-heap.
+ bbs.add(ByteBuffer.allocate(remain));
+ }
+ ByteBuff bb = wrap(bbs, () -> {
+ for (int i = 0; i < lenFromReservoir; i++) {
+ this.putbackBuffer(bbs.get(i));
+ }
+ });
+ bb.limit(size);
+ return bb;
+ }
+
+ public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
+ if (buffers == null || buffers.length == 0) {
+ throw new IllegalArgumentException("buffers shouldn't be null or empty");
+ }
+ return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0])
+ : new MultiByteBuff(recycler, buffers);
+ }
+
+ public static ByteBuff wrap(ByteBuffer[] buffers) {
+ return wrap(buffers, NONE);
+ }
+
+ public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
+ if (buffers == null || buffers.size() == 0) {
+ throw new IllegalArgumentException("buffers shouldn't be null or empty");
+ }
+ return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0))
+ : new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0]));
+ }
+
+ public static ByteBuff wrap(List<ByteBuffer> buffers) {
+ return wrap(buffers, NONE);
+ }
+
+ /**
+ * @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached
+ * the maximum pool size, it will create a new one and return. In case of max pool size
+ * also reached, will return null. When pool returned a ByteBuffer, make sure to return it
+ * back to pool after use.
+ */
+ private ByteBuffer getBuffer() {
+ ByteBuffer bb = buffers.poll();
+ if (bb != null) {
+ // To reset the limit to capacity and position to 0, must clear here.
+ bb.clear();
+ return bb;
+ }
+ while (true) {
+ int c = this.usedBufCount.intValue();
+ if (c >= this.maxBufCount) {
+ if (!maxPoolSizeInfoLevelLogged) {
+ LOG.info("Pool already reached its max capacity : {} and no free buffers now. Consider "
+ + "increasing the value for '{}' ?",
+ maxBufCount, MAX_BUFFER_COUNT_KEY);
+ maxPoolSizeInfoLevelLogged = true;
+ }
+ return null;
+ }
+ if (!this.usedBufCount.compareAndSet(c, c + 1)) {
+ continue;
+ }
+ return ByteBuffer.allocateDirect(bufSize);
+ }
+ }
+
+ /**
+ * Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning.
+ * @param buf ByteBuffer to return.
+ */
+ private void putbackBuffer(ByteBuffer buf) {
+ if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
+ LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
+ return;
+ }
+ buffers.offer(buf);
+ }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
index 0b97abb..e8bd322 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -39,18 +41,17 @@ import org.slf4j.LoggerFactory;
public class ByteBufferListOutputStream extends ByteBufferOutputStream {
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class);
- private ByteBufferPool pool;
+ private ByteBuffAllocator allocator;
// Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If
// it is not available will make a new one our own and keep writing to that. We keep track of all
// the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure
// to return back all of them to pool
- protected List<ByteBuffer> allBufs = new ArrayList<>();
- protected List<ByteBuffer> bufsFromPool = new ArrayList<>();
+ protected List<SingleByteBuff> allBufs = new ArrayList<>();
private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already
- public ByteBufferListOutputStream(ByteBufferPool pool) {
- this.pool = pool;
+ public ByteBufferListOutputStream(ByteBuffAllocator allocator) {
+ this.allocator = allocator;
allocateNewBuffer();
}
@@ -58,18 +59,10 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
if (this.curBuf != null) {
this.curBuf.flip();// On the current buf set limit = pos and pos = 0.
}
- // Get an initial BB to work with from the pool
- this.curBuf = this.pool.getBuffer();
- if (this.curBuf == null) {
- // No free BB at this moment. Make a new one. The pool returns off heap BBs. Don't make off
- // heap BB on demand. It is difficult to account for all such and so proper sizing of Max
- // direct heap size. See HBASE-15525 also for more details.
- // Make BB with same size of pool's buffer size.
- this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize());
- } else {
- this.bufsFromPool.add(this.curBuf);
- }
- this.allBufs.add(this.curBuf);
+ // Get an initial ByteBuffer from the allocator.
+ SingleByteBuff sbb = allocator.allocateOneBuffer();
+ this.curBuf = sbb.nioByteBuffers()[0];
+ this.allBufs.add(sbb);
}
@Override
@@ -118,11 +111,8 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
LOG.debug(e.toString(), e);
}
// Return back all the BBs to pool
- if (this.bufsFromPool != null) {
- for (int i = 0; i < this.bufsFromPool.size(); i++) {
- this.pool.putbackBuffer(this.bufsFromPool.get(i));
- }
- this.bufsFromPool = null;
+ for (ByteBuff buf : this.allBufs) {
+ buf.release();
}
this.allBufs = null;
this.curBuf = null;
@@ -144,7 +134,11 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
// All the other BBs are already flipped while moving to the new BB.
curBuf.flip();
}
- return this.allBufs;
+ List<ByteBuffer> bbs = new ArrayList<>(this.allBufs.size());
+ for (SingleByteBuff bb : this.allBufs) {
+ bbs.add(bb.nioByteBuffers()[0]);
+ }
+ return bbs;
}
@Override
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
deleted file mode 100644
index caca20b..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer. This
- * pool keeps an upper bound on the count of ByteBuffers in the pool and a fixed size of ByteBuffer
- * that it will create. When requested, if a free ByteBuffer is already present, it will return
- * that. And when no free ByteBuffer available and we are below the max count, it will create a new
- * one and return that.
- *
- * <p>
- * Note: This pool returns off heap ByteBuffers by default. If on heap ByteBuffers to be pooled,
- * pass 'directByteBuffer' as false while construction of the pool.
- * <p>
- * This class is thread safe.
- *
- * @see ByteBufferListOutputStream
- */
-@InterfaceAudience.Private
-public class ByteBufferPool {
- private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
- // TODO better config names?
- // hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count
- // hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size
- public static final String MAX_POOL_SIZE_KEY = "hbase.ipc.server.reservoir.initial.max";
- public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";
- public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;// 64 KB. Making it same as the chunk size
- // what we will write/read to/from the
- // socket channel.
- private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
-
- private final int bufferSize;
- private final int maxPoolSize;
- private AtomicInteger count; // Count of the BBs created already for this pool.
- private final boolean directByteBuffer; //Whether this pool should return DirectByteBuffers
- private boolean maxPoolSizeInfoLevelLogged = false;
-
- /**
- * @param bufferSize Size of each buffer created by this pool.
- * @param maxPoolSize Max number of buffers to keep in this pool.
- */
- public ByteBufferPool(int bufferSize, int maxPoolSize) {
- this(bufferSize, maxPoolSize, true);
- }
-
- /**
- * @param bufferSize Size of each buffer created by this pool.
- * @param maxPoolSize Max number of buffers to keep in this pool.
- * @param directByteBuffer Whether to create direct ByteBuffer or on heap ByteBuffer.
- */
- public ByteBufferPool(int bufferSize, int maxPoolSize, boolean directByteBuffer) {
- this.bufferSize = bufferSize;
- this.maxPoolSize = maxPoolSize;
- this.directByteBuffer = directByteBuffer;
- // TODO can add initialPoolSize config also and make those many BBs ready for use.
- LOG.info("Created with bufferSize={} and maxPoolSize={}",
- org.apache.hadoop.util.StringUtils.byteDesc(bufferSize),
- org.apache.hadoop.util.StringUtils.byteDesc(maxPoolSize));
- this.count = new AtomicInteger(0);
- }
-
- /**
- * @return One free ByteBuffer from the pool. If no free ByteBuffer and we have not reached the
- * maximum pool size, it will create a new one and return. In case of max pool size also
- * reached, will return null. When pool returned a ByteBuffer, make sure to return it back
- * to pool after use.
- * @see #putbackBuffer(ByteBuffer)
- */
- public ByteBuffer getBuffer() {
- ByteBuffer bb = buffers.poll();
- if (bb != null) {
- // Clear sets limit == capacity. Position == 0.
- bb.clear();
- return bb;
- }
- while (true) {
- int c = this.count.intValue();
- if (c >= this.maxPoolSize) {
- if (maxPoolSizeInfoLevelLogged) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Pool already reached its max capacity : " + this.maxPoolSize
- + " and no free buffers now. Consider increasing the value for '"
- + MAX_POOL_SIZE_KEY + "' ?");
- }
- } else {
- LOG.info("Pool already reached its max capacity : " + this.maxPoolSize
- + " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY
- + "' ?");
- maxPoolSizeInfoLevelLogged = true;
- }
- return null;
- }
- if (!this.count.compareAndSet(c, c + 1)) {
- continue;
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize);
- }
- return this.directByteBuffer ? ByteBuffer.allocateDirect(this.bufferSize)
- : ByteBuffer.allocate(this.bufferSize);
- }
- }
-
- /**
- * Return back a ByteBuffer after its use. Do not try to return put back a ByteBuffer, not
- * obtained from this pool.
- * @param buf ByteBuffer to return.
- */
- public void putbackBuffer(ByteBuffer buf) {
- if (buf.capacity() != this.bufferSize || (this.directByteBuffer ^ buf.isDirect())) {
- LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
- return;
- }
- buffers.offer(buf);
- }
-
- public int getBufferSize() {
- return this.bufferSize;
- }
-
- /**
- * @return Number of free buffers
- */
- @VisibleForTesting
- public int getQueueSize() {
- return buffers.size();
- }
-}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
index 8bc7974..d7ab009 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
@@ -98,7 +98,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
currentBuffer.skip(current.tagsLength);
}
if (includesMvcc()) {
- current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+ current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
index 01f0a9d..ab93d19 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
@@ -477,7 +477,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
decodeTags();
}
if (includesMvcc()) {
- current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+ current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
index baa1856..aa9a436 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
@@ -501,7 +501,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
decodeTags();
}
if (includesMvcc()) {
- current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+ current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
index 63da7e7..176bea3 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
@@ -213,7 +213,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
decodeTags();
}
if (includesMvcc()) {
- current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+ current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
index 14d847c..9c0532e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
@@ -282,7 +282,7 @@ public class RowIndexSeekerV1 extends AbstractEncodedSeeker {
decodeTags();
}
if (includesMvcc()) {
- current.memstoreTS = ByteBuff.readVLong(currentBuffer);
+ current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 68cf56e..1ee3607 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -24,22 +24,81 @@ import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
+import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil;
+
+
/**
- * An abstract class that abstracts out as to how the byte buffers are used,
- * either single or multiple. We have this interface because the java's ByteBuffers
- * cannot be sub-classed. This class provides APIs similar to the ones provided
- * in java's nio ByteBuffers and allows you to do positional reads/writes and relative
- * reads and writes on the underlying BB. In addition to it, we have some additional APIs which
- * helps us in the read path.
+ * An abstract class that abstracts out as to how the byte buffers are used, either single or
+ * multiple. We have this interface because the java's ByteBuffers cannot be sub-classed. This class
+ * provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do
+ * positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we
+ * have some additional APIs which helps us in the read path. <br/>
+ * The ByteBuff implement {@link ReferenceCounted} interface which mean need to maintains a
+ * {@link RefCnt} inside, if ensure that the ByteBuff won't be used any more, we must do a
+ * {@link ByteBuff#release()} to recycle its NIO ByteBuffers. when considering the
+ * {@link ByteBuff#duplicate()} or {@link ByteBuff#slice()}, releasing either the duplicated one or
+ * the original one will free its memory, because they share the same NIO ByteBuffers. when you want
+ * to retain the NIO ByteBuffers even if the origin one called {@link ByteBuff#release()}, you can
+ * do like this:
+ *
+ * <pre>
+ * ByteBuff original = ...;
+ * ByteBuff dup = original.duplicate();
+ * dup.retain();
+ * original.release();
+ * // The NIO buffers can still be accessed unless you release the duplicated one
+ * dup.get(...);
+ * dup.release();
+ * // Both the original and dup can not access the NIO buffers any more.
+ * </pre>
*/
@InterfaceAudience.Private
-// TODO to have another name. This can easily get confused with netty's ByteBuf
-public abstract class ByteBuff {
+public abstract class ByteBuff implements ReferenceCounted {
+ private static final String REFERENCE_COUNT_NAME = "ReferenceCount";
private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB.
+ protected RefCnt refCnt;
+
+ /*************************** Methods for reference count **********************************/
+
+ protected void checkRefCount() {
+ ObjectUtil.checkPositive(refCnt(), REFERENCE_COUNT_NAME);
+ }
+
+ public int refCnt() {
+ return refCnt.refCnt();
+ }
+
+ @Override
+ public boolean release() {
+ return refCnt.release();
+ }
+
+ @Override
+ public final ByteBuff retain(int increment) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final boolean release(int increment) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final ByteBuff touch() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final ByteBuff touch(Object hint) {
+ throw new UnsupportedOperationException();
+ }
+
+ /******************************* Methods for ByteBuff **************************************/
+
/**
* @return this ByteBuff's current position
*/
@@ -491,78 +550,11 @@ public abstract class ByteBuff {
return tmpLength;
}
- /**
- * Similar to {@link WritableUtils#readVLong(java.io.DataInput)} but reads from a
- * {@link ByteBuff}.
- */
- public static long readVLong(ByteBuff in) {
- byte firstByte = in.get();
- int len = WritableUtils.decodeVIntSize(firstByte);
- if (len == 1) {
- return firstByte;
- }
- long i = 0;
- for (int idx = 0; idx < len-1; idx++) {
- byte b = in.get();
- i = i << 8;
- i = i | (b & 0xFF);
- }
- return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
- }
-
- /**
- * Search sorted array "a" for byte "key".
- *
- * @param a Array to search. Entries must be sorted and unique.
- * @param fromIndex First index inclusive of "a" to include in the search.
- * @param toIndex Last index exclusive of "a" to include in the search.
- * @param key The byte to search for.
- * @return The index of key if found. If not found, return -(index + 1), where
- * negative indicates "not found" and the "index + 1" handles the "-0"
- * case.
- */
- public static int unsignedBinarySearch(ByteBuff a, int fromIndex, int toIndex, byte key) {
- int unsignedKey = key & 0xff;
- int low = fromIndex;
- int high = toIndex - 1;
-
- while (low <= high) {
- int mid = low + ((high - low) >> 1);
- int midVal = a.get(mid) & 0xff;
-
- if (midVal < unsignedKey) {
- low = mid + 1;
- } else if (midVal > unsignedKey) {
- high = mid - 1;
- } else {
- return mid; // key found
- }
- }
- return -(low + 1); // key not found.
- }
+ public abstract ByteBuffer[] nioByteBuffers();
@Override
public String toString() {
return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() +
", cap= " + capacity() + "]";
}
-
- public static String toStringBinary(final ByteBuff b, int off, int len) {
- StringBuilder result = new StringBuilder();
- // Just in case we are passed a 'len' that is > buffer length...
- if (off >= b.capacity())
- return result.toString();
- if (off + len > b.capacity())
- len = b.capacity() - off;
- for (int i = off; i < off + len; ++i) {
- int ch = b.get(i) & 0xFF;
- if ((ch >= '0' && ch <= '9') || (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z')
- || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) {
- result.append((char) ch);
- } else {
- result.append(String.format("\\x%02X", ch));
- }
- }
- return result.toString();
- }
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
index 97f5141..e9eadc7 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.nio;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
+
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
@@ -24,13 +26,12 @@ import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.nio.channels.ReadableByteChannel;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
/**
* Provides a unified view of all the underlying ByteBuffers and will look as if a bigger
* sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int,
@@ -53,6 +54,15 @@ public class MultiByteBuff extends ByteBuff {
private final int[] itemBeginPos;
public MultiByteBuff(ByteBuffer... items) {
+ this(NONE, items);
+ }
+
+ public MultiByteBuff(Recycler recycler, ByteBuffer... items) {
+ this(new RefCnt(recycler), items);
+ }
+
+ private MultiByteBuff(RefCnt refCnt, ByteBuffer... items) {
+ this.refCnt = refCnt;
assert items != null;
assert items.length > 0;
this.items = items;
@@ -75,8 +85,9 @@ public class MultiByteBuff extends ByteBuff {
this.limitedItemIndex = this.items.length - 1;
}
- private MultiByteBuff(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex,
- int curItemIndex, int markedIndex) {
+ private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int limit,
+ int limitedIndex, int curItemIndex, int markedIndex) {
+ this.refCnt = refCnt;
this.items = items;
this.curItemIndex = curItemIndex;
this.curItem = this.items[this.curItemIndex];
@@ -117,6 +128,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public int capacity() {
+ checkRefCount();
int c = 0;
for (ByteBuffer item : this.items) {
c += item.capacity();
@@ -131,12 +143,14 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public byte get(int index) {
+ checkRefCount();
int itemIndex = getItemIndex(index);
return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]);
}
@Override
public byte getByteAfterPosition(int offset) {
+ checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int index = offset + this.position();
int itemIndex = getItemIndexFromCurItemIndex(index);
@@ -179,6 +193,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public int getInt(int index) {
+ checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int itemIndex;
if (this.itemBeginPos[this.curItemIndex] <= index
@@ -192,6 +207,7 @@ public class MultiByteBuff extends ByteBuff {
@Override
public int getIntAfterPosition(int offset) {
+ checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int index = offset + this.position();
int itemIndex;
@@ -210,6 +226,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public short getShort(int index) {
+ checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int itemIndex;
if (this.itemBeginPos[this.curItemIndex] <= index
@@ -238,6 +255,7 @@ public class MultiByteBuff extends ByteBuff {
@Override
public short getShortAfterPosition(int offset) {
+ checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int index = offset + this.position();
int itemIndex;
@@ -319,6 +337,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public long getLong(int index) {
+ checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int itemIndex;
if (this.itemBeginPos[this.curItemIndex] <= index
@@ -332,6 +351,7 @@ public class MultiByteBuff extends ByteBuff {
@Override
public long getLongAfterPosition(int offset) {
+ checkRefCount();
// Mostly the index specified will land within this current item. Short circuit for that
int index = offset + this.position();
int itemIndex;
@@ -348,6 +368,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public int position() {
+ checkRefCount();
return itemBeginPos[this.curItemIndex] + this.curItem.position();
}
@@ -358,6 +379,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff position(int position) {
+ checkRefCount();
// Short circuit for positioning within the cur item. Mostly that is the case.
if (this.itemBeginPos[this.curItemIndex] <= position
&& this.itemBeginPos[this.curItemIndex + 1] > position) {
@@ -385,6 +407,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff rewind() {
+ checkRefCount();
for (int i = 0; i < this.items.length; i++) {
this.items[i].rewind();
}
@@ -400,6 +423,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff mark() {
+ checkRefCount();
this.markedItemIndex = this.curItemIndex;
this.curItem.mark();
return this;
@@ -412,6 +436,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff reset() {
+ checkRefCount();
// when the buffer is moved to the next one.. the reset should happen on the previous marked
// item and the new one should be taken as the base
if (this.markedItemIndex < 0) throw new InvalidMarkException();
@@ -433,6 +458,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public int remaining() {
+ checkRefCount();
int remain = 0;
for (int i = curItemIndex; i < items.length; i++) {
remain += items[i].remaining();
@@ -446,6 +472,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public final boolean hasRemaining() {
+ checkRefCount();
return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex
&& this.items[this.curItemIndex + 1].hasRemaining());
}
@@ -457,6 +484,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public byte get() {
+ checkRefCount();
if (this.curItem.remaining() == 0) {
if (items.length - 1 == this.curItemIndex) {
// means cur item is the last one and we wont be able to read a long. Throw exception
@@ -476,6 +504,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public short getShort() {
+ checkRefCount();
int remaining = this.curItem.remaining();
if (remaining >= Bytes.SIZEOF_SHORT) {
return this.curItem.getShort();
@@ -494,6 +523,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public int getInt() {
+ checkRefCount();
int remaining = this.curItem.remaining();
if (remaining >= Bytes.SIZEOF_INT) {
return this.curItem.getInt();
@@ -514,6 +544,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public long getLong() {
+ checkRefCount();
int remaining = this.curItem.remaining();
if (remaining >= Bytes.SIZEOF_LONG) {
return this.curItem.getLong();
@@ -545,6 +576,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public void get(byte[] dst, int offset, int length) {
+ checkRefCount();
while (length > 0) {
int toRead = Math.min(length, this.curItem.remaining());
ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset,
@@ -560,6 +592,7 @@ public class MultiByteBuff extends ByteBuff {
@Override
public void get(int sourceOffset, byte[] dst, int offset, int length) {
+ checkRefCount();
int itemIndex = getItemIndex(sourceOffset);
ByteBuffer item = this.items[itemIndex];
sourceOffset = sourceOffset - this.itemBeginPos[itemIndex];
@@ -583,6 +616,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff limit(int limit) {
+ checkRefCount();
this.limit = limit;
// Normally the limit will try to limit within the last BB item
int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex];
@@ -622,29 +656,30 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff slice() {
+ checkRefCount();
ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1];
for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) {
copy[j] = this.items[i].slice();
}
- return new MultiByteBuff(copy);
+ return new MultiByteBuff(refCnt, copy);
}
/**
- * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark
- * of the new MBB will be independent than that of the original MBB.
- * The content of the new MBB will start at this MBB's current position
- * The position, limit and mark of the new MBB would be identical to this MBB in terms of
- * values.
- * @return a sliced MBB
+ * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark of the
+ * new MBB will be independent than that of the original MBB. The content of the new MBB will
+ * start at this MBB's current position The position, limit and mark of the new MBB would be
+ * identical to this MBB in terms of values.
+ * @return a duplicated MBB
*/
@Override
public MultiByteBuff duplicate() {
+ checkRefCount();
ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length];
for (int i = 0; i < this.items.length; i++) {
itemsCopy[i] = items[i].duplicate();
}
- return new MultiByteBuff(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex,
- this.curItemIndex, this.markedItemIndex);
+ return new MultiByteBuff(refCnt, itemsCopy, this.itemBeginPos, this.limit,
+ this.limitedItemIndex, this.curItemIndex, this.markedItemIndex);
}
/**
@@ -654,6 +689,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff put(byte b) {
+ checkRefCount();
if (this.curItem.remaining() == 0) {
if (this.curItemIndex == this.items.length - 1) {
throw new BufferOverflowException();
@@ -673,6 +709,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff put(int index, byte b) {
+ checkRefCount();
int itemIndex = getItemIndex(limit);
ByteBuffer item = items[itemIndex];
item.put(index - itemBeginPos[itemIndex], b);
@@ -688,6 +725,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
+ checkRefCount();
int destItemIndex = getItemIndex(offset);
int srcItemIndex = getItemIndex(srcOffset);
ByteBuffer destItem = this.items[destItemIndex];
@@ -723,7 +761,7 @@ public class MultiByteBuff extends ByteBuff {
}
private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) {
- return (buf instanceof SingleByteBuff) ? ((SingleByteBuff) buf).getEnclosingByteBuffer()
+ return (buf instanceof SingleByteBuff) ? buf.nioByteBuffers()[0]
: ((MultiByteBuff) buf).items[index];
}
@@ -734,6 +772,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff putInt(int val) {
+ checkRefCount();
if (this.curItem.remaining() >= Bytes.SIZEOF_INT) {
this.curItem.putInt(val);
return this;
@@ -784,6 +823,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff put(byte[] src, int offset, int length) {
+ checkRefCount();
if (this.curItem.remaining() >= length) {
ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length);
return this;
@@ -803,6 +843,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff putLong(long val) {
+ checkRefCount();
if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) {
this.curItem.putLong(val);
return this;
@@ -860,6 +901,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff skip(int length) {
+ checkRefCount();
// Get available bytes from this item and remaining from next
int jump = 0;
while (true) {
@@ -882,6 +924,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public MultiByteBuff moveBack(int length) {
+ checkRefCount();
while (length != 0) {
if (length > curItem.position()) {
length -= curItem.position();
@@ -909,6 +952,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public ByteBuffer asSubByteBuffer(int length) {
+ checkRefCount();
if (this.curItem.remaining() >= length) {
return this.curItem;
}
@@ -918,8 +962,8 @@ public class MultiByteBuff extends ByteBuff {
ByteBuffer locCurItem = curItem;
while (length > 0) {
int toRead = Math.min(length, locCurItem.remaining());
- ByteBufferUtils
- .copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead);
+ ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset,
+ toRead);
length -= toRead;
if (length == 0) break;
locCurItemIndex++;
@@ -945,6 +989,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
+ checkRefCount();
if (this.itemBeginPos[this.curItemIndex] <= offset) {
int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex];
if (this.curItem.limit() - relOffsetInCurItem >= length) {
@@ -988,6 +1033,7 @@ public class MultiByteBuff extends ByteBuff {
@Override
public void get(ByteBuffer out, int sourceOffset,
int length) {
+ checkRefCount();
// Not used from real read path actually. So not going with
// optimization
for (int i = 0; i < length; ++i) {
@@ -1007,6 +1053,7 @@ public class MultiByteBuff extends ByteBuff {
*/
@Override
public byte[] toBytes(int offset, int length) {
+ checkRefCount();
byte[] output = new byte[length];
this.get(offset, output, 0, length);
return output;
@@ -1014,6 +1061,7 @@ public class MultiByteBuff extends ByteBuff {
@Override
public int read(ReadableByteChannel channel) throws IOException {
+ checkRefCount();
int total = 0;
while (true) {
// Read max possible into the current BB
@@ -1034,13 +1082,19 @@ public class MultiByteBuff extends ByteBuff {
}
@Override
+ public ByteBuffer[] nioByteBuffers() {
+ checkRefCount();
+ return this.items;
+ }
+
+ @Override
public boolean equals(Object obj) {
if (!(obj instanceof MultiByteBuff)) return false;
if (this == obj) return true;
MultiByteBuff that = (MultiByteBuff) obj;
if (this.capacity() != that.capacity()) return false;
if (ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(),
- that.limit()) == 0) {
+ that.limit()) == 0) {
return true;
}
return false;
@@ -1055,11 +1109,9 @@ public class MultiByteBuff extends ByteBuff {
return hash;
}
- /**
- * @return the ByteBuffers which this wraps.
- */
- @VisibleForTesting
- public ByteBuffer[] getEnclosingByteBuffers() {
- return this.items;
+ @Override
+ public MultiByteBuff retain() {
+ refCnt.retain();
+ return this;
}
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
new file mode 100644
index 0000000..80172b2
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
+import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
+
+/**
+ * Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the
+ * reference count become 0, it'll call {@link Recycler#free()} once.
+ */
+@InterfaceAudience.Private
+class RefCnt extends AbstractReferenceCounted {
+
+ private Recycler recycler = ByteBuffAllocator.NONE;
+
+ RefCnt(Recycler recycler) {
+ this.recycler = recycler;
+ }
+
+ @Override
+ protected final void deallocate() {
+ this.recycler.free();
+ }
+
+ @Override
+ public final ReferenceCounted touch(Object hint) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 6d64d7b..7205251 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -17,22 +17,24 @@
*/
package org.apache.hadoop.hbase.nio;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.hbase.util.UnsafeAccess;
import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
import org.apache.yetus.audience.InterfaceAudience;
-import sun.nio.ch.DirectBuffer;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import sun.nio.ch.DirectBuffer;
/**
- * An implementation of ByteBuff where a single BB backs the BBI. This just acts
- * as a wrapper over a normal BB - offheap or onheap
+ * An implementation of ByteBuff where a single BB backs the BBI. This just acts as a wrapper over a
+ * normal BB - offheap or onheap
*/
@InterfaceAudience.Private
public class SingleByteBuff extends ByteBuff {
@@ -48,6 +50,15 @@ public class SingleByteBuff extends ByteBuff {
private Object unsafeRef = null;
public SingleByteBuff(ByteBuffer buf) {
+ this(NONE, buf);
+ }
+
+ public SingleByteBuff(Recycler recycler, ByteBuffer buf) {
+ this(new RefCnt(recycler), buf);
+ }
+
+ private SingleByteBuff(RefCnt refCnt, ByteBuffer buf) {
+ this.refCnt = refCnt;
this.buf = buf;
if (buf.hasArray()) {
this.unsafeOffset = UnsafeAccess.BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset();
@@ -59,63 +70,74 @@ public class SingleByteBuff extends ByteBuff {
@Override
public int position() {
+ checkRefCount();
return this.buf.position();
}
@Override
public SingleByteBuff position(int position) {
+ checkRefCount();
this.buf.position(position);
return this;
}
@Override
public SingleByteBuff skip(int len) {
+ checkRefCount();
this.buf.position(this.buf.position() + len);
return this;
}
@Override
public SingleByteBuff moveBack(int len) {
+ checkRefCount();
this.buf.position(this.buf.position() - len);
return this;
}
@Override
public int capacity() {
+ checkRefCount();
return this.buf.capacity();
}
@Override
public int limit() {
+ checkRefCount();
return this.buf.limit();
}
@Override
public SingleByteBuff limit(int limit) {
+ checkRefCount();
this.buf.limit(limit);
return this;
}
@Override
public SingleByteBuff rewind() {
+ checkRefCount();
this.buf.rewind();
return this;
}
@Override
public SingleByteBuff mark() {
+ checkRefCount();
this.buf.mark();
return this;
}
@Override
public ByteBuffer asSubByteBuffer(int length) {
+ checkRefCount();
// Just return the single BB that is available
return this.buf;
}
@Override
public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) {
+ checkRefCount();
// Just return the single BB that is available
pair.setFirst(this.buf);
pair.setSecond(offset);
@@ -123,37 +145,44 @@ public class SingleByteBuff extends ByteBuff {
@Override
public int remaining() {
+ checkRefCount();
return this.buf.remaining();
}
@Override
public boolean hasRemaining() {
+ checkRefCount();
return buf.hasRemaining();
}
@Override
public SingleByteBuff reset() {
+ checkRefCount();
this.buf.reset();
return this;
}
@Override
public SingleByteBuff slice() {
- return new SingleByteBuff(this.buf.slice());
+ checkRefCount();
+ return new SingleByteBuff(this.refCnt, this.buf.slice());
}
@Override
public SingleByteBuff duplicate() {
- return new SingleByteBuff(this.buf.duplicate());
+ checkRefCount();
+ return new SingleByteBuff(this.refCnt, this.buf.duplicate());
}
@Override
public byte get() {
+ checkRefCount();
return buf.get();
}
@Override
public byte get(int index) {
+ checkRefCount();
if (UNSAFE_AVAIL) {
return UnsafeAccess.toByte(this.unsafeRef, this.unsafeOffset + index);
}
@@ -162,29 +191,34 @@ public class SingleByteBuff extends ByteBuff {
@Override
public byte getByteAfterPosition(int offset) {
+ checkRefCount();
return get(this.buf.position() + offset);
}
@Override
public SingleByteBuff put(byte b) {
+ checkRefCount();
this.buf.put(b);
return this;
}
@Override
public SingleByteBuff put(int index, byte b) {
+ checkRefCount();
buf.put(index, b);
return this;
}
@Override
public void get(byte[] dst, int offset, int length) {
+ checkRefCount();
ByteBufferUtils.copyFromBufferToArray(dst, buf, buf.position(), offset, length);
buf.position(buf.position() + length);
}
@Override
public void get(int sourceOffset, byte[] dst, int offset, int length) {
+ checkRefCount();
ByteBufferUtils.copyFromBufferToArray(dst, buf, sourceOffset, offset, length);
}
@@ -195,9 +229,10 @@ public class SingleByteBuff extends ByteBuff {
@Override
public SingleByteBuff put(int offset, ByteBuff src, int srcOffset, int length) {
+ checkRefCount();
if (src instanceof SingleByteBuff) {
ByteBufferUtils.copyFromBufferToBuffer(((SingleByteBuff) src).buf, this.buf, srcOffset,
- offset, length);
+ offset, length);
} else {
// TODO we can do some optimization here? Call to asSubByteBuffer might
// create a copy.
@@ -205,7 +240,7 @@ public class SingleByteBuff extends ByteBuff {
src.asSubByteBuffer(srcOffset, length, pair);
if (pair.getFirst() != null) {
ByteBufferUtils.copyFromBufferToBuffer(pair.getFirst(), this.buf, pair.getSecond(), offset,
- length);
+ length);
}
}
return this;
@@ -213,37 +248,44 @@ public class SingleByteBuff extends ByteBuff {
@Override
public SingleByteBuff put(byte[] src, int offset, int length) {
+ checkRefCount();
ByteBufferUtils.copyFromArrayToBuffer(this.buf, src, offset, length);
return this;
}
@Override
public SingleByteBuff put(byte[] src) {
+ checkRefCount();
return put(src, 0, src.length);
}
@Override
public boolean hasArray() {
+ checkRefCount();
return this.buf.hasArray();
}
@Override
public byte[] array() {
+ checkRefCount();
return this.buf.array();
}
@Override
public int arrayOffset() {
+ checkRefCount();
return this.buf.arrayOffset();
}
@Override
public short getShort() {
+ checkRefCount();
return this.buf.getShort();
}
@Override
public short getShort(int index) {
+ checkRefCount();
if (UNSAFE_UNALIGNED) {
return UnsafeAccess.toShort(unsafeRef, unsafeOffset + index);
}
@@ -252,22 +294,26 @@ public class SingleByteBuff extends ByteBuff {
@Override
public short getShortAfterPosition(int offset) {
+ checkRefCount();
return getShort(this.buf.position() + offset);
}
@Override
public int getInt() {
+ checkRefCount();
return this.buf.getInt();
}
@Override
public SingleByteBuff putInt(int value) {
+ checkRefCount();
ByteBufferUtils.putInt(this.buf, value);
return this;
}
@Override
public int getInt(int index) {
+ checkRefCount();
if (UNSAFE_UNALIGNED) {
return UnsafeAccess.toInt(unsafeRef, unsafeOffset + index);
}
@@ -276,22 +322,26 @@ public class SingleByteBuff extends ByteBuff {
@Override
public int getIntAfterPosition(int offset) {
+ checkRefCount();
return getInt(this.buf.position() + offset);
}
@Override
public long getLong() {
+ checkRefCount();
return this.buf.getLong();
}
@Override
public SingleByteBuff putLong(long value) {
+ checkRefCount();
ByteBufferUtils.putLong(this.buf, value);
return this;
}
@Override
public long getLong(int index) {
+ checkRefCount();
if (UNSAFE_UNALIGNED) {
return UnsafeAccess.toLong(unsafeRef, unsafeOffset + index);
}
@@ -300,11 +350,13 @@ public class SingleByteBuff extends ByteBuff {
@Override
public long getLongAfterPosition(int offset) {
+ checkRefCount();
return getLong(this.buf.position() + offset);
}
@Override
public byte[] toBytes(int offset, int length) {
+ checkRefCount();
byte[] output = new byte[length];
ByteBufferUtils.copyFromBufferToArray(output, buf, offset, 0, length);
return output;
@@ -312,18 +364,28 @@ public class SingleByteBuff extends ByteBuff {
@Override
public void get(ByteBuffer out, int sourceOffset, int length) {
+ checkRefCount();
ByteBufferUtils.copyFromBufferToBuffer(buf, out, sourceOffset, length);
}
@Override
public int read(ReadableByteChannel channel) throws IOException {
+ checkRefCount();
return channelRead(channel, buf);
}
@Override
+ public ByteBuffer[] nioByteBuffers() {
+ checkRefCount();
+ return new ByteBuffer[] { this.buf };
+ }
+
+ @Override
public boolean equals(Object obj) {
- if(!(obj instanceof SingleByteBuff)) return false;
- return this.buf.equals(((SingleByteBuff)obj).buf);
+ if (!(obj instanceof SingleByteBuff)) {
+ return false;
+ }
+ return this.buf.equals(((SingleByteBuff) obj).buf);
}
@Override
@@ -331,11 +393,9 @@ public class SingleByteBuff extends ByteBuff {
return this.buf.hashCode();
}
- /**
- * @return the ByteBuffer which this wraps.
- */
- @VisibleForTesting
- public ByteBuffer getEnclosingByteBuffer() {
- return this.buf;
+ @Override
+ public SingleByteBuff retain() {
+ refCnt.retain();
+ return this;
}
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
index 2e14b13..d023339 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
@@ -27,9 +27,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.MultiByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -311,10 +311,6 @@ public class ByteBufferArray {
srcIndex += cnt;
}
assert srcIndex == len;
- if (mbb.length > 1) {
- return new MultiByteBuff(mbb);
- } else {
- return new SingleByteBuff(mbb[0]);
- }
+ return ByteBuffAllocator.wrap(mbb);
}
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index 3ea0a5c..98bc88a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -348,25 +349,39 @@ public final class ByteBufferUtils {
}
}
- /**
- * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a
- * {@link ByteBuffer}.
- */
- public static long readVLong(ByteBuffer in) {
- byte firstByte = in.get();
+ private interface ByteVisitor {
+ byte get();
+ }
+
+ private static long readVLong(ByteVisitor visitor) {
+ byte firstByte = visitor.get();
int len = WritableUtils.decodeVIntSize(firstByte);
if (len == 1) {
return firstByte;
}
long i = 0;
- for (int idx = 0; idx < len-1; idx++) {
- byte b = in.get();
+ for (int idx = 0; idx < len - 1; idx++) {
+ byte b = visitor.get();
i = i << 8;
i = i | (b & 0xFF);
}
return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
}
+ /**
+ * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a {@link ByteBuffer}.
+ */
+ public static long readVLong(ByteBuffer in) {
+ return readVLong(in::get);
+ }
+
+ /**
+ * Similar to {@link WritableUtils#readVLong(java.io.DataInput)} but reads from a
+ * {@link ByteBuff}.
+ */
+ public static long readVLong(ByteBuff in) {
+ return readVLong(in::get);
+ }
/**
* Put in buffer integer using 7 bit encoding. For each written byte:
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
new file mode 100644
index 0000000..0976c11
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RPCTests.class, SmallTests.class })
+public class TestByteBuffAllocator {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestByteBuffAllocator.class);
+
+ @Test
+ public void testAllocateByteBuffToReadInto() {
+ int maxBuffersInPool = 10;
+ int bufSize = 6 * 1024;
+ ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, bufSize / 6);
+ ByteBuff buff = alloc.allocate(10 * bufSize);
+ buff.release();
+ // When the request size is less than 1/6th of the pool buffer size. We should use on demand
+ // created on heap Buffer
+ buff = alloc.allocate(200);
+ assertTrue(buff.hasArray());
+ assertEquals(maxBuffersInPool, alloc.getQueueSize());
+ buff.release();
+ // When the request size is > 1/6th of the pool buffer size.
+ buff = alloc.allocate(1024);
+ assertFalse(buff.hasArray());
+ assertEquals(maxBuffersInPool - 1, alloc.getQueueSize());
+ buff.release();// ByteBuffDeallocaor#free should put back the BB to pool.
+ assertEquals(maxBuffersInPool, alloc.getQueueSize());
+ // Request size> pool buffer size
+ buff = alloc.allocate(7 * 1024);
+ assertFalse(buff.hasArray());
+ assertTrue(buff instanceof MultiByteBuff);
+ ByteBuffer[] bbs = buff.nioByteBuffers();
+ assertEquals(2, bbs.length);
+ assertTrue(bbs[0].isDirect());
+ assertTrue(bbs[1].isDirect());
+ assertEquals(6 * 1024, bbs[0].limit());
+ assertEquals(1024, bbs[1].limit());
+ assertEquals(maxBuffersInPool - 2, alloc.getQueueSize());
+ buff.release();
+ assertEquals(maxBuffersInPool, alloc.getQueueSize());
+
+ buff = alloc.allocate(6 * 1024 + 200);
+ assertFalse(buff.hasArray());
+ assertTrue(buff instanceof MultiByteBuff);
+ bbs = buff.nioByteBuffers();
+ assertEquals(2, bbs.length);
+ assertTrue(bbs[0].isDirect());
+ assertFalse(bbs[1].isDirect());
+ assertEquals(6 * 1024, bbs[0].limit());
+ assertEquals(200, bbs[1].limit());
+ assertEquals(maxBuffersInPool - 1, alloc.getQueueSize());
+ buff.release();
+ assertEquals(maxBuffersInPool, alloc.getQueueSize());
+
+ alloc.allocate(bufSize * (maxBuffersInPool - 1));
+ buff = alloc.allocate(20 * 1024);
+ assertFalse(buff.hasArray());
+ assertTrue(buff instanceof MultiByteBuff);
+ bbs = buff.nioByteBuffers();
+ assertEquals(2, bbs.length);
+ assertTrue(bbs[0].isDirect());
+ assertFalse(bbs[1].isDirect());
+ assertEquals(6 * 1024, bbs[0].limit());
+ assertEquals(14 * 1024, bbs[1].limit());
+ assertEquals(0, alloc.getQueueSize());
+ buff.release();
+ assertEquals(1, alloc.getQueueSize());
+ alloc.allocateOneBuffer();
+
+ buff = alloc.allocate(7 * 1024);
+ assertTrue(buff.hasArray());
+ assertTrue(buff instanceof SingleByteBuff);
+ assertEquals(7 * 1024, buff.nioByteBuffers()[0].limit());
+ buff.release();
+ }
+
+ @Test
+ public void testNegativeAllocatedSize() {
+ int maxBuffersInPool = 10;
+ ByteBuffAllocator allocator =
+ new ByteBuffAllocator(true, maxBuffersInPool, 6 * 1024, 1024);
+ try {
+ allocator.allocate(-1);
+ fail("Should throw exception when size < 0");
+ } catch (IllegalArgumentException e) {
+ // expected exception
+ }
+ ByteBuff bb = allocator.allocate(0);
+ bb.release();
+ }
+
+ @Test
+ public void testAllocateOneBuffer() {
+ // Allocate from on-heap
+ ByteBuffAllocator allocator = ByteBuffAllocator.createOnHeap();
+ ByteBuff buf = allocator.allocateOneBuffer();
+ assertTrue(buf.hasArray());
+ assertEquals(ByteBuffAllocator.DEFAULT_BUFFER_SIZE, buf.remaining());
+ buf.release();
+
+ // Allocate from off-heap
+ int bufSize = 10;
+ allocator = new ByteBuffAllocator(true, 1, 10, 3);
+ buf = allocator.allocateOneBuffer();
+ assertFalse(buf.hasArray());
+ assertEquals(buf.remaining(), bufSize);
+ // The another one will be allocated from on-heap because the pool has only one ByteBuffer,
+ // and still not be cleaned.
+ ByteBuff buf2 = allocator.allocateOneBuffer();
+ assertTrue(buf2.hasArray());
+ assertEquals(buf2.remaining(), bufSize);
+ // free the first one
+ buf.release();
+ // The next one will be off-heap again.
+ buf = allocator.allocateOneBuffer();
+ assertFalse(buf.hasArray());
+ assertEquals(buf.remaining(), bufSize);
+ buf.release();
+ }
+
+ @Test
+ public void testReferenceCount() {
+ int bufSize = 64;
+ ByteBuffAllocator alloc = new ByteBuffAllocator(true, 2, bufSize, 3);
+ ByteBuff buf1 = alloc.allocate(bufSize * 2);
+ assertFalse(buf1.hasArray());
+ // The next one will be allocated from heap
+ ByteBuff buf2 = alloc.allocateOneBuffer();
+ assertTrue(buf2.hasArray());
+
+ // duplicate the buf2, if the dup released, buf2 will also be released (SingleByteBuffer)
+ ByteBuff dup2 = buf2.duplicate();
+ dup2.release();
+ assertEquals(0, buf2.refCnt());
+ assertEquals(0, dup2.refCnt());
+ assertEquals(0, alloc.getQueueSize());
+ assertException(dup2::position);
+ assertException(buf2::position);
+
+ // duplicate the buf1, if the dup1 released, buf1 will also be released (MultipleByteBuffer)
+ ByteBuff dup1 = buf1.duplicate();
+ dup1.release();
+ assertEquals(0, buf1.refCnt());
+ assertEquals(0, dup1.refCnt());
+ assertEquals(2, alloc.getQueueSize());
+ assertException(dup1::position);
+ assertException(buf1::position);
+
+ // slice the buf3, if the slice3 released, buf3 will also be released (SingleByteBuffer)
+ ByteBuff buf3 = alloc.allocateOneBuffer();
+ assertFalse(buf3.hasArray());
+ ByteBuff slice3 = buf3.slice();
+ slice3.release();
+ assertEquals(0, buf3.refCnt());
+ assertEquals(0, slice3.refCnt());
+ assertEquals(2, alloc.getQueueSize());
+
+ // slice the buf4, if the slice4 released, buf4 will also be released (MultipleByteBuffer)
+ ByteBuff buf4 = alloc.allocate(bufSize * 2);
+ assertFalse(buf4.hasArray());
+ ByteBuff slice4 = buf4.slice();
+ slice4.release();
+ assertEquals(0, buf4.refCnt());
+ assertEquals(0, slice4.refCnt());
+ assertEquals(2, alloc.getQueueSize());
+
+ // Test multiple reference for the same ByteBuff (SingleByteBuff)
+ ByteBuff buf5 = alloc.allocateOneBuffer();
+ ByteBuff slice5 = buf5.duplicate().duplicate().duplicate().slice().slice();
+ slice5.release();
+ assertEquals(0, buf5.refCnt());
+ assertEquals(0, slice5.refCnt());
+ assertEquals(2, alloc.getQueueSize());
+ assertException(slice5::position);
+ assertException(buf5::position);
+
+ // Test multiple reference for the same ByteBuff (SingleByteBuff)
+ ByteBuff buf6 = alloc.allocate(bufSize >> 2);
+ ByteBuff slice6 = buf6.duplicate().duplicate().duplicate().slice().slice();
+ slice6.release();
+ assertEquals(0, buf6.refCnt());
+ assertEquals(0, slice6.refCnt());
+ assertEquals(2, alloc.getQueueSize());
+
+ // Test retain the parent SingleByteBuff (duplicate)
+ ByteBuff parent = alloc.allocateOneBuffer();
+ ByteBuff child = parent.duplicate();
+ child.retain();
+ parent.release();
+ assertEquals(1, child.refCnt());
+ assertEquals(1, parent.refCnt());
+ assertEquals(1, alloc.getQueueSize());
+ parent.release();
+ assertEquals(0, child.refCnt());
+ assertEquals(0, parent.refCnt());
+ assertEquals(2, alloc.getQueueSize());
+
+ // Test retain parent MultiByteBuff (duplicate)
+ parent = alloc.allocate(bufSize << 1);
+ child = parent.duplicate();
+ child.retain();
+ parent.release();
+ assertEquals(1, child.refCnt());
+ assertEquals(1, parent.refCnt());
+ assertEquals(0, alloc.getQueueSize());
+ parent.release();
+ assertEquals(0, child.refCnt());
+ assertEquals(0, parent.refCnt());
+ assertEquals(2, alloc.getQueueSize());
+
+ // Test retain the parent SingleByteBuff (slice)
+ parent = alloc.allocateOneBuffer();
+ child = parent.slice();
+ child.retain();
+ parent.release();
+ assertEquals(1, child.refCnt());
+ assertEquals(1, parent.refCnt());
+ assertEquals(1, alloc.getQueueSize());
+ parent.release();
+ assertEquals(0, child.refCnt());
+ assertEquals(0, parent.refCnt());
+ assertEquals(2, alloc.getQueueSize());
+
+ // Test retain parent MultiByteBuff (slice)
+ parent = alloc.allocate(bufSize << 1);
+ child = parent.slice();
+ child.retain();
+ parent.release();
+ assertEquals(1, child.refCnt());
+ assertEquals(1, parent.refCnt());
+ assertEquals(0, alloc.getQueueSize());
+ parent.release();
+ assertEquals(0, child.refCnt());
+ assertEquals(0, parent.refCnt());
+ assertEquals(2, alloc.getQueueSize());
+ }
+
+ @Test
+ public void testReverseRef() {
+ int bufSize = 64;
+ ByteBuffAllocator alloc = new ByteBuffAllocator(true, 1, bufSize, 3);
+ ByteBuff buf1 = alloc.allocate(bufSize);
+ ByteBuff dup1 = buf1.duplicate();
+ assertEquals(1, buf1.refCnt());
+ assertEquals(1, dup1.refCnt());
+ buf1.release();
+ assertEquals(0, buf1.refCnt());
+ assertEquals(0, dup1.refCnt());
+ assertEquals(1, alloc.getQueueSize());
+ assertException(buf1::position);
+ assertException(dup1::position);
+ }
+
+ @Test
+ public void testByteBuffUnsupportedMethods() {
+ int bufSize = 64;
+ ByteBuffAllocator alloc = new ByteBuffAllocator(true, 1, bufSize, 3);
+ ByteBuff buf = alloc.allocate(bufSize);
+ assertException(() -> buf.retain(2));
+ assertException(() -> buf.release(2));
+ assertException(() -> buf.touch());
+ assertException(() -> buf.touch(new Object()));
+ }
+
+ private void assertException(Runnable r) {
+ try {
+ r.run();
+ fail();
+ } catch (Exception e) {
+ // expected exception.
+ }
+ }
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
index 2f7a869..3ac7a75 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -40,29 +41,30 @@ public class TestByteBufferListOutputStream {
@Test
public void testWrites() throws Exception {
- ByteBufferPool pool = new ByteBufferPool(10, 3);
- ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+ ByteBuffAllocator alloc = new ByteBuffAllocator(true, 3, 10, 10 / 6);
+ ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(alloc);
bbos.write(2);// Write a byte
bbos.writeInt(100);// Write an int
byte[] b = Bytes.toBytes("row123");// 6 bytes
bbos.write(b);
+ assertEquals(2, bbos.allBufs.size());
// Just use the 3rd BB from pool so that pabos, on request, wont get one
- ByteBuffer bb1 = pool.getBuffer();
+ ByteBuff bb1 = alloc.allocateOneBuffer();
ByteBuffer bb = ByteBuffer.wrap(Bytes.toBytes("row123_cf1_q1"));// 13 bytes
bbos.write(bb, 0, bb.capacity());
- pool.putbackBuffer(bb1);
+ bb1.release();
bbos.writeInt(123);
bbos.writeInt(124);
- assertEquals(0, pool.getQueueSize());
+ assertEquals(0, alloc.getQueueSize());
List<ByteBuffer> allBufs = bbos.getByteBuffers();
assertEquals(4, allBufs.size());
- assertEquals(3, bbos.bufsFromPool.size());
+ assertEquals(4, bbos.allBufs.size());
ByteBuffer b1 = allBufs.get(0);
assertEquals(10, b1.remaining());
assertEquals(2, b1.get());
assertEquals(100, b1.getInt());
byte[] bActual = new byte[b.length];
- b1.get(bActual, 0, 5);//5 bytes in 1st BB
+ b1.get(bActual, 0, 5);// 5 bytes in 1st BB
ByteBuffer b2 = allBufs.get(1);
assertEquals(10, b2.remaining());
b2.get(bActual, 5, 1);// Remaining 1 byte in 2nd BB
@@ -78,6 +80,6 @@ public class TestByteBufferListOutputStream {
assertEquals(4, b4.remaining());
assertEquals(124, b4.getInt());
bbos.releaseResources();
- assertEquals(3, pool.getQueueSize());
+ assertEquals(3, alloc.getQueueSize());
}
}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
deleted file mode 100644
index 44d2f45..0000000
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.nio.ByteBuffer;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.testclassification.IOTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ IOTests.class, SmallTests.class })
-public class TestByteBufferPool {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestByteBufferPool.class);
-
- @Test
- public void testOffheapBBPool() throws Exception {
- boolean directByteBuffer = true;
- testBBPool(10, 100, directByteBuffer);
- }
-
- @Test
- public void testOnheapBBPool() throws Exception {
- boolean directByteBuffer = false;
- testBBPool(10, 100, directByteBuffer);
- }
-
- private void testBBPool(int maxPoolSize, int bufferSize, boolean directByteBuffer) {
- ByteBufferPool pool = new ByteBufferPool(bufferSize, maxPoolSize, directByteBuffer);
- for (int i = 0; i < maxPoolSize; i++) {
- ByteBuffer buffer = pool.getBuffer();
- assertEquals(0, buffer.position());
- assertEquals(bufferSize, buffer.limit());
- assertEquals(directByteBuffer, buffer.isDirect());
- }
- assertEquals(0, pool.getQueueSize());
- ByteBuffer bb = directByteBuffer ? ByteBuffer.allocate(bufferSize)
- : ByteBuffer.allocateDirect(bufferSize);
- pool.putbackBuffer(bb);
- assertEquals(0, pool.getQueueSize());
- bb = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize + 1)
- : ByteBuffer.allocate(bufferSize + 1);
- pool.putbackBuffer(bb);
- assertEquals(0, pool.getQueueSize());
- }
-}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
index 84cf7a4..fcfb77a 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
@@ -286,12 +286,12 @@ public class TestMultiByteBuff {
multi.putInt(45);
multi.position(1);
multi.limit(multi.position() + (2 * Bytes.SIZEOF_LONG));
- MultiByteBuff sliced = multi.slice();
+ ByteBuff sliced = multi.slice();
assertEquals(0, sliced.position());
assertEquals((2 * Bytes.SIZEOF_LONG), sliced.limit());
assertEquals(l1, sliced.getLong());
assertEquals(l2, sliced.getLong());
- MultiByteBuff dup = multi.duplicate();
+ ByteBuff dup = multi.duplicate();
assertEquals(1, dup.position());
assertEquals(dup.position() + (2 * Bytes.SIZEOF_LONG), dup.limit());
assertEquals(l1, dup.getLong());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
index 0224dea..a842967 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
@@ -69,11 +69,10 @@ public interface Cacheable extends HeapSize {
/**
* SHARED means when this Cacheable is read back from cache it refers to the same memory area as
- * used by the cache for caching it.
- * EXCLUSIVE means when this Cacheable is read back from cache, the data was copied to an
- * exclusive memory area of this Cacheable.
+ * used by the cache for caching it. EXCLUSIVE means when this Cacheable is read back from cache,
+ * the data was copied to an exclusive memory area of this Cacheable.
*/
- public static enum MemoryType {
+ enum MemoryType {
SHARED, EXCLUSIVE
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
index 80b1288..5ed3d2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
@@ -127,7 +127,7 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
NettyServerCall reqTooBig =
new NettyServerCall(header.getCallId(), connection.service, null, null, null, null,
connection, 0, connection.addr, System.currentTimeMillis(), 0,
- connection.rpcServer.reservoir, connection.rpcServer.cellBlockBuilder, null);
+ connection.rpcServer.bbAllocator, connection.rpcServer.cellBlockBuilder, null);
connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index 742a728..bba1bed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -187,7 +187,7 @@ public class NettyRpcServer extends RpcServer {
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
long startTime, int timeout) throws IOException {
NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null,
- -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null);
+ -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null);
return call(fakeCall, status);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
index 2fae311..8dc08c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
@@ -21,9 +21,9 @@ import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.hbase.CellScanner;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -39,10 +39,10 @@ class NettyServerCall extends ServerCall<NettyServerRpcConnection> {
NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, NettyServerRpcConnection connection, long size,
- InetAddress remoteAddress, long receiveTime, int timeout,
- ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
- super(id, service, md, header, param, cellScanner, connection, size, remoteAddress,
- receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
+ InetAddress remoteAddress, long receiveTime, int timeout, ByteBuffAllocator bbAllocator,
+ CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
+ super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime,
+ timeout, bbAllocator, cellBlockBuilder, reqCleanup);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
index ffa16bf..2f97f53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
@@ -59,12 +59,7 @@ class NettyServerRpcConnection extends ServerRpcConnection {
void process(final ByteBuf buf) throws IOException, InterruptedException {
if (connectionHeaderRead) {
- this.callCleanup = new RpcServer.CallCleanup() {
- @Override
- public void run() {
- buf.release();
- }
- };
+ this.callCleanup = buf::release;
process(new SingleByteBuff(buf.nioBuffer()));
} else {
ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes());
@@ -121,7 +116,7 @@ class NettyServerRpcConnection extends ServerRpcConnection {
long size, final InetAddress remoteAddress, int timeout,
CallCleanup reqCleanup) {
return new NettyServerCall(id, service, md, header, param, cellScanner, this, size,
- remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
+ remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
this.rpcServer.cellBlockBuilder, reqCleanup);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 3ab63dd..ac8c26c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -26,7 +26,6 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -38,16 +37,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.MultiByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@@ -210,11 +205,7 @@ public abstract class RpcServer implements RpcServerInterface,
protected UserProvider userProvider;
- protected final ByteBufferPool reservoir;
- // The requests and response will use buffers from ByteBufferPool, when the size of the
- // request/response is at least this size.
- // We make this to be 1/6th of the pool buffer size.
- protected final int minSizeForReservoirUse;
+ protected final ByteBuffAllocator bbAllocator;
protected volatile boolean allowFallbackToSimpleAuth;
@@ -225,7 +216,7 @@ public abstract class RpcServer implements RpcServerInterface,
private RSRpcServices rsRpcServices;
@FunctionalInterface
- protected static interface CallCleanup {
+ protected interface CallCleanup {
void run();
}
@@ -266,32 +257,7 @@ public abstract class RpcServer implements RpcServerInterface,
final List<BlockingServiceAndInterface> services,
final InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
- if (reservoirEnabled) {
- int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY,
- ByteBufferPool.DEFAULT_BUFFER_SIZE);
- // The max number of buffers to be pooled in the ByteBufferPool. The default value been
- // selected based on the #handlers configured. When it is read request, 2 MB is the max size
- // at which we will send back one RPC request. Means max we need 2 MB for creating the
- // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we
- // include the heap size overhead of each cells also.) Considering 2 MB, we will need
- // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size
- // is by default 64 KB.
- // In case of read request, at the end of the handler process, we will make the response
- // cellblock and add the Call to connection's response Q and a single Responder thread takes
- // connections and responses from that one by one and do the socket write. So there is chances
- // that by the time a handler originated response is actually done writing to socket and so
- // released the BBs it used, the handler might have processed one more read req. On an avg 2x
- // we consider and consider that also for the max buffers to pool
- int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize;
- int maxPoolSize = conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
- conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
- HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
- this.reservoir = new ByteBufferPool(poolBufSize, maxPoolSize);
- this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir);
- } else {
- reservoir = null;
- this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place.
- }
+ this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled);
this.server = server;
this.services = services;
this.bindAddress = bindAddress;
@@ -325,11 +291,6 @@ public abstract class RpcServer implements RpcServerInterface,
this.scheduler = scheduler;
}
- @VisibleForTesting
- static int getMinSizeForReservoirUse(ByteBufferPool pool) {
- return pool.getBufferSize() / 6;
- }
-
@Override
public void onConfigurationChange(Configuration newConf) {
initReconfigurable(newConf);
@@ -652,55 +613,6 @@ public abstract class RpcServer implements RpcServerInterface,
}
/**
- * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool
- * as much as possible.
- *
- * @param pool The ByteBufferPool to use
- * @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer
- * need of size below this, create on heap ByteBuffer.
- * @param reqLen Bytes count in request
- */
- @VisibleForTesting
- static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool,
- int minSizeForPoolUse, int reqLen) {
- ByteBuff resultBuf;
- List<ByteBuffer> bbs = new ArrayList<>((reqLen / pool.getBufferSize()) + 1);
- int remain = reqLen;
- ByteBuffer buf = null;
- while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
- bbs.add(buf);
- remain -= pool.getBufferSize();
- }
- ByteBuffer[] bufsFromPool = null;
- if (bbs.size() > 0) {
- bufsFromPool = new ByteBuffer[bbs.size()];
- bbs.toArray(bufsFromPool);
- }
- if (remain > 0) {
- bbs.add(ByteBuffer.allocate(remain));
- }
- if (bbs.size() > 1) {
- ByteBuffer[] items = new ByteBuffer[bbs.size()];
- bbs.toArray(items);
- resultBuf = new MultiByteBuff(items);
- } else {
- // We are backed by single BB
- resultBuf = new SingleByteBuff(bbs.get(0));
- }
- resultBuf.limit(reqLen);
- if (bufsFromPool != null) {
- final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
- return new Pair<>(resultBuf, () -> {
- // Return back all the BBs to pool
- for (int i = 0; i < bufsFromPoolFinal.length; i++) {
- pool.putbackBuffer(bufsFromPoolFinal[i]);
- }
- });
- }
- return new Pair<>(resultBuf, null);
- }
-
- /**
* Needed for features such as delayed calls. We need to be able to store the current call
* so that we can complete it later or ask questions of what is supported by the current ongoing
* call.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index cf1cf9a..f93f3a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -26,10 +26,10 @@ import java.util.Optional;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.security.User;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
@@ -67,7 +67,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
protected long startTime;
protected final long deadline;// the deadline to handle this call, if exceed we can drop it.
- protected final ByteBufferPool reservoir;
+ protected final ByteBuffAllocator bbAllocator;
protected final CellBlockBuilder cellBlockBuilder;
@@ -91,11 +91,11 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
private long exceptionSize = 0;
private final boolean retryImmediatelySupported;
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
- justification="Can't figure why this complaint is happening... see below")
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
+ justification = "Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
- Message param, CellScanner cellScanner, T connection, long size,
- InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
+ Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress,
+ long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator,
CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
this.id = id;
this.service = service;
@@ -118,7 +118,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
this.remoteAddress = remoteAddress;
this.timeout = timeout;
this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
- this.reservoir = reservoir;
+ this.bbAllocator = byteBuffAllocator;
this.cellBlockBuilder = cellBlockBuilder;
this.reqCleanup = reqCleanup;
}
@@ -199,9 +199,9 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
// high when we can avoid a big buffer allocation on each rpc.
List<ByteBuffer> cellBlock = null;
int cellBlockSize = 0;
- if (this.reservoir != null) {
+ if (bbAllocator.isReservoirEnabled()) {
this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
- this.connection.compressionCodec, cells, this.reservoir);
+ this.connection.compressionCodec, cells, bbAllocator);
if (this.cellBlockStream != null) {
cellBlock = this.cellBlockStream.getByteBuffers();
cellBlockSize = this.cellBlockStream.size();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index 2a8cfbe..f3f7807 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -488,7 +488,7 @@ public class SimpleRpcServer extends RpcServer {
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
long startTime, int timeout) throws IOException {
SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner,
- null, -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null, null);
+ null, -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null, null);
return call(fakeCall, status);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
index 6084138..311b4c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
@@ -21,9 +21,9 @@ import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.hbase.CellScanner;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -42,11 +42,12 @@ class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
justification = "Can't figure why this complaint is happening... see below")
SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md,
RequestHeader header, Message param, CellScanner cellScanner,
- SimpleServerRpcConnection connection, long size,
- final InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
- CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup, SimpleRpcServerResponder responder) {
- super(id, service, md, header, param, cellScanner, connection, size, remoteAddress,
- receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
+ SimpleServerRpcConnection connection, long size, final InetAddress remoteAddress,
+ long receiveTime, int timeout, ByteBuffAllocator bbAllocator,
+ CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup,
+ SimpleRpcServerResponder responder) {
+ super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime,
+ timeout, bbAllocator, cellBlockBuilder, reqCleanup);
this.responder = responder;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
index b4b5f33..01127cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
@@ -36,14 +36,12 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.util.Pair;
/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
@@ -212,7 +210,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
// Notify the client about the offending request
SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null,
null, null, null, this, 0, this.addr, System.currentTimeMillis(), 0,
- this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, null, responder);
+ this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder);
this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
// Make sure the client recognizes the underlying exception
// Otherwise, throw a DoNotRetryIOException.
@@ -255,24 +253,8 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
// It creates the ByteBuff and CallCleanup and assign to Connection instance.
private void initByteBuffToReadInto(int length) {
- // We create random on heap buffers are read into those when
- // 1. ByteBufferPool is not there.
- // 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is
- // waste then. Also if all the reqs are of this size, we will be creating larger sized
- // buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like
- // RegionOpen.
- // 3. If it is an initial handshake signal or initial connection request. Any way then
- // condition 2 itself will match
- // 4. When SASL use is ON.
- if (this.rpcServer.reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead ||
- useSasl || length < this.rpcServer.minSizeForReservoirUse) {
- this.data = new SingleByteBuff(ByteBuffer.allocate(length));
- } else {
- Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto(
- this.rpcServer.reservoir, this.rpcServer.minSizeForReservoirUse, length);
- this.data = pair.getFirst();
- this.callCleanup = pair.getSecond();
- }
+ this.data = rpcServer.bbAllocator.allocate(length);
+ this.callCleanup = data::release;
}
protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
@@ -345,7 +327,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
RequestHeader header, Message param, CellScanner cellScanner, long size,
InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size,
- remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
+ remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
index 75800ba..8a993b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.master.LoadBalancer.TABLES_ON_MASTER;
import static org.junit.Assert.assertEquals;
@@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -94,7 +94,7 @@ public class TestAsyncTableGetMultiThreaded {
protected static void setUp(MemoryCompactionPolicy memoryCompaction) throws Exception {
TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
- TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+ TEST_UTIL.getConfiguration().setInt(MAX_BUFFER_COUNT_KEY, 100);
TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(memoryCompaction));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java
index 267e9e8..abf20dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -26,7 +28,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
@@ -71,9 +72,8 @@ public class TestServerLoadDurability {
private static Configuration createConfigurationForSimpleRpcServer() {
Configuration conf = HBaseConfiguration.create();
- conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
- SimpleRpcServer.class.getName());
- conf.setInt(ByteBufferPool.BUFFER_SIZE_KEY, 20);
+ conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
+ conf.setInt(BUFFER_SIZE_KEY, 20);
return conf;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 48080b2..32160a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -832,7 +832,7 @@ public class TestHFileBlock {
if (ClassSize.is32BitJVM()) {
assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
} else {
- assertEquals(72, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
+ assertEquals(80, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
}
for (int size : new int[] { 100, 256, 12345 }) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
deleted file mode 100644
index 560190b..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
-import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.MultiByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Pair;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ RPCTests.class, SmallTests.class })
-public class TestRpcServer {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestRpcServer.class);
-
- @Test
- public void testAllocateByteBuffToReadInto() throws Exception {
- int maxBuffersInPool = 10;
- ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool);
- initPoolWithAllBuffers(pool, maxBuffersInPool);
- ByteBuff buff = null;
- Pair<ByteBuff, CallCleanup> pair;
- // When the request size is less than 1/6th of the pool buffer size. We should use on demand
- // created on heap Buffer
- pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
- 200);
- buff = pair.getFirst();
- assertTrue(buff.hasArray());
- assertEquals(maxBuffersInPool, pool.getQueueSize());
- assertNull(pair.getSecond());
- // When the request size is > 1/6th of the pool buffer size.
- pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
- 1024);
- buff = pair.getFirst();
- assertFalse(buff.hasArray());
- assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
- assertNotNull(pair.getSecond());
- pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
- assertEquals(maxBuffersInPool, pool.getQueueSize());
- // Request size> pool buffer size
- pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
- 7 * 1024);
- buff = pair.getFirst();
- assertFalse(buff.hasArray());
- assertTrue(buff instanceof MultiByteBuff);
- ByteBuffer[] bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
- assertEquals(2, bbs.length);
- assertTrue(bbs[0].isDirect());
- assertTrue(bbs[1].isDirect());
- assertEquals(6 * 1024, bbs[0].limit());
- assertEquals(1024, bbs[1].limit());
- assertEquals(maxBuffersInPool - 2, pool.getQueueSize());
- assertNotNull(pair.getSecond());
- pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
- assertEquals(maxBuffersInPool, pool.getQueueSize());
-
- pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
- 6 * 1024 + 200);
- buff = pair.getFirst();
- assertFalse(buff.hasArray());
- assertTrue(buff instanceof MultiByteBuff);
- bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
- assertEquals(2, bbs.length);
- assertTrue(bbs[0].isDirect());
- assertFalse(bbs[1].isDirect());
- assertEquals(6 * 1024, bbs[0].limit());
- assertEquals(200, bbs[1].limit());
- assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
- assertNotNull(pair.getSecond());
- pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
- assertEquals(maxBuffersInPool, pool.getQueueSize());
-
- ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool - 1];
- for (int i = 0; i < maxBuffersInPool - 1; i++) {
- buffers[i] = pool.getBuffer();
- }
- pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
- 20 * 1024);
- buff = pair.getFirst();
- assertFalse(buff.hasArray());
- assertTrue(buff instanceof MultiByteBuff);
- bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
- assertEquals(2, bbs.length);
- assertTrue(bbs[0].isDirect());
- assertFalse(bbs[1].isDirect());
- assertEquals(6 * 1024, bbs[0].limit());
- assertEquals(14 * 1024, bbs[1].limit());
- assertEquals(0, pool.getQueueSize());
- assertNotNull(pair.getSecond());
- pair.getSecond().run();// CallCleanup#run should put back the BB to pool.
- assertEquals(1, pool.getQueueSize());
- pool.getBuffer();
- pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
- 7 * 1024);
- buff = pair.getFirst();
- assertTrue(buff.hasArray());
- assertTrue(buff instanceof SingleByteBuff);
- assertEquals(7 * 1024, ((SingleByteBuff) buff).getEnclosingByteBuffer().limit());
- assertNull(pair.getSecond());
- }
-
- private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) {
- ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool];
- // Just call getBuffer() on pool 'maxBuffersInPool' so as to init all buffers and then put back
- // all. Makes pool with max #buffers.
- for (int i = 0; i < maxBuffersInPool; i++) {
- buffers[i] = pool.getBuffer();
- }
- for (ByteBuffer buf : buffers) {
- pool.putbackBuffer(buf);
- }
- }
-}