You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/12/07 13:53:50 UTC

[hbase] branch master updated: HBASE-25336 Use Address instead of InetSocketAddress in RpcClient implementation (#2716)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new f813479  HBASE-25336 Use Address instead of InetSocketAddress in RpcClient implementation (#2716)
f813479 is described below

commit f8134795109bc380b53ec814561e1abdb56b2b58
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Dec 7 21:49:04 2020 +0800

    HBASE-25336 Use Address instead of InetSocketAddress in RpcClient implementation (#2716)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 60 ++++------------------
 .../hadoop/hbase/ipc/BlockingRpcConnection.java    | 25 ++-------
 .../hadoop/hbase/ipc/NettyRpcConnection.java       | 27 ++--------
 .../org/apache/hadoop/hbase/ipc/RpcConnection.java | 24 +++++++--
 4 files changed, 39 insertions(+), 97 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 7a7b848..e9ec6a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -22,9 +22,7 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -320,7 +318,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
    * @return A pair with the Message response and the Cell data (if any).
    */
   private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
-      Message param, Message returnType, final User ticket, final InetSocketAddress isa)
+      Message param, Message returnType, final User ticket, final Address isa)
       throws ServiceException {
     BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
     callMethod(md, hrc, param, returnType, ticket, isa, done);
@@ -392,7 +390,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
 
   Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
       final Message param, Message returnType, final User ticket,
-      final InetSocketAddress inetAddr, final RpcCallback<Message> callback) {
+      final Address addr, final RpcCallback<Message> callback) {
     final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
     cs.setStartTime(EnvironmentEdgeManager.currentTime());
 
@@ -406,7 +404,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
       cs.setNumActionsPerServer(numActions);
     }
 
-    final Address addr = Address.fromSocketAddress(inetAddr);
     final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
     Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
         hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@@ -520,13 +517,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
 
     protected final Address addr;
 
-    // We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup
-    // per method call on the channel. If the remote target is removed or reprovisioned and
-    // its identity changes a new channel with a newly resolved InetSocketAddress will be
-    // created as part of retry, so caching here is fine.
-    // Normally, caching an InetSocketAddress is an anti-pattern.
-    protected InetSocketAddress isa;
-
     protected final AbstractRpcClient<?> rpcClient;
 
     protected final User ticket;
@@ -576,23 +566,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
 
     @Override
     public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
-        Message param, Message returnType) throws ServiceException {
-      // Look up remote address upon first call
-      if (isa == null) {
-        if (this.rpcClient.metrics != null) {
-          this.rpcClient.metrics.incrNsLookups();
-        }
-        isa = Address.toSocketAddress(addr);
-        if (isa.isUnresolved()) {
-          if (this.rpcClient.metrics != null) {
-            this.rpcClient.metrics.incrNsLookupsFailed();
-          }
-          isa = null;
-          throw new ServiceException(new UnknownHostException(addr + " could not be resolved"));
-        }
-      }
-      return rpcClient.callBlockingMethod(md, configureRpcController(controller),
-        param, returnType, ticket, isa);
+      Message param, Message returnType) throws ServiceException {
+      return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType,
+        ticket, addr);
     }
   }
 
@@ -608,29 +584,13 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
     }
 
     @Override
-    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
-        Message param, Message returnType, RpcCallback<Message> done) {
-      HBaseRpcController configuredController =
-        configureRpcController(Preconditions.checkNotNull(controller,
-          "RpcController can not be null for async rpc call"));
-      // Look up remote address upon first call
-      if (isa == null || isa.isUnresolved()) {
-        if (this.rpcClient.metrics != null) {
-          this.rpcClient.metrics.incrNsLookups();
-        }
-        isa = Address.toSocketAddress(addr);
-        if (isa.isUnresolved()) {
-          if (this.rpcClient.metrics != null) {
-            this.rpcClient.metrics.incrNsLookupsFailed();
-          }
-          isa = null;
-          controller.setFailed(addr + " could not be resolved");
-          return;
-        }
-      }
+    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param,
+      Message returnType, RpcCallback<Message> done) {
+      HBaseRpcController configuredController = configureRpcController(
+        Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call"));
       // This method does not throw any exceptions, so the caller must provide a
       // HBaseRpcController which is used to pass the exceptions.
-      this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done);
+      this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done);
     }
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index ce2bd11..cd8035f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -35,7 +35,6 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayDeque;
 import java.util.Locale;
