Posted to commits@hbase.apache.org by st...@apache.org on 2019/09/02 05:03:02 UTC

[hbase] branch branch-2 updated: HBASE-22905 Avoid temp ByteBuffer allocation in BlockingRpcConnection#writeRequest (#538)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new f00f56f  HBASE-22905 Avoid temp ByteBuffer allocation in BlockingRpcConnection#writeRequest (#538)
f00f56f is described below

commit f00f56fd633db735c8e2b6b7e29f7db50863737e
Author: chenxu14 <47...@users.noreply.github.com>
AuthorDate: Mon Sep 2 13:01:36 2019 +0800

    HBASE-22905 Avoid temp ByteBuffer allocation in BlockingRpcConnection#writeRequest (#538)
---
 .../hadoop/hbase/ipc/BlockingRpcConnection.java    | 64 ++++++++++++----------
 .../java/org/apache/hadoop/hbase/ipc/IPCUtil.java  | 11 ++--
 2 files changed, 41 insertions(+), 34 deletions(-)
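
The gist of the change: rather than materializing the cell block into a temporary heap ByteBuffer for every request, BlockingRpcConnection#writeRequest now borrows a ByteBuf from Netty's pooled allocator and releases it once the write completes. A minimal sketch of that pattern follows (plain io.netty imports for brevity; HBase itself uses the shaded org.apache.hbase.thirdparty.io.netty classes, and the payload bytes here are illustrative):

  import io.netty.buffer.ByteBuf;
  import io.netty.buffer.PooledByteBufAllocator;

  public class PooledBufferSketch {
    public static void main(String[] args) {
      ByteBuf cellBlock = null;
      try {
        // Borrow from the shared pool instead of allocating a temp buffer.
        cellBlock = PooledByteBufAllocator.DEFAULT.buffer(64);
        cellBlock.writeBytes(new byte[] { 1, 2, 3 });
        System.out.println("cell block bytes: " + cellBlock.readableBytes());
        // ... the real code streams these bytes to the socket here ...
      } finally {
        if (cellBlock != null) {
          cellBlock.release(); // return the buffer to the pool, even on error
        }
      }
    }
  }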

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index b152a1d..ee0aeea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -35,7 +35,6 @@ import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayDeque;
 import java.util.Locale;
@@ -70,6 +69,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -599,37 +600,44 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
    * @see #readResponse()
    */
   private void writeRequest(Call call) throws IOException {
-    ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec,
-      this.compressor, call.cells);
-    CellBlockMeta cellBlockMeta;
-    if (cellBlock != null) {
-      cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
-    } else {
-      cellBlockMeta = null;
-    }
-    RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
+    ByteBuf cellBlock = null;
+    try {
+      cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
+          call.cells, PooledByteBufAllocator.DEFAULT);
+      CellBlockMeta cellBlockMeta;
+      if (cellBlock != null) {
+        cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
+      } else {
+        cellBlockMeta = null;
+      }
+      RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
 
-    setupIOstreams();
+      setupIOstreams();
 
-    // Now we're going to write the call. We take the lock, then check that the connection
-    // is still valid, and, if so we do the write to the socket. If the write fails, we don't
-    // know where we stand, we have to close the connection.
-    if (Thread.interrupted()) {
-      throw new InterruptedIOException();
-    }
+      // Now we're going to write the call. We take the lock, then check that the connection
+      // is still valid, and, if so we do the write to the socket. If the write fails, we don't
+      // know where we stand, we have to close the connection.
+      if (Thread.interrupted()) {
+        throw new InterruptedIOException();
+      }
 
-    calls.put(call.id, call); // We put first as we don't want the connection to become idle.
-    // from here, we do not throw any exception to upper layer as the call has been tracked in the
-    // pending calls map.
-    try {
-      call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
-    } catch (Throwable t) {
-      if(LOG.isTraceEnabled()) {
-        LOG.trace("Error while writing call, call_id:" + call.id, t);
+      calls.put(call.id, call); // We put first as we don't want the connection to become idle.
+      // from here, we do not throw any exception to upper layer as the call has been tracked in
+      // the pending calls map.
+      try {
+        call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
+      } catch (Throwable t) {
+        if(LOG.isTraceEnabled()) {
+          LOG.trace("Error while writing call, call_id:" + call.id, t);
+        }
+        IOException e = IPCUtil.toIOE(t);
+        closeConn(e);
+        return;
+      }
+    } finally {
+      if (cellBlock != null) {
+        cellBlock.release();
       }
-      IOException e = IPCUtil.toIOE(t);
-      closeConn(e);
-      return;
     }
     notifyAll();
   }
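
Note that the whole body of writeRequest now sits inside try/finally: pooled ByteBufs are reference-counted, so a buffer that is never release()d is not reclaimed by GC but simply leaked from the pool's point of view. A tiny sketch of the ref-count lifecycle, under the same plain-Netty assumption as above:

  import io.netty.buffer.ByteBuf;
  import io.netty.buffer.PooledByteBufAllocator;

  public class RefCountSketch {
    public static void main(String[] args) {
      ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(16);
      System.out.println(buf.refCnt()); // 1: the caller owns the buffer
      boolean freed = buf.release();    // decrement; deallocates at zero
      System.out.println(freed);        // true: memory went back to the pool
      // buf.writeByte(0) here would throw IllegalReferenceCountException
    }
  }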
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index c6bbd0b..46562c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -23,7 +23,6 @@ import java.lang.reflect.InvocationTargetException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.TimeoutException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -41,7 +40,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@@ -62,19 +61,19 @@ class IPCUtil {
    * @throws IOException if write action fails
    */
   public static int write(final OutputStream dos, final Message header, final Message param,
-      final ByteBuffer cellBlock) throws IOException {
+      final ByteBuf cellBlock) throws IOException {
     // Must calculate total size and write that first so other side can read it all in one
     // swoop. This is dictated by how the server is currently written. Server needs to change
     // if we are to be able to write without the length prefixing.
     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
     if (cellBlock != null) {
-      totalSize += cellBlock.remaining();
+      totalSize += cellBlock.readableBytes();
     }
     return write(dos, header, param, cellBlock, totalSize);
   }
 
   private static int write(final OutputStream dos, final Message header, final Message param,
-      final ByteBuffer cellBlock, final int totalSize) throws IOException {
+      final ByteBuf cellBlock, final int totalSize) throws IOException {
     // I confirmed toBytes does the same as DataOutputStream#writeInt.
     dos.write(Bytes.toBytes(totalSize));
     // This allocates a buffer that is the size of the message internally.
@@ -83,7 +82,7 @@ class IPCUtil {
       param.writeDelimitedTo(dos);
     }
     if (cellBlock != null) {
-      dos.write(cellBlock.array(), 0, cellBlock.remaining());
+      cellBlock.readBytes(dos, cellBlock.readableBytes());
     }
     dos.flush();
     return totalSize;
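
As the comment above notes, IPCUtil#write length-prefixes the request so the server can size a single read for the whole message. A simplified sketch of that framing with the new ByteBuf-based cell block; the byte[] payload stands in for the delimited protobuf header and param, and the helper name is illustrative:

  import java.io.ByteArrayOutputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
  import java.io.OutputStream;

  import io.netty.buffer.ByteBuf;
  import io.netty.buffer.Unpooled;

  public class FramingSketch {
    // Simplified stand-in for IPCUtil#write: 4-byte big-endian length prefix,
    // then the serialized header/param (a plain byte[] here), then the cell
    // block streamed directly from the ByteBuf -- no cellBlock.array() copy.
    static int write(OutputStream out, byte[] headerAndParam, ByteBuf cellBlock)
        throws IOException {
      int totalSize = headerAndParam.length
          + (cellBlock == null ? 0 : cellBlock.readableBytes());
      DataOutputStream dos = new DataOutputStream(out);
      dos.writeInt(totalSize);         // same bytes as Bytes.toBytes(totalSize)
      dos.write(headerAndParam);
      if (cellBlock != null) {
        cellBlock.readBytes(dos, cellBlock.readableBytes());
      }
      dos.flush();
      return totalSize;
    }

    public static void main(String[] args) throws IOException {
      ByteArrayOutputStream wire = new ByteArrayOutputStream();
      ByteBuf cells = Unpooled.wrappedBuffer(new byte[] { 7, 8, 9 });
      int n = write(wire, new byte[] { 1, 2 }, cells);
      System.out.println(n + " payload bytes, " + wire.size() + " on the wire");
    }
  }

Streaming via readBytes(OutputStream, int) also drops the old cellBlock.array() assumption, which only holds for heap buffers; a pooled or direct ByteBuf need not expose a backing array at all.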