You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2016/02/04 18:56:24 UTC

hbase git commit: HBASE-15202 Reduce garbage while setting response (Ram)

Repository: hbase
Updated Branches:
  refs/heads/master f5fba2ba0 -> 7b33a740b


HBASE-15202 Reduce garbage while setting response (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7b33a740
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7b33a740
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7b33a740

Branch: refs/heads/master
Commit: 7b33a740b10b05b50f8e9d3b2a1ef37593cb6eb3
Parents: f5fba2b
Author: ramkrishna <ra...@gmail.com>
Authored: Thu Feb 4 23:23:31 2016 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Thu Feb 4 23:23:31 2016 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 51 +++++++++++++++++---
 1 file changed, 43 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7b33a740/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index a9c64a3..98669e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -131,6 +131,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
@@ -427,14 +428,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         }
         Message header = headerBuilder.build();
 
-        // Organize the response as a set of bytebuffers rather than collect it all together inside
-        // one big byte array; save on allocations.
-        ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
-        ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
-        int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
-          (this.cellBlock == null? 0: this.cellBlock.limit());
-        ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
-        bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
+        byte[] b = createHeaderAndMessageBytes(result, header);
+
+        bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
+
         if (connection.useWrap) {
           bc = wrapWithSasl(bc);
         }
@@ -454,6 +451,44 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       }
     }
 
+    private byte[] createHeaderAndMessageBytes(Message result, Message header)
+        throws IOException {
+      // Organize the response as a set of bytebuffers rather than collect it all together inside
+      // one big byte array; save on allocations.
+      int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
+          resultVintSize = 0;
+      if (header != null) {
+        headerSerializedSize = header.getSerializedSize();
+        headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize);
+      }
+      if (result != null) {
+        resultSerializedSize = result.getSerializedSize();
+        resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize);
+      }
+      // calculate the total size
+      int totalSize = headerSerializedSize + headerVintSize
+          + (resultSerializedSize + resultVintSize)
+          + (this.cellBlock == null ? 0 : this.cellBlock.limit());
+      // The byte[] should also hold the totalSize of the header, message and the cellblock
+      byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
+          + resultVintSize + Bytes.SIZEOF_INT];
+      // The RpcClient expects the int to be in a format that code be decoded by
+      // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int)
+      // form of writing int.
+      Bytes.putInt(b, 0, totalSize);
+      CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT,
+          b.length - Bytes.SIZEOF_INT);
+      if (header != null) {
+        cos.writeMessageNoTag(header);
+      }
+      if (result != null) {
+        cos.writeMessageNoTag(result);
+      }
+      cos.flush();
+      cos.checkNoSpaceLeft();
+      return b;
+    }
+
     private BufferChain wrapWithSasl(BufferChain bc)
         throws IOException {
       if (!this.connection.useSasl) return bc;