@@ -44,7 +43,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.security.sasl.SaslException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -52,7 +50,6 @@ import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
 import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@@ -69,11 +66,13 @@ import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -256,16 +255,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
         if (this.rpcClient.localAddr != null) {
           this.socket.bind(this.rpcClient.localAddr);
         }
-        if (this.rpcClient.metrics != null) {
-          this.rpcClient.metrics.incrNsLookups();
-        }
-        InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
-        if (remoteAddr.isUnresolved()) {
-          if (this.rpcClient.metrics != null) {
-            this.rpcClient.metrics.incrNsLookupsFailed();
-          }
-          throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
-        }
+        InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
         NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
         this.socket.setSoTimeout(this.rpcClient.readTO);
         return;
@@ -374,15 +364,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
     if (this.metrics != null) {
       this.metrics.incrNsLookups();
     }
-    InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
-    if (serverAddr.isUnresolved()) {
-      if (this.metrics != null) {
-        this.metrics.incrNsLookupsFailed();
-      }
-      throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
-    }
     saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
-        serverAddr.getAddress(), securityInfo, this.rpcClient.fallbackAllowed,
+        socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed,
         this.rpcClient.conf.get("hbase.rpc.protection",
             QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
         this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 609d2c1..d0a13ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -32,17 +32,16 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
-import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
 import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
 import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -210,18 +209,9 @@ class NettyRpcConnection extends RpcConnection {
     Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
     final NettyHBaseSaslRpcClientHandler saslHandler;
     try {
-      if (this.metrics != null) {
-        this.metrics.incrNsLookups();
-      }
-      InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
-      if (serverAddr.isUnresolved()) {
-        if (this.metrics != null) {
-          this.metrics.incrNsLookupsFailed();
-        }
-        throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
-      }
       saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
-        serverAddr.getAddress(), securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
+        ((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo,
+        rpcClient.fallbackAllowed, this.rpcClient.conf);
     } catch (IOException e) {
       failInit(ch, e);
       return;
@@ -282,16 +272,7 @@ class NettyRpcConnection extends RpcConnection {
   private void connect() throws UnknownHostException {
     assert eventLoop.inEventLoop();
     LOG.trace("Connecting to {}", remoteId.getAddress());
-    if (this.rpcClient.metrics != null) {
-      this.rpcClient.metrics.incrNsLookups();
-    }
-    InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
-    if (remoteAddr.isUnresolved()) {
-      if (this.rpcClient.metrics != null) {
-        this.rpcClient.metrics.incrNsLookupsFailed();
-      }
-      throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
-    }
+    InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
     this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
       .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
       .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index 6749efe..b2c7eea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -18,12 +18,15 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.security.SecurityInfo;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
@@ -122,7 +125,7 @@ abstract class RpcConnection {
     this.remoteId = remoteId;
   }
 
-  protected void scheduleTimeoutTask(final Call call) {
+  protected final void scheduleTimeoutTask(final Call call) {
     if (call.timeout > 0) {
       call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() {
 
@@ -137,7 +140,7 @@ abstract class RpcConnection {
     }
   }
 
-  protected byte[] getConnectionHeaderPreamble() {
+  protected final byte[] getConnectionHeaderPreamble() {
     // Assemble the preamble up in a buffer first and then send it. Writing individual elements,
     // they are getting sent across piecemeal according to wireshark and then server is messing
     // up the reading on occasion (the passed in stream is not buffered yet).
@@ -153,7 +156,7 @@ abstract class RpcConnection {
     return preamble;
   }
 
-  protected ConnectionHeader getConnectionHeader() {
+  protected final ConnectionHeader getConnectionHeader() {
     final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
     builder.setServiceName(remoteId.getServiceName());
     final UserInformation userInfoPB  = provider.getUserInfo(remoteId.ticket);
@@ -176,6 +179,21 @@ abstract class RpcConnection {
     return builder.build();
   }
 
+  protected final InetSocketAddress getRemoteInetAddress(MetricsConnection metrics)
+    throws UnknownHostException {
+    if (metrics != null) {
+      metrics.incrNsLookups();
+    }
+    InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
+    if (remoteAddr.isUnresolved()) {
+      if (metrics != null) {
+        metrics.incrNsLookupsFailed();
+      }
+      throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
+    }
+    return remoteAddr;
+  }
+
   protected abstract void callTimeout(Call call);
 
   public ConnectionId remoteId() {