You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/12/20 03:27:20 UTC
[hbase] branch master updated: HBASE-25420 Some minor improvements in rpc implementation (#2792)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 1540b89 HBASE-25420 Some minor improvements in rpc implementation (#2792)
1540b89 is described below
commit 1540b89ceece9c2ebe10959a32519787cd39dc50
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sun Dec 20 11:26:36 2020 +0800
HBASE-25420 Some minor improvements in rpc implementation (#2792)
Signed-off-by: XinSun <dd...@gmail.com>
Signed-off-by: stack <st...@apache.com>
---
.../hadoop/hbase/ipc/NettyRpcDuplexHandler.java | 28 +++++++-------
.../hadoop/hbase/ipc/NettyRpcFrameDecoder.java | 8 ++--
.../org/apache/hadoop/hbase/ipc/ServerCall.java | 31 +++++++++------
.../apache/hadoop/hbase/ipc/SimpleRpcServer.java | 45 +++++++++++-----------
4 files changed, 58 insertions(+), 54 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
index 649375a..2a2df8a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -17,35 +17,35 @@
*/
package org.apache.hadoop.hbase.ipc;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
-
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.ipc.RemoteException;
/**
* The netty rpc handler.
@@ -103,8 +103,8 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
ctx.write(buf, withoutCellBlockPromise);
ChannelPromise cellBlockPromise = ctx.newPromise();
ctx.write(cellBlock, cellBlockPromise);
- PromiseCombiner combiner = new PromiseCombiner();
- combiner.addAll(withoutCellBlockPromise, cellBlockPromise);
+ PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
+ combiner.addAll((ChannelFuture) withoutCellBlockPromise, cellBlockPromise);
combiner.finish(promise);
} else {
ctx.write(buf, promise);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
index 5ed3d2e..9444cd0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.List;
-
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
@@ -30,6 +29,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
@@ -124,10 +124,8 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
RPCProtos.RequestHeader header = getHeader(in, headerSize);
// Notify the client about the offending request
- NettyServerCall reqTooBig =
- new NettyServerCall(header.getCallId(), connection.service, null, null, null, null,
- connection, 0, connection.addr, System.currentTimeMillis(), 0,
- connection.rpcServer.bbAllocator, connection.rpcServer.cellBlockBuilder, null);
+ NettyServerCall reqTooBig = connection.createCall(header.getCallId(), connection.service, null,
+ null, null, null, 0, connection.addr, 0, null);
connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index d20e28f..a5c8a39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -26,25 +26,27 @@ import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.StringUtils;
/**
* Datastructure that holds all necessary to a method invocation and then afterward, carries
@@ -217,10 +219,14 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
}
@Override
- public synchronized void setResponse(Message m, final CellScanner cells,
- Throwable t, String errorMsg) {
- if (this.isError) return;
- if (t != null) this.isError = true;
+ public synchronized void setResponse(Message m, final CellScanner cells, Throwable t,
+ String errorMsg) {
+ if (this.isError) {
+ return;
+ }
+ if (t != null) {
+ this.isError = true;
+ }
BufferChain bc = null;
try {
ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
@@ -385,9 +391,10 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
return pbBuf;
}
- protected BufferChain wrapWithSasl(BufferChain bc)
- throws IOException {
- if (!this.connection.useSasl) return bc;
+ protected BufferChain wrapWithSasl(BufferChain bc) throws IOException {
+ if (!this.connection.useSasl) {
+ return bc;
+ }
// Looks like no way around this; saslserver wants a byte array. I have to make it one.
// THIS IS A BIG UGLY COPY.
byte [] responseBytes = bc.getBytes();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index f3f7807..cbcbc9a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -40,24 +40,23 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
-import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
-import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
/**
* The RPC server with native java NIO implementation deriving from Hadoop to
@@ -307,7 +306,7 @@ public class SimpleRpcServer extends RpcServer {
// If the connectionManager can't take it, close the connection.
if (c == null) {
if (channel.isOpen()) {
- IOUtils.cleanup(null, channel);
+ IOUtils.cleanupWithLogger(LOG, channel);
}
continue;
}
@@ -416,10 +415,12 @@ public class SimpleRpcServer extends RpcServer {
@Override
public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
- /** Starts the service. Must be called before any calls will be handled. */
+ /** Starts the service. Must be called before any calls will be handled. */
@Override
public synchronized void start() {
- if (started) return;
+ if (started) {
+ return;
+ }
authTokenSecretMgr = createSecretManager();
if (authTokenSecretMgr != null) {
setSecretManager(authTokenSecretMgr);
@@ -433,7 +434,7 @@ public class SimpleRpcServer extends RpcServer {
started = true;
}
- /** Stops the service. No new calls will be handled after this is called. */
+ /** Stops the service. No new calls will be handled after this is called. */
@Override
public synchronized void stop() {
LOG.info("Stopping server on " + port);
@@ -449,10 +450,9 @@ public class SimpleRpcServer extends RpcServer {
notifyAll();
}
- /** Wait for the server to be stopped.
- * Does not wait for all subthreads to finish.
- * See {@link #stop()}.
- * @throws InterruptedException e
+ /**
+ * Wait for the server to be stopped. Does not wait for all subthreads to finish.
+ * @see #stop()
*/
@Override
public synchronized void join() throws InterruptedException {
@@ -503,13 +503,14 @@ public class SimpleRpcServer extends RpcServer {
* @param channel writable byte channel to write to
* @param bufferChain Chain of buffers to write
* @return number of bytes written
- * @throws java.io.IOException e
* @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
*/
protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
- throws IOException {
- long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
- if (count > 0) this.metrics.sentBytes(count);
+ throws IOException {
+ long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
+ if (count > 0) {
+ this.metrics.sentBytes(count);
+ }
return count;
}
@@ -523,22 +524,20 @@ public class SimpleRpcServer extends RpcServer {
* @throws UnknownHostException if the address isn't a valid host name
* @throws IOException other random errors from bind
*/
- public static void bind(ServerSocket socket, InetSocketAddress address,
- int backlog) throws IOException {
+ public static void bind(ServerSocket socket, InetSocketAddress address, int backlog)
+ throws IOException {
try {
socket.bind(address, backlog);
} catch (BindException e) {
BindException bindException =
- new BindException("Problem binding to " + address + " : " +
- e.getMessage());
+ new BindException("Problem binding to " + address + " : " + e.getMessage());
bindException.initCause(e);
throw bindException;
} catch (SocketException e) {
// If they try to bind to a different host's address, give a better
// error message.
if ("Unresolved address".equals(e.getMessage())) {
- throw new UnknownHostException("Invalid hostname for server: " +
- address.getHostName());
+ throw new UnknownHostException("Invalid hostname for server: " + address.getHostName());
}
throw e;
}