You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2016/10/26 09:04:21 UTC

hbase git commit: HBASE-16783 Use ByteBufferPool for the header and message during Rpc response (Ram)

Repository: hbase
Updated Branches:
  refs/heads/master c7c45f2c8 -> 1eae9aeea


HBASE-16783 Use ByteBufferPool for the header and message during Rpc
response (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1eae9aee
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1eae9aee
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1eae9aee

Branch: refs/heads/master
Commit: 1eae9aeeac0d20f84902455f14047580f86327f2
Parents: c7c45f2
Author: Ramkrishna <ra...@intel.com>
Authored: Wed Oct 26 14:33:49 2016 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Wed Oct 26 14:33:49 2016 +0530

----------------------------------------------------------------------
 .../hbase/io/ByteBufferListOutputStream.java    |  4 ++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 65 +++++++++++++++-----
 2 files changed, 55 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1eae9aee/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
index b4c00c6..c334a5a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
@@ -134,6 +134,10 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * All buffers returned by this method are guaranteed to be flipped (ready for reading).
+   * @return list of bytebuffers
+   */
   public List<ByteBuffer> getByteBuffers() {
     if (!this.lastBufFlipped) {
       this.lastBufFlipped = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/1eae9aee/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 00c7254..7bcf3a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -119,6 +119,7 @@ import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -480,7 +481,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
         }
         Message header = headerBuilder.build();
-        byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize);
+        ByteBuffer headerBuf =
+            createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock);
         ByteBuffer[] responseBufs = null;
         int cellBlockBufferSize = 0;
         if (cellBlock != null) {
@@ -489,7 +491,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         } else {
           responseBufs = new ByteBuffer[1];
         }
-        responseBufs[0] = ByteBuffer.wrap(b);
+        responseBufs[0] = headerBuf;
         if (cellBlock != null) {
           for (int i = 0; i < cellBlockBufferSize; i++) {
             responseBufs[i + 1] = cellBlock.get(i);
@@ -533,10 +535,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       headerBuilder.setException(exceptionBuilder.build());
     }
 
-    private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize)
-        throws IOException {
+    private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
+        int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
       // Organize the response as a set of bytebuffers rather than collect it all together inside
       // one big byte array; save on allocations.
+      // When writing the header, we check whether there is space available in the buffers
+      // created for the cellblock itself. If there is room for the header, we reuse
+      // the last buffer in the cellblock. This applies both to a cellblock created from the
+      // pool and to the on-heap cellblock buffer used when no pool is enabled.
+      // Such reuse avoids creating a temporary array for storing the header every time.
+      ByteBuffer possiblePBBuf =
+          (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
       int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
           resultVintSize = 0;
       if (header != null) {
@@ -551,15 +560,36 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       int totalSize = headerSerializedSize + headerVintSize
           + (resultSerializedSize + resultVintSize)
           + cellBlockSize;
-      // The byte[] should also hold the totalSize of the header, message and the cellblock
-      byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
-          + resultVintSize + Bytes.SIZEOF_INT];
-      // The RpcClient expects the int to be in a format that code be decoded by
-      // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int)
-      // form of writing int.
-      Bytes.putInt(b, 0, totalSize);
-      CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT,
-          b.length - Bytes.SIZEOF_INT);
+      int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize
+          + resultVintSize + Bytes.SIZEOF_INT;
+      // Use the last buffer only if it has enough space for the header; otherwise allocate
+      // a new buffer. The buffers are assumed to be flipped already.
+      if (possiblePBBuf != null
+          && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
+        // duplicate the buffer. This is where the header is going to be written
+        ByteBuffer pbBuf = possiblePBBuf.duplicate();
+        // get the current limit
+        int limit = pbBuf.limit();
+        // Position such that we write the header to the end of the buffer
+        pbBuf.position(limit);
+        // limit to the header size
+        pbBuf.limit(totalPBSize + limit);
+        // mark the current position
+        pbBuf.mark();
+        writeToCOS(result, header, totalSize, pbBuf);
+        // reset the buffer back to old position
+        pbBuf.reset();
+        return pbBuf;
+      } else {
+        return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
+      }
+    }
+
+    private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
+        throws IOException {
+      ByteBufferUtils.putInt(pbBuf, totalSize);
+      // create COS that works on BB
+      CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
       if (header != null) {
         cos.writeMessageNoTag(header);
       }
@@ -568,7 +598,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       }
       cos.flush();
       cos.checkNoSpaceLeft();
-      return b;
+    }
+
+    private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
+        int totalSize, int totalPBSize) throws IOException {
+      ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
+      writeToCOS(result, header, totalSize, pbBuf);
+      pbBuf.flip();
+      return pbBuf;
     }
 
     private BufferChain wrapWithSasl(BufferChain bc)