You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/12/20 03:36:11 UTC

[hbase] branch branch-2.3 updated: HBASE-25420 Some minor improvements in rpc implementation (#2792)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 7eb786a  HBASE-25420 Some minor improvements in rpc implementation (#2792)
7eb786a is described below

commit 7eb786a0ba17c71737da4b15a80a9327e69e938f
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sun Dec 20 11:26:36 2020 +0800

    HBASE-25420 Some minor improvements in rpc implementation (#2792)
    
    Signed-off-by: XinSun <dd...@gmail.com>
    Signed-off-by: stack <st...@apache.com>
---
 .../hadoop/hbase/ipc/NettyRpcDuplexHandler.java    | 28 +++++++-------
 .../hadoop/hbase/ipc/NettyRpcFrameDecoder.java     |  8 ++--
 .../org/apache/hadoop/hbase/ipc/ServerCall.java    | 31 +++++++++------
 .../apache/hadoop/hbase/ipc/SimpleRpcServer.java   | 45 +++++++++++-----------
 4 files changed, 58 insertions(+), 54 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
index 649375a..2a2df8a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -17,35 +17,35 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
-
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.ipc.RemoteException;
 
 /**
  * The netty rpc handler.
@@ -103,8 +103,8 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
         ctx.write(buf, withoutCellBlockPromise);
         ChannelPromise cellBlockPromise = ctx.newPromise();
         ctx.write(cellBlock, cellBlockPromise);
-        PromiseCombiner combiner = new PromiseCombiner();
-        combiner.addAll(withoutCellBlockPromise, cellBlockPromise);
+        PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
+        combiner.addAll((ChannelFuture) withoutCellBlockPromise, cellBlockPromise);
         combiner.finish(promise);
       } else {
         ctx.write(buf, promise);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
index 5ed3d2e..9444cd0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
 import java.util.List;
-
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
@@ -30,6 +29,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
 
@@ -124,10 +124,8 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
     RPCProtos.RequestHeader header = getHeader(in, headerSize);
 
     // Notify the client about the offending request
-    NettyServerCall reqTooBig =
-      new NettyServerCall(header.getCallId(), connection.service, null, null, null, null,
-        connection, 0, connection.addr, System.currentTimeMillis(), 0,
-        connection.rpcServer.bbAllocator, connection.rpcServer.cellBlockBuilder, null);
+    NettyServerCall reqTooBig = connection.createCall(header.getCallId(), connection.service, null,
+      null, null, null, 0, connection.addr, 0, null);
 
     connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index d20e28f..a5c8a39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -26,25 +26,27 @@ import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * Datastructure that holds all necessary to a method invocation and then afterward, carries
@@ -217,10 +219,14 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
   }
 
   @Override
-  public synchronized void setResponse(Message m, final CellScanner cells,
-      Throwable t, String errorMsg) {
-    if (this.isError) return;
-    if (t != null) this.isError = true;
+  public synchronized void setResponse(Message m, final CellScanner cells, Throwable t,
+    String errorMsg) {
+    if (this.isError) {
+      return;
+    }
+    if (t != null) {
+      this.isError = true;
+    }
     BufferChain bc = null;
     try {
       ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
@@ -385,9 +391,10 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
     return pbBuf;
   }
 
-  protected BufferChain wrapWithSasl(BufferChain bc)
-      throws IOException {
-    if (!this.connection.useSasl) return bc;
+  protected BufferChain wrapWithSasl(BufferChain bc) throws IOException {
+    if (!this.connection.useSasl) {
+      return bc;
+    }
     // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
     // THIS IS A BIG UGLY COPY.
     byte [] responseBytes = bc.getBytes();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index f3f7807..cbcbc9a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -40,24 +40,23 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
-import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
-import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 
 /**
  * The RPC server with native java NIO implementation deriving from Hadoop to
@@ -307,7 +306,7 @@ public class SimpleRpcServer extends RpcServer {
         // If the connectionManager can't take it, close the connection.
         if (c == null) {
           if (channel.isOpen()) {
-            IOUtils.cleanup(null, channel);
+            IOUtils.cleanupWithLogger(LOG, channel);
           }
           continue;
         }
@@ -416,10 +415,12 @@ public class SimpleRpcServer extends RpcServer {
   @Override
   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
 
-  /** Starts the service.  Must be called before any calls will be handled. */
+  /** Starts the service. Must be called before any calls will be handled. */
   @Override
   public synchronized void start() {
-    if (started) return;
+    if (started) {
+      return;
+    }
     authTokenSecretMgr = createSecretManager();
     if (authTokenSecretMgr != null) {
       setSecretManager(authTokenSecretMgr);
@@ -433,7 +434,7 @@ public class SimpleRpcServer extends RpcServer {
     started = true;
   }
 
-  /** Stops the service.  No new calls will be handled after this is called. */
+  /** Stops the service. No new calls will be handled after this is called. */
   @Override
   public synchronized void stop() {
     LOG.info("Stopping server on " + port);
@@ -449,10 +450,9 @@ public class SimpleRpcServer extends RpcServer {
     notifyAll();
   }
 
-  /** Wait for the server to be stopped.
-   * Does not wait for all subthreads to finish.
-   *  See {@link #stop()}.
-   * @throws InterruptedException e
+  /**
+   * Wait for the server to be stopped. Does not wait for all subthreads to finish.
+   * @see #stop()
    */
   @Override
   public synchronized void join() throws InterruptedException {
@@ -503,13 +503,14 @@ public class SimpleRpcServer extends RpcServer {
    * @param channel writable byte channel to write to
    * @param bufferChain Chain of buffers to write
    * @return number of bytes written
-   * @throws java.io.IOException e
    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
    */
   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
-  throws IOException {
-    long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
-    if (count > 0) this.metrics.sentBytes(count);
+    throws IOException {
+    long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
+    if (count > 0) {
+      this.metrics.sentBytes(count);
+    }
     return count;
   }
 
@@ -523,22 +524,20 @@ public class SimpleRpcServer extends RpcServer {
    * @throws UnknownHostException if the address isn't a valid host name
    * @throws IOException other random errors from bind
    */
-  public static void bind(ServerSocket socket, InetSocketAddress address,
-                          int backlog) throws IOException {
+  public static void bind(ServerSocket socket, InetSocketAddress address, int backlog)
+    throws IOException {
     try {
       socket.bind(address, backlog);
     } catch (BindException e) {
       BindException bindException =
-        new BindException("Problem binding to " + address + " : " +
-            e.getMessage());
+        new BindException("Problem binding to " + address + " : " + e.getMessage());
       bindException.initCause(e);
       throw bindException;
     } catch (SocketException e) {
       // If they try to bind to a different host's address, give a better
       // error message.
       if ("Unresolved address".equals(e.getMessage())) {
-        throw new UnknownHostException("Invalid hostname for server: " +
-                                       address.getHostName());
+        throw new UnknownHostException("Invalid hostname for server: " + address.getHostName());
       }
       throw e;
     }