You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by ra...@apache.org on 2016/10/26 09:04:21 UTC
hbase git commit: HBASE-16783 Use ByteBufferPool for the header and
message during Rpc response (Ram)
Repository: hbase
Updated Branches:
refs/heads/master c7c45f2c8 -> 1eae9aeea
HBASE-16783 Use ByteBufferPool for the header and message during Rpc
response (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1eae9aee
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1eae9aee
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1eae9aee
Branch: refs/heads/master
Commit: 1eae9aeeac0d20f84902455f14047580f86327f2
Parents: c7c45f2
Author: Ramkrishna <ra...@intel.com>
Authored: Wed Oct 26 14:33:49 2016 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Wed Oct 26 14:33:49 2016 +0530
----------------------------------------------------------------------
.../hbase/io/ByteBufferListOutputStream.java | 4 ++
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 65 +++++++++++++++-----
2 files changed, 55 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1eae9aee/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
index b4c00c6..c334a5a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
@@ -134,6 +134,10 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
throw new UnsupportedOperationException();
}
+ /**
+ * We can be assured that the buffers returned by this method are all flipped
+ * @return list of bytebuffers
+ */
public List<ByteBuffer> getByteBuffers() {
if (!this.lastBufFlipped) {
this.lastBufFlipped = true;
http://git-wip-us.apache.org/repos/asf/hbase/blob/1eae9aee/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 00c7254..7bcf3a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -119,6 +119,7 @@ import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@@ -480,7 +481,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
Message header = headerBuilder.build();
- byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize);
+ ByteBuffer headerBuf =
+ createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock);
ByteBuffer[] responseBufs = null;
int cellBlockBufferSize = 0;
if (cellBlock != null) {
@@ -489,7 +491,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} else {
responseBufs = new ByteBuffer[1];
}
- responseBufs[0] = ByteBuffer.wrap(b);
+ responseBufs[0] = headerBuf;
if (cellBlock != null) {
for (int i = 0; i < cellBlockBufferSize; i++) {
responseBufs[i + 1] = cellBlock.get(i);
@@ -533,10 +535,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
headerBuilder.setException(exceptionBuilder.build());
}
- private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize)
- throws IOException {
+ private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
+ int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
+ // for writing the header, we check if there is available space in the buffers
+ // created for the cellblock itself. If there is space for the header, we reuse
+ // the last buffer in the cellblock. This applies to the cellblock created from the
+ // pool or even the onheap cellblock buffer in case there is no pool enabled.
+ // Possible reuse would avoid creating a temporary array for storing the header every time.
+ ByteBuffer possiblePBBuf =
+ (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
resultVintSize = 0;
if (header != null) {
@@ -551,15 +560,36 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
int totalSize = headerSerializedSize + headerVintSize
+ (resultSerializedSize + resultVintSize)
+ cellBlockSize;
- // The byte[] should also hold the totalSize of the header, message and the cellblock
- byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
- + resultVintSize + Bytes.SIZEOF_INT];
- // The RpcClient expects the int to be in a format that code be decoded by
- // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int)
- // form of writing int.
- Bytes.putInt(b, 0, totalSize);
- CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT,
- b.length - Bytes.SIZEOF_INT);
+ int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize
+ + resultVintSize + Bytes.SIZEOF_INT;
+ // Only if the last buffer has enough space for header use it. Else allocate
+ // a new buffer. Assume they are all flipped
+ if (possiblePBBuf != null
+ && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
+ // duplicate the buffer. This is where the header is going to be written
+ ByteBuffer pbBuf = possiblePBBuf.duplicate();
+ // get the current limit
+ int limit = pbBuf.limit();
+ // Position such that we write the header to the end of the buffer
+ pbBuf.position(limit);
+ // limit to the header size
+ pbBuf.limit(totalPBSize + limit);
+ // mark the current position
+ pbBuf.mark();
+ writeToCOS(result, header, totalSize, pbBuf);
+ // reset the buffer back to old position
+ pbBuf.reset();
+ return pbBuf;
+ } else {
+ return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
+ }
+ }
+
+ private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
+ throws IOException {
+ ByteBufferUtils.putInt(pbBuf, totalSize);
+ // create COS that works on BB
+ CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
if (header != null) {
cos.writeMessageNoTag(header);
}
@@ -568,7 +598,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
cos.flush();
cos.checkNoSpaceLeft();
- return b;
+ }
+
+ private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
+ int totalSize, int totalPBSize) throws IOException {
+ ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
+ writeToCOS(result, header, totalSize, pbBuf);
+ pbBuf.flip();
+ return pbBuf;
}
private BufferChain wrapWithSasl(BufferChain bc)