You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2019/09/02 05:03:02 UTC
[hbase] branch branch-2 updated: HBASE-22905 Avoid temp ByteBuffer
allocation in BlockingRpcConnection#writeRequest (#538)
This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new f00f56f HBASE-22905 Avoid temp ByteBuffer allocation in BlockingRpcConnection#writeRequest (#538)
f00f56f is described below
commit f00f56fd633db735c8e2b6b7e29f7db50863737e
Author: chenxu14 <47...@users.noreply.github.com>
AuthorDate: Mon Sep 2 13:01:36 2019 +0800
HBASE-22905 Avoid temp ByteBuffer allocation in BlockingRpcConnection#writeRequest (#538)
---
.../hadoop/hbase/ipc/BlockingRpcConnection.java | 64 ++++++++++++----------
.../java/org/apache/hadoop/hbase/ipc/IPCUtil.java | 11 ++--
2 files changed, 41 insertions(+), 34 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index b152a1d..ee0aeea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -35,7 +35,6 @@ import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
@@ -70,6 +69,8 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -599,37 +600,44 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
* @see #readResponse()
*/
private void writeRequest(Call call) throws IOException {
- ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec,
- this.compressor, call.cells);
- CellBlockMeta cellBlockMeta;
- if (cellBlock != null) {
- cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
- } else {
- cellBlockMeta = null;
- }
- RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
+ ByteBuf cellBlock = null;
+ try {
+ cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
+ call.cells, PooledByteBufAllocator.DEFAULT);
+ CellBlockMeta cellBlockMeta;
+ if (cellBlock != null) {
+ cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
+ } else {
+ cellBlockMeta = null;
+ }
+ RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
- setupIOstreams();
+ setupIOstreams();
- // Now we're going to write the call. We take the lock, then check that the connection
- // is still valid, and, if so we do the write to the socket. If the write fails, we don't
- // know where we stand, we have to close the connection.
- if (Thread.interrupted()) {
- throw new InterruptedIOException();
- }
+ // Now we're going to write the call. We take the lock, then check that the connection
+ // is still valid, and, if so we do the write to the socket. If the write fails, we don't
+ // know where we stand, we have to close the connection.
+ if (Thread.interrupted()) {
+ throw new InterruptedIOException();
+ }
- calls.put(call.id, call); // We put first as we don't want the connection to become idle.
- // from here, we do not throw any exception to upper layer as the call has been tracked in the
- // pending calls map.
- try {
- call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
- } catch (Throwable t) {
- if(LOG.isTraceEnabled()) {
- LOG.trace("Error while writing call, call_id:" + call.id, t);
+ calls.put(call.id, call); // We put first as we don't want the connection to become idle.
+ // from here, we do not throw any exception to upper layer as the call has been tracked in
+ // the pending calls map.
+ try {
+ call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
+ } catch (Throwable t) {
+ if(LOG.isTraceEnabled()) {
+ LOG.trace("Error while writing call, call_id:" + call.id, t);
+ }
+ IOException e = IPCUtil.toIOE(t);
+ closeConn(e);
+ return;
+ }
+ } finally {
+ if (cellBlock != null) {
+ cellBlock.release();
}
- IOException e = IPCUtil.toIOE(t);
- closeConn(e);
- return;
}
notifyAll();
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index c6bbd0b..46562c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -23,7 +23,6 @@ import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -41,7 +40,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@@ -62,19 +61,19 @@ class IPCUtil {
* @throws IOException if write action fails
*/
public static int write(final OutputStream dos, final Message header, final Message param,
- final ByteBuffer cellBlock) throws IOException {
+ final ByteBuf cellBlock) throws IOException {
// Must calculate total size and write that first so other side can read it all in in one
// swoop. This is dictated by how the server is currently written. Server needs to change
// if we are to be able to write without the length prefixing.
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
if (cellBlock != null) {
- totalSize += cellBlock.remaining();
+ totalSize += cellBlock.readableBytes();
}
return write(dos, header, param, cellBlock, totalSize);
}
private static int write(final OutputStream dos, final Message header, final Message param,
- final ByteBuffer cellBlock, final int totalSize) throws IOException {
+ final ByteBuf cellBlock, final int totalSize) throws IOException {
// I confirmed toBytes does same as DataOutputStream#writeInt.
dos.write(Bytes.toBytes(totalSize));
// This allocates a buffer that is the size of the message internally.
@@ -83,7 +82,7 @@ class IPCUtil {
param.writeDelimitedTo(dos);
}
if (cellBlock != null) {
- dos.write(cellBlock.array(), 0, cellBlock.remaining());
+ cellBlock.readBytes(dos, cellBlock.readableBytes());
}
dos.flush();
return totalSize;