You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/09/08 12:33:22 UTC
[6/6] hbase git commit: HBASE-16445 Refactor and reimplement RpcClient
HBASE-16445 Refactor and reimplement RpcClient
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c04b3891
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c04b3891
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c04b3891
Branch: refs/heads/master
Commit: c04b389181b6a9299f05f1ad8f8b1ec62448331a
Parents: fc224ed
Author: zhangduo <zh...@apache.org>
Authored: Thu Sep 8 17:46:33 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Sep 8 20:32:56 2016 +0800
----------------------------------------------------------------------
.../RpcRetryingCallerWithReadReplicas.java | 21 +-
.../hadoop/hbase/ipc/AbstractRpcClient.java | 467 ++++--
.../org/apache/hadoop/hbase/ipc/AsyncCall.java | 204 ---
.../hadoop/hbase/ipc/AsyncRpcChannel.java | 768 ----------
.../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 510 -------
.../hbase/ipc/AsyncServerResponseHandler.java | 126 --
.../hadoop/hbase/ipc/BlockingRpcCallback.java | 2 +-
.../hadoop/hbase/ipc/BlockingRpcClient.java | 77 +
.../hadoop/hbase/ipc/BlockingRpcConnection.java | 725 ++++++++++
.../hbase/ipc/BufferCallBeforeInitHandler.java | 103 ++
.../java/org/apache/hadoop/hbase/ipc/Call.java | 122 +-
.../hbase/ipc/CallCancelledException.java | 37 +
.../org/apache/hadoop/hbase/ipc/CallEvent.java | 40 +
.../hadoop/hbase/ipc/CallTimeoutException.java | 6 +-
.../hadoop/hbase/ipc/CellBlockBuilder.java | 111 +-
.../apache/hadoop/hbase/ipc/ConnectionId.java | 2 +-
.../hbase/ipc/DefaultNettyEventLoopConfig.java | 37 +
.../hbase/ipc/FallbackDisallowedException.java | 38 +
.../hadoop/hbase/ipc/IOExceptionConverter.java | 34 -
.../org/apache/hadoop/hbase/ipc/IPCUtil.java | 55 +-
.../hadoop/hbase/ipc/MessageConverter.java | 47 -
.../apache/hadoop/hbase/ipc/NettyRpcClient.java | 80 ++
.../hbase/ipc/NettyRpcClientConfigHelper.java | 83 ++
.../hadoop/hbase/ipc/NettyRpcConnection.java | 282 ++++
.../hadoop/hbase/ipc/NettyRpcDuplexHandler.java | 245 ++++
.../org/apache/hadoop/hbase/ipc/RpcClient.java | 6 +-
.../hadoop/hbase/ipc/RpcClientFactory.java | 26 +-
.../apache/hadoop/hbase/ipc/RpcClientImpl.java | 1359 ------------------
.../apache/hadoop/hbase/ipc/RpcConnection.java | 255 ++++
.../security/AbstractHBaseSaslRpcClient.java | 197 +++
.../hbase/security/AsyncHBaseSaslRpcClient.java | 58 +
.../AsyncHBaseSaslRpcClientHandler.java | 135 ++
.../hbase/security/HBaseSaslRpcClient.java | 234 +--
.../hbase/security/SaslChallengeDecoder.java | 112 ++
.../hbase/security/SaslClientHandler.java | 382 -----
.../hbase/security/SaslUnwrapHandler.java | 53 +
.../apache/hadoop/hbase/security/SaslUtil.java | 14 +-
.../hadoop/hbase/security/SaslWrapHandler.java | 80 ++
.../hadoop/hbase/ipc/TestCellBlockBuilder.java | 19 +-
.../ipc/TestRpcClientDeprecatedNameMapping.java | 56 +
.../hbase/security/TestHBaseSaslRpcClient.java | 8 +-
.../hbase/ipc/IntegrationTestRpcClient.java | 25 +-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 2 +-
.../hadoop/hbase/master/ServerManager.java | 1 +
.../hadoop/hbase/util/MultiHConnection.java | 1 -
.../hadoop/hbase/client/TestClientTimeouts.java | 11 +-
.../hbase/client/TestRpcControllerFactory.java | 4 +-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 121 +-
.../apache/hadoop/hbase/ipc/TestAsyncIPC.java | 113 --
.../hadoop/hbase/ipc/TestBlockingIPC.java | 58 +
.../hbase/ipc/TestGlobalEventLoopGroup.java | 53 -
.../org/apache/hadoop/hbase/ipc/TestIPC.java | 74 -
.../apache/hadoop/hbase/ipc/TestNettyIPC.java | 128 ++
.../hbase/ipc/TestProtobufRpcServiceImpl.java | 2 +-
.../hadoop/hbase/ipc/TestRpcClientLeaks.java | 17 +-
.../hbase/ipc/TestRpcHandlerException.java | 3 +-
.../regionserver/TestRegionReplicaFailover.java | 2 +
.../hbase/security/AbstractTestSecureIPC.java | 261 ----
.../hbase/security/TestAsyncSecureIPC.java | 33 -
.../hadoop/hbase/security/TestSecureIPC.java | 250 +++-
.../TestDelegationTokenWithEncryption.java | 12 +-
.../token/TestGenerateDelegationToken.java | 8 +-
62 files changed, 3923 insertions(+), 4472 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 0ea696e..3d55136 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -20,14 +20,17 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -97,7 +100,6 @@ public class RpcRetryingCallerWithReadReplicas {
this.id = id;
this.location = location;
this.controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
}
@Override
@@ -134,6 +136,11 @@ public class RpcRetryingCallerWithReadReplicas {
setStub(cConnection.getClient(dest));
}
+ private void initRpcController() {
+ controller.reset();
+ controller.setCallTimeout(callTimeout);
+ controller.setPriority(tableName);
+ }
@Override
protected Result rpcCall() throws Exception {
if (controller.isCanceled()) return null;
@@ -141,16 +148,13 @@ public class RpcRetryingCallerWithReadReplicas {
throw new InterruptedIOException();
}
byte[] reg = location.getRegionInfo().getRegionName();
- ClientProtos.GetRequest request =
- RequestConverter.buildGetRequest(reg, get);
- // Presumption that we are passed a PayloadCarryingRpcController here!
- HBaseRpcController pcrc = (HBaseRpcController)controller;
- pcrc.setCallTimeout(callTimeout);
+ ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get);
+ initRpcController();
ClientProtos.GetResponse response = getStub().get(controller, request);
if (response == null) {
return null;
}
- return ProtobufUtil.toResult(response.getResult(), pcrc.cellScanner());
+ return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
}
@Override
@@ -183,7 +187,7 @@ public class RpcRetryingCallerWithReadReplicas {
RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
: RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
- ResultBoundedCompletionService<Result> cs =
+ final ResultBoundedCompletionService<Result> cs =
new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size());
if(isTargetReplicaSpecified) {
@@ -207,7 +211,6 @@ public class RpcRetryingCallerWithReadReplicas {
// submit call for the all of the secondaries at once
addCallsForReplica(cs, rl, 1, rl.size() - 1);
}
-
try {
try {
long start = EnvironmentEdgeManager.currentTime();
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 6cb0786..098ad3c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -18,54 +18,106 @@
package org.apache.hadoop.hbase.ipc;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
+
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
+import io.netty.util.HashedWheelTimer;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
/**
* Provides the basics for a RpcClient implementation like configuration and Logging.
+ * <p>
+ * Locking schema of the current IPC implementation
+ * <ul>
+ * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating
+ * connection.</li>
+ * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
+ * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of
+ * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
+ * of how to deal with cancel.</li>
+ * <li>For connection implementation, the construction of a connection should be as fast as possible
+ * because the creation is protected under a lock. Connect to remote side when needed. There is no
+ * forced locking schema for a connection implementation.</li>
+ * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s lock should be held
+ * last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be executed
+ * outside the lock in {@link Call} and {@link HBaseRpcController}, which means the implementations
+ * of the callbacks are free to hold any lock.</li>
+ * </ul>
*/
@InterfaceAudience.Private
-public abstract class AbstractRpcClient implements RpcClient {
+public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
// Log level is being changed in tests
public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
+ protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
+ Threads.newDaemonThreadFactory("RpcClient-timer"), 10, TimeUnit.MILLISECONDS);
+
+ private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
+ .newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper"));
+
+ protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = new HashMap<>();
+
+ static {
+ TOKEN_HANDLERS.put(Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector());
+ }
+
+ protected boolean running = true; // if client runs
+
protected final Configuration conf;
- protected String clusterId;
+ protected final String clusterId;
protected final SocketAddress localAddr;
protected final MetricsConnection metrics;
- protected UserProvider userProvider;
+ protected final UserProvider userProvider;
protected final CellBlockBuilder cellBlockBuilder;
protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
// time (in ms), it will be closed at any moment.
- protected final int maxRetries; //the max. no. of retries for socket connections
+ protected final int maxRetries; // the max. no. of retries for socket connections
protected final long failureSleep; // Time to sleep before retry on failure.
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
@@ -73,13 +125,20 @@ public abstract class AbstractRpcClient implements RpcClient {
protected final CompressionCodec compressor;
protected final boolean fallbackAllowed;
+ protected final FailedServers failedServers;
+
protected final int connectTO;
protected final int readTO;
protected final int writeTO;
+ protected final PoolMap<ConnectionId, T> connections;
+
+ private final AtomicInteger callIdCnt = new AtomicInteger(0);
+
+ private final ScheduledFuture<?> cleanupIdleConnectionTask;
+
/**
* Construct an IPC client for the cluster <code>clusterId</code>
- *
* @param conf configuration
* @param clusterId the cluster id
* @param localAddr client socket bind address.
@@ -92,7 +151,7 @@ public abstract class AbstractRpcClient implements RpcClient {
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
- HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
this.cellBlockBuilder = new CellBlockBuilder(conf);
@@ -102,31 +161,53 @@ public abstract class AbstractRpcClient implements RpcClient {
this.codec = getCodec();
this.compressor = getCompressor(conf);
this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
- IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+ this.failedServers = new FailedServers(conf);
this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
this.metrics = metrics;
- // login the server principal (if using secure Hadoop)
+ this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
+
+ this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() {
+
+ @Override
+ public void run() {
+ cleanupIdleConnections();
+ }
+ }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
- ", tcpKeepAlive=" + this.tcpKeepAlive +
- ", tcpNoDelay=" + this.tcpNoDelay +
- ", connectTO=" + this.connectTO +
- ", readTO=" + this.readTO +
- ", writeTO=" + this.writeTO +
- ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
- ", maxRetries=" + this.maxRetries +
- ", fallbackAllowed=" + this.fallbackAllowed +
- ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
+ LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
+ + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
+ + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
+ + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
+ + this.fallbackAllowed + ", bind address="
+ + (this.localAddr != null ? this.localAddr : "null"));
+ }
+ }
+
+ private void cleanupIdleConnections() {
+ long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
+ synchronized (connections) {
+ for (T conn : connections.values()) {
+ // remove connection if it has not been chosen by anyone for more than maxIdleTime, and the
+ // connection itself has already shut down. The latter check is because we may still have
+ // some pending calls on the connection, so we should not shut it down from outside.
+ // The connection itself will disconnect if there is no pending call for maxIdleTime.
+ if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
+ LOG.info("Cleanup idle connection to " + conn.remoteId().address);
+ connections.removeValue(conn.remoteId(), conn);
+ }
+ }
}
}
@VisibleForTesting
public static String getDefaultCodec(final Configuration c) {
// If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
- // Configuration will complain -- then no default codec (and we'll pb everything). Else
+ // Configuration will complain -- then no default codec (and we'll pb everything). Else
// default is KeyValueCodec
return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
}
@@ -143,7 +224,7 @@ public abstract class AbstractRpcClient implements RpcClient {
return null;
}
try {
- return (Codec)Class.forName(className).newInstance();
+ return (Codec) Class.forName(className).newInstance();
} catch (Exception e) {
throw new RuntimeException("Failed getting codec " + className, e);
}
@@ -154,6 +235,12 @@ public abstract class AbstractRpcClient implements RpcClient {
return this.codec != null;
}
+ // for writing tests that want to throw exception when connecting.
+ @VisibleForTesting
+ boolean isTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
/**
* Encapsulate the ugly casting and RuntimeException conversion in private method.
* @param conf configuration
@@ -165,163 +252,297 @@ public abstract class AbstractRpcClient implements RpcClient {
return null;
}
try {
- return (CompressionCodec)Class.forName(className).newInstance();
+ return (CompressionCodec) Class.forName(className).newInstance();
} catch (Exception e) {
throw new RuntimeException("Failed getting compressor " + className, e);
}
}
/**
- * Return the pool type specified in the configuration, which must be set to
- * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
- * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal},
- * otherwise default to the former.
- *
- * For applications with many user threads, use a small round-robin pool. For
- * applications with few user threads, you may want to try using a
- * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient}
- * instances should not exceed the operating system's hard limit on the number of
- * connections.
- *
+ * Return the pool type specified in the configuration, which must be set to either
+ * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
+ * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the
+ * former. For applications with many user threads, use a small round-robin pool. For applications
+ * with few user threads, you may want to try using a thread-local pool. In any case, the number
+ * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating
+ * system's hard limit on the number of connections.
* @param config configuration
* @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
* {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
*/
- protected static PoolMap.PoolType getPoolType(Configuration config) {
- return PoolMap.PoolType
- .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
- PoolMap.PoolType.ThreadLocal);
+ private static PoolMap.PoolType getPoolType(Configuration config) {
+ return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
+ PoolMap.PoolType.RoundRobin, PoolMap.PoolType.ThreadLocal);
}
/**
- * Return the pool size specified in the configuration, which is applicable only if
- * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
- *
+ * Return the pool size specified in the configuration, which is applicable only if the pool type
+ * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
* @param config configuration
* @return the maximum pool size
*/
- protected static int getPoolSize(Configuration config) {
+ private static int getPoolSize(Configuration config) {
return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
}
+ private int nextCallId() {
+ int id, next;
+ do {
+ id = callIdCnt.get();
+ next = id < Integer.MAX_VALUE ? id + 1 : 0;
+ } while (!callIdCnt.compareAndSet(id, next));
+ return id;
+ }
+
/**
* Make a blocking call. Throws exceptions if there are network problems or if the remote code
* threw an exception.
- *
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
- * {@link UserProvider#getCurrent()} makes a new instance of User each time so
- * will be a
- * new Connection each time.
+ * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
+ * new Connection each time.
* @return A pair with the Message response and the Cell data (if any).
*/
- private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController pcrc,
+ private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
Message param, Message returnType, final User ticket, final InetSocketAddress isa)
throws ServiceException {
- if (pcrc == null) {
- pcrc = new HBaseRpcControllerImpl();
+ BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
+ callMethod(md, hrc, param, returnType, ticket, isa, done);
+ Message val;
+ try {
+ val = done.get();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ if (hrc.failed()) {
+ throw new ServiceException(hrc.getFailed());
+ } else {
+ return val;
}
+ }
- Pair<Message, CellScanner> val;
- try {
- final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
- cs.setStartTime(EnvironmentEdgeManager.currentTime());
- val = call(pcrc, md, param, returnType, ticket, isa, cs);
- // Shove the results into controller so can be carried across the proxy/pb service void.
- pcrc.setCellScanner(val.getSecond());
-
- cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
- if (metrics != null) {
- metrics.updateRpc(md, param, cs);
+ /**
+ * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
+ * given host/port are reused.
+ */
+ private T getConnection(ConnectionId remoteId) throws IOException {
+ if (failedServers.isFailedServer(remoteId.getAddress())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not trying to connect to " + remoteId.address
+ + " this server is in the failed servers list");
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
+ throw new FailedServerException(
+ "This server is in the failed servers list: " + remoteId.address);
+ }
+ T conn;
+ synchronized (connections) {
+ if (!running) {
+ throw new StoppedRpcClientException();
}
- return val.getFirst();
- } catch (Throwable e) {
- throw new ServiceException(e);
+ conn = connections.get(remoteId);
+ if (conn == null) {
+ conn = createConnection(remoteId);
+ connections.put(remoteId, conn);
+ }
+ conn.setLastTouched(EnvironmentEdgeManager.currentTime());
}
+ return conn;
}
/**
- * Make a call, passing <code>param</code>, to the IPC server running at
- * <code>address</code> which is servicing the <code>protocol</code> protocol,
- * with the <code>ticket</code> credentials, returning the value.
- * Throws exceptions if there are network problems or if the remote code
- * threw an exception.
- *
- * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
- * {@link UserProvider#getCurrent()} makes a new instance of User each time so
- * will be a
- * new Connection each time.
- * @return A pair with the Message response and the Cell data (if any).
- * @throws InterruptedException if call is interrupted
- * @throws java.io.IOException if transport failed
+ * Create a connection for the given remote id. The returned connection is not connected yet.
*/
- protected abstract Pair<Message, CellScanner> call(HBaseRpcController pcrc,
- Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
- InetSocketAddress isa, MetricsConnection.CallStats callStats)
- throws IOException, InterruptedException;
+ protected abstract T createConnection(ConnectionId remoteId) throws IOException;
- @Override
- public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
- int defaultOperationTimeout) throws UnknownHostException {
- return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
+ private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr,
+ RpcCallback<Message> callback) {
+ call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
+ if (metrics != null) {
+ metrics.updateRpc(call.md, call.param, call.callStats);
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms");
+ }
+ if (call.error != null) {
+ if (call.error instanceof RemoteException) {
+ call.error.fillInStackTrace();
+ hrc.setFailed(call.error);
+ } else {
+ hrc.setFailed(wrapException(addr, call.error));
+ }
+ callback.run(null);
+ } else {
+ hrc.setDone(call.cells);
+ callback.run(call.response);
+ }
+ }
+
+ private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
+ final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
+ final RpcCallback<Message> callback) {
+ final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+ cs.setStartTime(EnvironmentEdgeManager.currentTime());
+ Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
+ hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
+
+ @Override
+ public void run(Call call) {
+ onCallFinished(call, hrc, addr, callback);
+ }
+ }, cs);
+ ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
+ try {
+ T connection = getConnection(remoteId);
+ connection.sendRequest(call, hrc);
+ } catch (Exception e) {
+ call.setException(toIOE(e));
+ }
+ }
+
+ private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
+ InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
+ if (addr.isUnresolved()) {
+ throw new UnknownHostException("can not resolve " + sn.getServerName());
+ }
+ return addr;
}
/**
- * Configure a payload carrying controller
- * @param controller to configure
- * @param channelOperationTimeout timeout for operation
- * @return configured payload controller
+ * Interrupt the connections to the given ip:port server. This should be called if the server is
+ * known to be actually dead. This will not prevent current operations from being retried, and,
+ * depending on their own behavior, they may retry on the same server. This can be a feature, for
+ * example at startup. In any case, they're likely to get connection refused (if the process
+ * died) or no route to host: i.e. their next retries should be faster and with a safe exception.
*/
- static HBaseRpcController configurePayloadCarryingRpcController(
- RpcController controller, int channelOperationTimeout) {
- HBaseRpcController pcrc;
- if (controller != null && controller instanceof HBaseRpcController) {
- pcrc = (HBaseRpcController) controller;
- if (!pcrc.hasCallTimeout()) {
- pcrc.setCallTimeout(channelOperationTimeout);
+ @Override
+ public void cancelConnections(ServerName sn) {
+ synchronized (connections) {
+ for (T connection : connections.values()) {
+ ConnectionId remoteId = connection.remoteId();
+ if (remoteId.address.getPort() == sn.getPort()
+ && remoteId.address.getHostName().equals(sn.getHostname())) {
+ LOG.info("The server on " + sn.toString() + " is dead - stopping the connection "
+ + connection.remoteId);
+ connection.shutdown();
+ }
}
- } else {
- pcrc = new HBaseRpcControllerImpl();
- pcrc.setCallTimeout(channelOperationTimeout);
}
- return pcrc;
+ }
+
+ protected abstract void closeInternal();
+
+ @Override
+ public void close() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping rpc client");
+ }
+ Collection<T> connToClose;
+ synchronized (connections) {
+ if (!running) {
+ return;
+ }
+ running = false;
+ connToClose = connections.values();
+ connections.clear();
+ }
+ cleanupIdleConnectionTask.cancel(true);
+ for (T conn : connToClose) {
+ conn.shutdown();
+ }
+ closeInternal();
+ }
+
+ @Override
+ public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
+ int rpcTimeout) throws UnknownHostException {
+ return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
+ }
+
+ @Override
+ public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
+ throws UnknownHostException {
+ return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
+ }
+
+ private static class AbstractRpcChannel {
+
+ protected final InetSocketAddress addr;
+
+ protected final AbstractRpcClient<?> rpcClient;
+
+ protected final User ticket;
+
+ protected final int rpcTimeout;
+
+ protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
+ User ticket, int rpcTimeout) {
+ this.addr = addr;
+ this.rpcClient = rpcClient;
+ this.ticket = ticket;
+ this.rpcTimeout = rpcTimeout;
+ }
+
+ /**
+ * Configure an rpc controller
+ * @param controller to configure
+ * @return configured rpc controller
+ */
+ protected HBaseRpcController configureRpcController(RpcController controller) {
+ HBaseRpcController hrc;
+ // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client
+ // side. And now we may use ServerRpcController.
+ if (controller != null && controller instanceof HBaseRpcController) {
+ hrc = (HBaseRpcController) controller;
+ if (!hrc.hasCallTimeout()) {
+ hrc.setCallTimeout(rpcTimeout);
+ }
+ } else {
+ hrc = new HBaseRpcControllerImpl();
+ hrc.setCallTimeout(rpcTimeout);
+ }
+ return hrc;
+ }
}
/**
* Blocking rpc channel that goes via hbase rpc.
*/
@VisibleForTesting
- public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
- private final InetSocketAddress isa;
- private final AbstractRpcClient rpcClient;
- private final User ticket;
- private final int channelOperationTimeout;
+ public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
+ implements BlockingRpcChannel {
- /**
- * @param channelOperationTimeout - the default timeout when no timeout is given
- */
- protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
- final ServerName sn, final User ticket, int channelOperationTimeout)
- throws UnknownHostException {
- this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
- if (this.isa.isUnresolved()) {
- throw new UnknownHostException(sn.getHostname());
- }
- this.rpcClient = rpcClient;
- this.ticket = ticket;
- this.channelOperationTimeout = channelOperationTimeout;
+ protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
+ InetSocketAddress addr, User ticket, int rpcTimeout) {
+ super(rpcClient, addr, ticket, rpcTimeout);
}
@Override
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType) throws ServiceException {
- HBaseRpcController pcrc = configurePayloadCarryingRpcController(
- controller,
- channelOperationTimeout);
+ return rpcClient.callBlockingMethod(md, configureRpcController(controller),
+ param, returnType, ticket, addr);
+ }
+ }
- return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
+ /**
+ * Async rpc channel that goes via hbase rpc.
+ */
+ public static class RpcChannelImplementation extends AbstractRpcChannel implements RpcChannel {
+
+ protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
+ User ticket, int rpcTimeout) throws UnknownHostException {
+ super(rpcClient, addr, ticket, rpcTimeout);
+ }
+
+ @Override
+ public void callMethod(MethodDescriptor md, RpcController controller, Message param,
+ Message returnType, RpcCallback<Message> done) {
+ // This method does not throw any exceptions, so the caller must provide a
+ // HBaseRpcController which is used to pass the exceptions.
+ this.rpcClient.callMethod(md,
+ configureRpcController(Preconditions.checkNotNull(controller,
+ "RpcController can not be null for async rpc call")),
+ param, returnType, ticket, addr, done);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
deleted file mode 100644
index 33536df..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-
-import io.netty.util.concurrent.DefaultPromise;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ExceptionUtil;
-import org.apache.hadoop.ipc.RemoteException;
-
-/**
- * Represents an Async Hbase call and its response.
- *
- * Responses are passed on to its given doneHandler and failures to the rpcController
- *
- * @param <T> Type of message returned
- * @param <M> Message returned in communication to be converted
- */
-@InterfaceAudience.Private
-public class AsyncCall<M extends Message, T> extends DefaultPromise<T> {
- private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
-
- final int id;
-
- private final AsyncRpcChannel channel;
-
- final Descriptors.MethodDescriptor method;
- final Message param;
- final Message responseDefaultType;
-
- private final MessageConverter<M,T> messageConverter;
- private final IOExceptionConverter exceptionConverter;
-
- final long rpcTimeout;
-
- // For only the request
- private final CellScanner cellScanner;
- private final int priority;
-
- final MetricsConnection clientMetrics;
- final MetricsConnection.CallStats callStats;
-
- /**
- * Constructor
- *
- * @param channel which initiated call
- * @param connectId connection id
- * @param md the method descriptor
- * @param param parameters to send to Server
- * @param cellScanner cellScanner containing cells to send as request
- * @param responseDefaultType the default response type
- * @param messageConverter converts the messages to what is the expected output
- * @param exceptionConverter converts exceptions to expected format. Can be null
- * @param rpcTimeout timeout for this call in ms
- * @param priority for this request
- * @param metrics MetricsConnection to which the metrics are stored for this request
- */
- public AsyncCall(AsyncRpcChannel channel, int connectId, Descriptors.MethodDescriptor
- md, Message param, CellScanner cellScanner, M responseDefaultType, MessageConverter<M, T>
- messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority,
- MetricsConnection metrics) {
- super(channel.getEventExecutor());
- this.channel = channel;
-
- this.id = connectId;
-
- this.method = md;
- this.param = param;
- this.responseDefaultType = responseDefaultType;
-
- this.messageConverter = messageConverter;
- this.exceptionConverter = exceptionConverter;
-
- this.rpcTimeout = rpcTimeout;
-
- this.priority = priority;
- this.cellScanner = cellScanner;
-
- this.callStats = MetricsConnection.newCallStats();
- callStats.setStartTime(EnvironmentEdgeManager.currentTime());
-
- this.clientMetrics = metrics;
- }
-
- /**
- * Get the start time
- *
- * @return start time for the call
- */
- public long getStartTime() {
- return this.callStats.getStartTime();
- }
-
- @Override
- public String toString() {
- return "callId=" + this.id + ", method=" + this.method.getName() +
- ", rpcTimeout=" + this.rpcTimeout + ", param {" +
- (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
- }
-
- /**
- * Set success with a cellBlockScanner
- *
- * @param value to set
- * @param cellBlockScanner to set
- */
- public void setSuccess(M value, CellScanner cellBlockScanner) {
- callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - callStats.getStartTime());
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Call: " + method.getName() + ", callTime: " + callStats.getCallTimeMs() + "ms");
- }
-
- if (clientMetrics != null) {
- clientMetrics.updateRpc(method, param, callStats);
- }
-
- try {
- this.setSuccess(
- this.messageConverter.convert(value, cellBlockScanner)
- );
- } catch (IOException e) {
- this.setFailed(e);
- }
- }
-
- /**
- * Set failed
- *
- * @param exception to set
- */
- public void setFailed(IOException exception) {
- if (ExceptionUtil.isInterrupt(exception)) {
- exception = ExceptionUtil.asInterrupt(exception);
- }
- if (exception instanceof RemoteException) {
- exception = ((RemoteException) exception).unwrapRemoteException();
- }
-
- if (this.exceptionConverter != null) {
- exception = this.exceptionConverter.convert(exception);
- }
-
- this.setFailure(exception);
- }
-
- /**
- * Get the rpc timeout
- *
- * @return current timeout for this call
- */
- public long getRpcTimeout() {
- return rpcTimeout;
- }
-
-
- /**
- * @return Priority for this call
- */
- public int getPriority() {
- return priority;
- }
-
- /**
- * Get the cellScanner for this request.
- * @return CellScanner
- */
- public CellScanner cellScanner() {
- return cellScanner;
- }
-
- @Override
- public boolean cancel(boolean mayInterupt){
- this.channel.removePendingCall(this.id);
- return super.cancel(mayInterupt);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
deleted file mode 100644
index 2ec5adc..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ /dev/null
@@ -1,768 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.EventLoop;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.GenericFutureListener;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.sasl.SaslException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
-import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
-import org.apache.hadoop.hbase.security.AuthMethod;
-import org.apache.hadoop.hbase.security.SaslClientHandler;
-import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SecurityInfo;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-
-/**
- * Netty RPC channel
- */
-@InterfaceAudience.Private
-public class AsyncRpcChannel {
- private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());
-
- private static final int MAX_SASL_RETRIES = 5;
-
- protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
- = new HashMap<>();
-
- static {
- TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
- new AuthenticationTokenSelector());
- }
-
- final AsyncRpcClient client;
-
- // Contains the channel to work with.
- // Only exists when connected
- private Channel channel;
-
- String name;
- final User ticket;
- final String serviceName;
- final InetSocketAddress address;
-
- private int failureCounter = 0;
-
- boolean useSasl;
- AuthMethod authMethod;
- private int reloginMaxBackoff;
- private Token<? extends TokenIdentifier> token;
- private String serverPrincipal;
-
- // NOTE: the closed and connected flags below are only changed while holding the lock on pendingCalls
- private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
- private boolean connected = false;
- private boolean closed = false;
-
- private Timeout cleanupTimer;
-
- private final TimerTask timeoutTask = new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- cleanupCalls();
- }
- };
-
- /**
- * Constructor for netty RPC channel
- * @param bootstrap to construct channel on
- * @param client to connect with
- * @param ticket of user which uses connection
- * @param serviceName name of service to connect to
- * @param address to connect to
- */
- public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
- String serviceName, InetSocketAddress address) {
- this.client = client;
-
- this.ticket = ticket;
- this.serviceName = serviceName;
- this.address = address;
-
- this.channel = connect(bootstrap).channel();
-
- name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
- + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
- }
-
- /**
- * Connect to channel
- * @param bootstrap to connect to
- * @return future of connection
- */
- private ChannelFuture connect(final Bootstrap bootstrap) {
- return bootstrap.remoteAddress(address).connect()
- .addListener(new GenericFutureListener<ChannelFuture>() {
- @Override
- public void operationComplete(final ChannelFuture f) throws Exception {
- if (!f.isSuccess()) {
- retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
- return;
- }
- channel = f.channel();
-
- setupAuthorization();
-
- ByteBuf b = channel.alloc().directBuffer(6);
- createPreamble(b, authMethod);
- channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- if (useSasl) {
- UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
- if (authMethod == AuthMethod.KERBEROS) {
- if (ticket != null && ticket.getRealUser() != null) {
- ticket = ticket.getRealUser();
- }
- }
- SaslClientHandler saslHandler;
- if (ticket == null) {
- throw new FatalConnectionException("ticket/user is null");
- }
- final UserGroupInformation realTicket = ticket;
- saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
- @Override
- public SaslClientHandler run() throws IOException {
- return getSaslHandler(realTicket, bootstrap);
- }
- });
- if (saslHandler != null) {
- // Sasl connect is successful. Let's set up Sasl channel handler
- channel.pipeline().addFirst(saslHandler);
- } else {
- // fall back to simple auth because server told us so.
- authMethod = AuthMethod.SIMPLE;
- useSasl = false;
- }
- } else {
- startHBaseConnection(f.channel());
- }
- }
- });
- }
-
- /**
- * Start HBase connection
- * @param ch channel to start connection on
- */
- private void startHBaseConnection(Channel ch) {
- ch.pipeline().addLast("frameDecoder",
- new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
- ch.pipeline().addLast(new AsyncServerResponseHandler(this));
- try {
- writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- close(future.cause());
- return;
- }
- List<AsyncCall> callsToWrite;
- synchronized (pendingCalls) {
- connected = true;
- callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
- }
- for (AsyncCall call : callsToWrite) {
- writeRequest(call);
- }
- }
- });
- } catch (IOException e) {
- close(e);
- }
- }
-
- private void startConnectionWithEncryption(Channel ch) {
- // for rpc encryption, the order of ChannelInboundHandler should be:
- // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder
- // Don't skip the first 4 bytes for length in beforeUnwrapDecoder,
- // SaslClientHandler will handle this
- ch.pipeline().addFirst("beforeUnwrapDecoder",
- new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
- ch.pipeline().addLast("afterUnwrapDecoder",
- new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
- ch.pipeline().addLast(new AsyncServerResponseHandler(this));
- List<AsyncCall> callsToWrite;
- synchronized (pendingCalls) {
- connected = true;
- callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
- }
- for (AsyncCall call : callsToWrite) {
- writeRequest(call);
- }
- }
-
- /**
- * Get SASL handler
- * @param bootstrap to reconnect to
- * @return new SASL handler
- * @throws java.io.IOException if handler failed to create
- */
- private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
- final Bootstrap bootstrap) throws IOException {
- return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
- client.fallbackAllowed,
- client.conf.get("hbase.rpc.protection",
- SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
- getChannelHeaderBytes(authMethod),
- new SaslClientHandler.SaslExceptionHandler() {
- @Override
- public void handle(int retryCount, Random random, Throwable cause) {
- try {
- // Handle Sasl failure. Try to potentially get new credentials
- handleSaslConnectionFailure(retryCount, cause, realTicket);
-
- retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
- cause);
- } catch (IOException | InterruptedException e) {
- close(e);
- }
- }
- }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
- @Override
- public void onSuccess(Channel channel) {
- startHBaseConnection(channel);
- }
-
- @Override
- public void onSaslProtectionSucess(Channel channel) {
- startConnectionWithEncryption(channel);
- }
- });
- }
-
- /**
- * Retry to connect or close
- * @param bootstrap to connect with
- * @param failureCount failure count
- * @param e exception of fail
- */
- private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout,
- Throwable e) {
- if (failureCount < client.maxRetries) {
- client.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- connect(bootstrap);
- }
- }, timeout, TimeUnit.MILLISECONDS);
- } else {
- client.failedServers.addToFailedServers(address);
- close(e);
- }
- }
-
- /**
- * Calls method on channel
- * @param method to call
- * @param request to send
- * @param cellScanner with cells to send
- * @param responsePrototype to construct response with
- * @param rpcTimeout timeout for request
- * @param priority for request
- * @return Promise for the response Message
- */
- public <R extends Message, O> io.netty.util.concurrent.Promise<O> callMethod(
- final Descriptors.MethodDescriptor method,
- final Message request,final CellScanner cellScanner,
- R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
- exceptionConverter, long rpcTimeout, int priority) {
- final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(),
- method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter,
- rpcTimeout, priority, client.metrics);
-
- synchronized (pendingCalls) {
- if (closed) {
- call.setFailure(new ConnectException());
- return call;
- }
- pendingCalls.put(call.id, call);
- // Add timeout for cleanup if none is present
- if (cleanupTimer == null && call.getRpcTimeout() > 0) {
- cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
- }
- if (!connected) {
- return call;
- }
- }
- writeRequest(call);
- return call;
- }
-
- public EventLoop getEventExecutor() {
- return this.channel.eventLoop();
- }
-
- AsyncCall removePendingCall(int id) {
- synchronized (pendingCalls) {
- return pendingCalls.remove(id);
- }
- }
-
- /**
- * Write the channel header
- * @param channel to write to
- * @return future of write
- * @throws java.io.IOException on failure to write
- */
- private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
- RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
- int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
- ByteBuf b = channel.alloc().directBuffer(totalSize);
-
- b.writeInt(header.getSerializedSize());
- b.writeBytes(header.toByteArray());
-
- return channel.writeAndFlush(b);
- }
-
- private byte[] getChannelHeaderBytes(AuthMethod authMethod) {
- RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
- ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4);
- b.putInt(header.getSerializedSize());
- b.put(header.toByteArray());
- return b.array();
- }
-
- private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) {
- RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
- .setServiceName(serviceName);
-
- RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
- if (userInfoPB != null) {
- headerBuilder.setUserInfo(userInfoPB);
- }
-
- if (client.codec != null) {
- headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
- }
- if (client.compressor != null) {
- headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
- }
-
- headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
- return headerBuilder.build();
- }
-
- /**
- * Write request to channel
- * @param call to write
- */
- private void writeRequest(final AsyncCall call) {
- try {
- final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
- .newBuilder();
- requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
- .setRequestParam(call.param != null);
-
- if (Trace.isTracing()) {
- Span s = Trace.currentSpan();
- requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
- .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
- }
-
- ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner());
- if (cellBlock != null) {
- final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
- .newBuilder();
- cellBlockBuilder.setLength(cellBlock.limit());
- requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
- }
- // Only pass priority if there is one. Let zero be same as no priority.
- if (call.getPriority() != HBaseRpcController.PRIORITY_UNSET) {
- requestHeaderBuilder.setPriority(call.getPriority());
- }
- requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?
- Integer.MAX_VALUE : (int)call.rpcTimeout);
-
- RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
-
- int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
- if (cellBlock != null) {
- totalSize += cellBlock.remaining();
- }
-
- ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
- try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
- call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
- }
-
- channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
- } catch (IOException e) {
- close(e);
- }
- }
-
- /**
- * Set up server authorization
- * @throws java.io.IOException if auth setup failed
- */
- private void setupAuthorization() throws IOException {
- SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
- this.useSasl = client.userProvider.isHBaseSecurityEnabled();
-
- this.token = null;
- if (useSasl && securityInfo != null) {
- AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
- if (tokenKind != null) {
- TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
- if (tokenSelector != null) {
- token = tokenSelector.selectToken(new Text(client.clusterId),
- ticket.getUGI().getTokens());
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("No token selector found for type " + tokenKind);
- }
- }
- String serverKey = securityInfo.getServerPrincipal();
- if (serverKey == null) {
- throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
- }
- this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
- address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT));
- if (LOG.isDebugEnabled()) {
- LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
- + serverPrincipal);
- }
- }
-
- if (!useSasl) {
- authMethod = AuthMethod.SIMPLE;
- } else if (token != null) {
- authMethod = AuthMethod.DIGEST;
- } else {
- authMethod = AuthMethod.KERBEROS;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
- }
- reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
- }
-
- /**
- * Build the user information
- * @param ugi User Group Information
- * @param authMethod Authorization method
- * @return UserInformation protobuf
- */
- private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
- if (ugi == null || authMethod == AuthMethod.DIGEST) {
- // Don't send user for token auth
- return null;
- }
- RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
- if (authMethod == AuthMethod.KERBEROS) {
- // Send effective user for Kerberos auth
- userInfoPB.setEffectiveUser(ugi.getUserName());
- } else if (authMethod == AuthMethod.SIMPLE) {
- // Send both effective user and real user for simple auth
- userInfoPB.setEffectiveUser(ugi.getUserName());
- if (ugi.getRealUser() != null) {
- userInfoPB.setRealUser(ugi.getRealUser().getUserName());
- }
- }
- return userInfoPB.build();
- }
-
- /**
- * Create connection preamble
- * @param byteBuf to write to
- * @param authMethod to write
- */
- private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
- byteBuf.writeBytes(HConstants.RPC_HEADER);
- byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
- byteBuf.writeByte(authMethod.code);
- }
-
- private void close0(Throwable e) {
- List<AsyncCall> toCleanup;
- synchronized (pendingCalls) {
- if (closed) {
- return;
- }
- closed = true;
- toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
- pendingCalls.clear();
- }
- IOException closeException = null;
- if (e != null) {
- if (e instanceof IOException) {
- closeException = (IOException) e;
- } else {
- closeException = new IOException(e);
- }
- }
- // log the info
- if (LOG.isDebugEnabled() && closeException != null) {
- LOG.debug(name + ": closing ipc connection to " + address, closeException);
- }
- if (cleanupTimer != null) {
- cleanupTimer.cancel();
- cleanupTimer = null;
- }
- for (AsyncCall call : toCleanup) {
- call.setFailed(closeException != null ? closeException
- : new ConnectionClosingException(
- "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
- }
- channel.disconnect().addListener(ChannelFutureListener.CLOSE);
- if (LOG.isDebugEnabled()) {
- LOG.debug(name + ": closed");
- }
- }
-
- /**
- * Close connection
- * @param e exception on close
- */
- public void close(final Throwable e) {
- client.removeConnection(this);
-
- // Move closing from the requesting thread to the channel thread
- if (channel.eventLoop().inEventLoop()) {
- close0(e);
- } else {
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- close0(e);
- }
- });
- }
- }
-
- /**
- * Clean up calls.
- */
- private void cleanupCalls() {
- List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
- long currentTime = EnvironmentEdgeManager.currentTime();
- long nextCleanupTaskDelay = -1L;
- synchronized (pendingCalls) {
- for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
- AsyncCall call = iter.next();
- long timeout = call.getRpcTimeout();
- if (timeout > 0) {
- if (currentTime - call.getStartTime() >= timeout) {
- iter.remove();
- toCleanup.add(call);
- } else {
- if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
- nextCleanupTaskDelay = timeout;
- }
- }
- }
- }
- if (nextCleanupTaskDelay > 0) {
- cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
- } else {
- cleanupTimer = null;
- }
- }
- for (AsyncCall call : toCleanup) {
- call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
- + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
- }
- }
-
- /**
- * Check if the connection is alive
- * @return true if alive
- */
- public boolean isAlive() {
- return channel.isOpen();
- }
-
- public InetSocketAddress getAddress() {
- return this.address;
- }
-
- /**
- * Check if user should authenticate over Kerberos
- * @return true if should be authenticated over Kerberos
- * @throws java.io.IOException on failure of check
- */
- private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
- UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- UserGroupInformation realUser = currentUser.getRealUser();
- return authMethod == AuthMethod.KERBEROS && loginUser != null &&
- // Make sure user logged in using Kerberos either keytab or TGT
- loginUser.hasKerberosCredentials() &&
- // relogin only in case it is the login user (e.g. JT)
- // or superuser (like oozie).
- (loginUser.equals(currentUser) || loginUser.equals(realUser));
- }
-
- /**
- * If multiple clients with the same principal try to connect to the same server at the same time,
- * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
- * work around this, what is done is that the client backs off randomly and tries to initiate the
- * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
- * attempted.
- * <p>
- * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
- * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
- * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
- * underlying authentication implementation, so there is no retry from other high level (for eg,
- * HCM or HBaseAdmin).
- * </p>
- * @param currRetries retry count
- * @param ex exception describing fail
- * @param user which is trying to connect
- * @throws java.io.IOException if IO fail
- * @throws InterruptedException if thread is interrupted
- */
- private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
- final UserGroupInformation user) throws IOException, InterruptedException {
- user.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws IOException, InterruptedException {
- if (shouldAuthenticateOverKrb()) {
- if (currRetries < MAX_SASL_RETRIES) {
- LOG.debug("Exception encountered while connecting to the server : " + ex);
- // try re-login
- if (UserGroupInformation.isLoginKeytabBased()) {
- UserGroupInformation.getLoginUser().reloginFromKeytab();
- } else {
- UserGroupInformation.getLoginUser().reloginFromTicketCache();
- }
-
- // Should reconnect
- return null;
- } else {
- String msg = "Couldn't setup connection for "
- + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
- LOG.warn(msg, ex);
- throw (IOException) new IOException(msg).initCause(ex);
- }
- } else {
- LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
- }
- if (ex instanceof RemoteException) {
- throw (RemoteException) ex;
- }
- if (ex instanceof SaslException) {
- String msg = "SASL authentication failed."
- + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
- LOG.fatal(msg, ex);
- throw new RuntimeException(msg, ex);
- }
- throw new IOException(ex);
- }
- });
- }
-
- public int getConnectionHashCode() {
- return ConnectionId.hashCode(ticket, serviceName, address);
- }
-
- @Override
- public int hashCode() {
- return getConnectionHashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof AsyncRpcChannel) {
- AsyncRpcChannel channel = (AsyncRpcChannel) obj;
- return channel.hashCode() == obj.hashCode();
- }
- return false;
- }
-
- @Override
- public String toString() {
- return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
- }
-
- /**
- * Listens to call writes and fails if write failed
- */
- private static final class CallWriteListener implements ChannelFutureListener {
- private final AsyncRpcChannel rpcChannel;
- private final int id;
-
- public CallWriteListener(AsyncRpcChannel asyncRpcChannelImpl, int id) {
- this.rpcChannel = asyncRpcChannelImpl;
- this.id = id;
- }
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- AsyncCall call = rpcChannel.removePendingCall(id);
- if (call != null) {
- if (future.cause() instanceof IOException) {
- call.setFailed((IOException) future.cause());
- } else {
- call.setFailed(new IOException(future.cause()));
- }
- }
- }
- }
- }
-}