You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/12/07 13:53:50 UTC
[hbase] branch master updated: HBASE-25336 Use Address instead of
InetSocketAddress in RpcClient implementation (#2716)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new f813479 HBASE-25336 Use Address instead of InetSocketAddress in RpcClient implementation (#2716)
f813479 is described below
commit f8134795109bc380b53ec814561e1abdb56b2b58
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Dec 7 21:49:04 2020 +0800
HBASE-25336 Use Address instead of InetSocketAddress in RpcClient implementation (#2716)
Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
.../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 60 ++++------------------
.../hadoop/hbase/ipc/BlockingRpcConnection.java | 25 ++-------
.../hadoop/hbase/ipc/NettyRpcConnection.java | 27 ++--------
.../org/apache/hadoop/hbase/ipc/RpcConnection.java | 24 +++++++--
4 files changed, 39 insertions(+), 97 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 7a7b848..e9ec6a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -22,9 +22,7 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.net.UnknownHostException;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -320,7 +318,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
* @return A pair with the Message response and the Cell data (if any).
*/
private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
- Message param, Message returnType, final User ticket, final InetSocketAddress isa)
+ Message param, Message returnType, final User ticket, final Address isa)
throws ServiceException {
BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
callMethod(md, hrc, param, returnType, ticket, isa, done);
@@ -392,7 +390,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket,
- final InetSocketAddress inetAddr, final RpcCallback<Message> callback) {
+ final Address addr, final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
@@ -406,7 +404,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
cs.setNumActionsPerServer(numActions);
}
- final Address addr = Address.fromSocketAddress(inetAddr);
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@@ -520,13 +517,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
protected final Address addr;
- // We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup
- // per method call on the channel. If the remote target is removed or reprovisioned and
- // its identity changes a new channel with a newly resolved InetSocketAddress will be
- // created as part of retry, so caching here is fine.
- // Normally, caching an InetSocketAddress is an anti-pattern.
- protected InetSocketAddress isa;
-
protected final AbstractRpcClient<?> rpcClient;
protected final User ticket;
@@ -576,23 +566,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
@Override
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
- Message param, Message returnType) throws ServiceException {
- // Look up remote address upon first call
- if (isa == null) {
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookups();
- }
- isa = Address.toSocketAddress(addr);
- if (isa.isUnresolved()) {
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookupsFailed();
- }
- isa = null;
- throw new ServiceException(new UnknownHostException(addr + " could not be resolved"));
- }
- }
- return rpcClient.callBlockingMethod(md, configureRpcController(controller),
- param, returnType, ticket, isa);
+ Message param, Message returnType) throws ServiceException {
+ return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType,
+ ticket, addr);
}
}
@@ -608,29 +584,13 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
}
@Override
- public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
- Message param, Message returnType, RpcCallback<Message> done) {
- HBaseRpcController configuredController =
- configureRpcController(Preconditions.checkNotNull(controller,
- "RpcController can not be null for async rpc call"));
- // Look up remote address upon first call
- if (isa == null || isa.isUnresolved()) {
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookups();
- }
- isa = Address.toSocketAddress(addr);
- if (isa.isUnresolved()) {
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookupsFailed();
- }
- isa = null;
- controller.setFailed(addr + " could not be resolved");
- return;
- }
- }
+ public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param,
+ Message returnType, RpcCallback<Message> done) {
+ HBaseRpcController configuredController = configureRpcController(
+ Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call"));
// This method does not throw any exceptions, so the caller must provide a
// HBaseRpcController which is used to pass the exceptions.
- this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done);
+ this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done);
}
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index ce2bd11..cd8035f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -35,7 +35,6 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
@@ -44,7 +43,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.security.sasl.SaslException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -52,7 +50,6 @@ import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@@ -69,11 +66,13 @@ import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -256,16 +255,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
if (this.rpcClient.localAddr != null) {
this.socket.bind(this.rpcClient.localAddr);
}
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookups();
- }
- InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
- if (remoteAddr.isUnresolved()) {
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookupsFailed();
- }
- throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
- }
+ InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
this.socket.setSoTimeout(this.rpcClient.readTO);
return;
@@ -374,15 +364,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
if (this.metrics != null) {
this.metrics.incrNsLookups();
}
- InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
- if (serverAddr.isUnresolved()) {
- if (this.metrics != null) {
- this.metrics.incrNsLookupsFailed();
- }
- throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
- }
saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
- serverAddr.getAddress(), securityInfo, this.rpcClient.fallbackAllowed,
+ socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed,
this.rpcClient.conf.get("hbase.rpc.protection",
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 609d2c1..d0a13ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -32,17 +32,16 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
-import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -210,18 +209,9 @@ class NettyRpcConnection extends RpcConnection {
Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
final NettyHBaseSaslRpcClientHandler saslHandler;
try {
- if (this.metrics != null) {
- this.metrics.incrNsLookups();
- }
- InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
- if (serverAddr.isUnresolved()) {
- if (this.metrics != null) {
- this.metrics.incrNsLookupsFailed();
- }
- throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
- }
saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
- serverAddr.getAddress(), securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
+ ((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo,
+ rpcClient.fallbackAllowed, this.rpcClient.conf);
} catch (IOException e) {
failInit(ch, e);
return;
@@ -282,16 +272,7 @@ class NettyRpcConnection extends RpcConnection {
private void connect() throws UnknownHostException {
assert eventLoop.inEventLoop();
LOG.trace("Connecting to {}", remoteId.getAddress());
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookups();
- }
- InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
- if (remoteAddr.isUnresolved()) {
- if (this.rpcClient.metrics != null) {
- this.rpcClient.metrics.incrNsLookupsFailed();
- }
- throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
- }
+ InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index 6749efe..b2c7eea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -18,12 +18,15 @@
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.SecurityInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
@@ -122,7 +125,7 @@ abstract class RpcConnection {
this.remoteId = remoteId;
}
- protected void scheduleTimeoutTask(final Call call) {
+ protected final void scheduleTimeoutTask(final Call call) {
if (call.timeout > 0) {
call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() {
@@ -137,7 +140,7 @@ abstract class RpcConnection {
}
}
- protected byte[] getConnectionHeaderPreamble() {
+ protected final byte[] getConnectionHeaderPreamble() {
// Assemble the preamble up in a buffer first and then send it. Writing individual elements,
// they are getting sent across piecemeal according to wireshark and then server is messing
// up the reading on occasion (the passed in stream is not buffered yet).
@@ -153,7 +156,7 @@ abstract class RpcConnection {
return preamble;
}
- protected ConnectionHeader getConnectionHeader() {
+ protected final ConnectionHeader getConnectionHeader() {
final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
builder.setServiceName(remoteId.getServiceName());
final UserInformation userInfoPB = provider.getUserInfo(remoteId.ticket);
@@ -176,6 +179,21 @@ abstract class RpcConnection {
return builder.build();
}
+ protected final InetSocketAddress getRemoteInetAddress(MetricsConnection metrics)
+ throws UnknownHostException {
+ if (metrics != null) {
+ metrics.incrNsLookups();
+ }
+ InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
+ if (remoteAddr.isUnresolved()) {
+ if (metrics != null) {
+ metrics.incrNsLookupsFailed();
+ }
+ throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
+ }
+ return remoteAddr;
+ }
+
protected abstract void callTimeout(Call call);
public ConnectionId remoteId() {