You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/03/16 15:01:02 UTC
[09/10] hbase git commit: HBASE-16584 Backport the new ipc
implementation in HBASE-16432 to branch-1
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 3740b7f..634c101 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -18,62 +18,108 @@
package org.apache.hadoop.hbase.ipc;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
+
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
+import io.netty.util.HashedWheelTimer;
+
import java.io.IOException;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
/**
* Provides the basics for a RpcClient implementation like configuration and Logging.
+ * <p>
+ * Locking schema of the current IPC implementation
+ * <ul>
+ * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating
+ * connection.</li>
+ * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
+ * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of
+ * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
+ * of how to deal with cancel.</li>
+ * <li>For connection implementation, the construction of a connection should be as fast as possible
+ * because the creation is protected under a lock. Connect to remote side when needed. There is no
+ * forced locking schema for a connection implementation.</li>
+ * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s lock should be held
+ * last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be executed
+ * outside the lock in {@link Call} and {@link HBaseRpcController} which means the implementations
+ * of the callbacks are free to hold any lock.</li>
+ * </ul>
*/
@InterfaceAudience.Private
-public abstract class AbstractRpcClient implements RpcClient {
+public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
// Log level is being changed in tests
public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
+ protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
+ Threads.newDaemonThreadFactory("RpcClient-timer"), 10, TimeUnit.MILLISECONDS);
+
+ private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
+ .newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper"));
+
+ protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = new HashMap<>();
+
+ static {
+ TOKEN_HANDLERS.put(Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector());
+ }
+
+ protected boolean running = true; // if client runs
+
protected final Configuration conf;
- protected String clusterId;
+ protected final String clusterId;
protected final SocketAddress localAddr;
protected final MetricsConnection metrics;
- protected UserProvider userProvider;
- protected final IPCUtil ipcUtil;
+ protected final UserProvider userProvider;
+ protected final CellBlockBuilder cellBlockBuilder;
protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
// time (in ms), it will be closed at any moment.
- protected final int maxRetries; //the max. no. of retries for socket connections
+ protected final int maxRetries; // the max. no. of retries for socket connections
protected final long failureSleep; // Time to sleep before retry on failure.
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
@@ -81,10 +127,18 @@ public abstract class AbstractRpcClient implements RpcClient {
protected final CompressionCodec compressor;
protected final boolean fallbackAllowed;
+ protected final FailedServers failedServers;
+
protected final int connectTO;
protected final int readTO;
protected final int writeTO;
+ protected final PoolMap<ConnectionId, T> connections;
+
+ private final AtomicInteger callIdCnt = new AtomicInteger(0);
+
+ private final ScheduledFuture<?> cleanupIdleConnectionTask;
+
private int maxConcurrentCallsPerServer;
private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
@@ -97,7 +151,6 @@ public abstract class AbstractRpcClient implements RpcClient {
/**
* Construct an IPC client for the cluster <code>clusterId</code>
- *
* @param conf configuration
* @param clusterId the cluster id
* @param localAddr client socket bind address.
@@ -110,17 +163,18 @@ public abstract class AbstractRpcClient implements RpcClient {
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
- HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
- this.ipcUtil = new IPCUtil(conf);
+ this.cellBlockBuilder = new CellBlockBuilder(conf);
this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
this.conf = conf;
this.codec = getCodec();
this.compressor = getCompressor(conf);
this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
- IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+ this.failedServers = new FailedServers(conf);
this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
@@ -129,25 +183,47 @@ public abstract class AbstractRpcClient implements RpcClient {
HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);
- // login the server principal (if using secure Hadoop)
+ this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
+
+ this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() {
+
+ @Override
+ public void run() {
+ cleanupIdleConnections();
+ }
+ }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
- ", tcpKeepAlive=" + this.tcpKeepAlive +
- ", tcpNoDelay=" + this.tcpNoDelay +
- ", connectTO=" + this.connectTO +
- ", readTO=" + this.readTO +
- ", writeTO=" + this.writeTO +
- ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
- ", maxRetries=" + this.maxRetries +
- ", fallbackAllowed=" + this.fallbackAllowed +
- ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
+ LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
+ + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
+ + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
+ + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
+ + this.fallbackAllowed + ", bind address="
+ + (this.localAddr != null ? this.localAddr : "null"));
+ }
+ }
+
+ private void cleanupIdleConnections() {
+ long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
+ synchronized (connections) {
+ for (T conn : connections.values()) {
+ // Remove the connection if it has not been chosen by anyone for more than
+ // minIdleTimeBeforeClose and it has already shut down. The latter check is needed because we
+ // may still have pending calls on the connection, so we must not shut it down from outside.
+ // The connection itself will disconnect if it has no pending call for the same idle time.
+ if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
+ LOG.info("Cleanup idle connection to " + conn.remoteId().address);
+ connections.removeValue(conn.remoteId(), conn);
+ conn.cleanupConnection();
+ }
+ }
}
}
@VisibleForTesting
public static String getDefaultCodec(final Configuration c) {
// If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
- // Configuration will complain -- then no default codec (and we'll pb everything). Else
+ // Configuration will complain -- then no default codec (and we'll pb everything). Else
// default is KeyValueCodec
return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
}
@@ -160,9 +236,11 @@ public abstract class AbstractRpcClient implements RpcClient {
// For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
// "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
- if (className == null || className.length() == 0) return null;
+ if (className == null || className.length() == 0) {
+ return null;
+ }
try {
- return (Codec)Class.forName(className).newInstance();
+ return (Codec) Class.forName(className).newInstance();
} catch (Exception e) {
throw new RuntimeException("Failed getting codec " + className, e);
}
@@ -173,6 +251,12 @@ public abstract class AbstractRpcClient implements RpcClient {
return this.codec != null;
}
+ // Exposed for tests that want to throw an exception when connecting.
+ @VisibleForTesting
+ boolean isTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
/**
* Encapsulate the ugly casting and RuntimeException conversion in private method.
* @param conf configuration
@@ -180,142 +264,289 @@ public abstract class AbstractRpcClient implements RpcClient {
*/
private static CompressionCodec getCompressor(final Configuration conf) {
String className = conf.get("hbase.client.rpc.compressor", null);
- if (className == null || className.isEmpty()) return null;
+ if (className == null || className.isEmpty()) {
+ return null;
+ }
try {
- return (CompressionCodec)Class.forName(className).newInstance();
+ return (CompressionCodec) Class.forName(className).newInstance();
} catch (Exception e) {
throw new RuntimeException("Failed getting compressor " + className, e);
}
}
/**
- * Return the pool type specified in the configuration, which must be set to
- * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
- * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal},
- * otherwise default to the former.
- *
- * For applications with many user threads, use a small round-robin pool. For
- * applications with few user threads, you may want to try using a
- * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient}
- * instances should not exceed the operating system's hard limit on the number of
- * connections.
- *
+ * Return the pool type specified in the configuration, which must be set to either
+ * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
+ * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the
+ * former. For applications with many user threads, use a small round-robin pool. For applications
+ * with few user threads, you may want to try using a thread-local pool. In any case, the number
+ * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating
+ * system's hard limit on the number of connections.
* @param config configuration
* @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
* {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
*/
- protected static PoolMap.PoolType getPoolType(Configuration config) {
- return PoolMap.PoolType
- .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
- PoolMap.PoolType.ThreadLocal);
+ private static PoolMap.PoolType getPoolType(Configuration config) {
+ return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
+ PoolMap.PoolType.RoundRobin, PoolMap.PoolType.ThreadLocal);
}
/**
- * Return the pool size specified in the configuration, which is applicable only if
- * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
- *
+ * Return the pool size specified in the configuration, which is applicable only if the pool type
+ * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
* @param config configuration
* @return the maximum pool size
*/
- protected static int getPoolSize(Configuration config) {
+ private static int getPoolSize(Configuration config) {
return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
}
+ private int nextCallId() {
+ int id, next;
+ do {
+ id = callIdCnt.get();
+ next = id < Integer.MAX_VALUE ? id + 1 : 0;
+ } while (!callIdCnt.compareAndSet(id, next));
+ return id;
+ }
+
/**
* Make a blocking call. Throws exceptions if there are network problems or if the remote code
* threw an exception.
- *
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
- * {@link UserProvider#getCurrent()} makes a new instance of User each time so
- * will be a
- * new Connection each time.
+ * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
+ * new Connection each time.
* @return A pair with the Message response and the Cell data (if any).
*/
- Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
+ private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
Message param, Message returnType, final User ticket, final InetSocketAddress isa)
throws ServiceException {
- if (pcrc == null) {
- pcrc = new PayloadCarryingRpcController();
+ BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
+ callMethod(md, hrc, param, returnType, ticket, isa, done);
+ Message val;
+ try {
+ val = done.get();
+ } catch (IOException e) {
+ throw new ServiceException(e);
}
+ if (hrc.failed()) {
+ throw new ServiceException(hrc.getFailed());
+ } else {
+ return val;
+ }
+ }
- Pair<Message, CellScanner> val;
- AtomicInteger counter = concurrentCounterCache.getUnchecked(isa);
- int count = counter.incrementAndGet();
- try {
- if (count > maxConcurrentCallsPerServer) {
- throw new ServerTooBusyException(isa, count);
+ /**
+ * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
+ * given host/port are reused.
+ */
+ private T getConnection(ConnectionId remoteId) throws IOException {
+ if (failedServers.isFailedServer(remoteId.getAddress())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not trying to connect to " + remoteId.address
+ + " this server is in the failed servers list");
}
- final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
- cs.setStartTime(EnvironmentEdgeManager.currentTime());
- val = call(pcrc, md, param, returnType, ticket, isa, cs);
- // Shove the results into controller so can be carried across the proxy/pb service void.
- pcrc.setCellScanner(val.getSecond());
-
- cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
- if (metrics != null) {
- metrics.updateRpc(md, param, cs);
+ throw new FailedServerException(
+ "This server is in the failed servers list: " + remoteId.address);
+ }
+ T conn;
+ synchronized (connections) {
+ if (!running) {
+ throw new StoppedRpcClientException();
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
+ conn = connections.get(remoteId);
+ if (conn == null) {
+ conn = createConnection(remoteId);
+ connections.put(remoteId, conn);
}
- return val.getFirst();
- } catch (Throwable e) {
- throw new ServiceException(e);
- } finally {
- counter.decrementAndGet();
+ conn.setLastTouched(EnvironmentEdgeManager.currentTime());
}
+ return conn;
}
/**
- * Make a call, passing <code>param</code>, to the IPC server running at
- * <code>address</code> which is servicing the <code>protocol</code> protocol,
- * with the <code>ticket</code> credentials, returning the value.
- * Throws exceptions if there are network problems or if the remote code
- * threw an exception.
- *
- * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
- * {@link UserProvider#getCurrent()} makes a new instance of User each time so
- * will be a
- * new Connection each time.
- * @return A pair with the Message response and the Cell data (if any).
- * @throws InterruptedException
- * @throws java.io.IOException
+ * Create a connection for the given remote id. The returned connection is not connected yet.
*/
- protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
- Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
- InetSocketAddress isa, MetricsConnection.CallStats callStats)
- throws IOException, InterruptedException;
+ protected abstract T createConnection(ConnectionId remoteId) throws IOException;
+
+ private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr,
+ RpcCallback<Message> callback) {
+ call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
+ if (metrics != null) {
+ metrics.updateRpc(call.md, call.param, call.callStats);
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms");
+ }
+ if (call.error != null) {
+ if (call.error instanceof RemoteException) {
+ call.error.fillInStackTrace();
+ hrc.setFailed(call.error);
+ } else {
+ hrc.setFailed(wrapException(addr, call.error));
+ }
+ callback.run(null);
+ } else {
+ hrc.setDone(call.cells);
+ callback.run(call.response);
+ }
+ }
+ private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
+ final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
+ final RpcCallback<Message> callback) {
+ final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+ cs.setStartTime(EnvironmentEdgeManager.currentTime());
+ final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
+ Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
+ hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
+ @Override
+ public void run(Call call) {
+ counter.decrementAndGet();
+ onCallFinished(call, hrc, addr, callback);
+ }
+ }, cs);
+ ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
+ int count = counter.incrementAndGet();
+ try {
+ if (count > maxConcurrentCallsPerServer) {
+ throw new ServerTooBusyException(addr, count);
+ }
+ T connection = getConnection(remoteId);
+ connection.sendRequest(call, hrc);
+ } catch (Exception e) {
+ call.setException(toIOE(e));
+ }
+ }
+
+ private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
+ InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
+ if (addr.isUnresolved()) {
+ throw new UnknownHostException("can not resolve " + sn.getServerName());
+ }
+ return addr;
+ }
+
+ /**
+ * Interrupt the connections to the given ip:port server. This should be called if the server is
+ * known to be actually dead. This will not prevent current operations from being retried, and,
+ * depending on their own behavior, they may retry on the same server. This can be a feature, for
+ * example at startup. In any case, they're likely to get connection refused (if the process died)
+ * or no route to host: i.e. their next retries should be faster and fail with a safe exception.
+ */
@Override
- public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
- int defaultOperationTimeout) throws UnknownHostException {
- return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
+ public void cancelConnections(ServerName sn) {
+ synchronized (connections) {
+ for (T connection : connections.values()) {
+ ConnectionId remoteId = connection.remoteId();
+ if (remoteId.address.getPort() == sn.getPort() &&
+ remoteId.address.getHostName().equals(sn.getHostname())) {
+ LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " +
+ connection.remoteId);
+ connections.removeValue(remoteId, connection);
+ connection.shutdown();
+ }
+ }
+ }
}
/**
- * Takes an Exception and the address we were trying to connect to and return an IOException with
- * the input exception as the cause. The new exception provides the stack trace of the place where
- * the exception is thrown and some extra diagnostics information. If the exception is
- * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
- * an IOException.
- * @param addr target address
- * @param exception the relevant exception
- * @return an exception to throw
+ * Configure an HBase RPC controller.
+ * @param controller to configure
+ * @param channelOperationTimeout timeout for operation
+ * @return configured controller
*/
- protected IOException wrapException(InetSocketAddress addr, Exception exception) {
- if (exception instanceof ConnectException) {
- // connection refused; include the host:port in the error
- return (ConnectException) new ConnectException("Call to " + addr
- + " failed on connection exception: " + exception).initCause(exception);
- } else if (exception instanceof SocketTimeoutException) {
- return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
- + " failed because " + exception).initCause(exception);
- } else if (exception instanceof ConnectionClosingException) {
- return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
- + " failed on local exception: " + exception).initCause(exception);
+ static HBaseRpcController configureHBaseRpcController(
+ RpcController controller, int channelOperationTimeout) {
+ HBaseRpcController hrc;
+ if (controller != null && controller instanceof HBaseRpcController) {
+ hrc = (HBaseRpcController) controller;
+ if (!hrc.hasCallTimeout()) {
+ hrc.setCallTimeout(channelOperationTimeout);
+ }
} else {
- return (IOException) new IOException("Call to " + addr + " failed on local exception: "
- + exception).initCause(exception);
+ hrc = new HBaseRpcControllerImpl();
+ hrc.setCallTimeout(channelOperationTimeout);
+ }
+ return hrc;
+ }
+
+ protected abstract void closeInternal();
+
+ @Override
+ public void close() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping rpc client");
+ }
+ Collection<T> connToClose;
+ synchronized (connections) {
+ if (!running) {
+ return;
+ }
+ running = false;
+ connToClose = connections.values();
+ connections.clear();
+ }
+ cleanupIdleConnectionTask.cancel(true);
+ for (T conn : connToClose) {
+ conn.shutdown();
+ }
+ closeInternal();
+ for (T conn : connToClose) {
+ conn.cleanupConnection();
+ }
+ }
+
+ @Override
+ public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
+ int rpcTimeout) throws UnknownHostException {
+ return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
+ }
+
+ @Override
+ public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
+ throws UnknownHostException {
+ return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
+ }
+
+ private static class AbstractRpcChannel {
+
+ protected final InetSocketAddress addr;
+
+ protected final AbstractRpcClient<?> rpcClient;
+
+ protected final User ticket;
+
+ protected final int rpcTimeout;
+
+ protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
+ User ticket, int rpcTimeout) {
+ this.addr = addr;
+ this.rpcClient = rpcClient;
+ this.ticket = ticket;
+ this.rpcTimeout = rpcTimeout;
+ }
+
+ /**
+ * Configure an rpc controller
+ * @param controller to configure
+ * @return configured rpc controller
+ */
+ protected HBaseRpcController configureRpcController(RpcController controller) {
+ HBaseRpcController hrc;
+ // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client
+ // side. And now we may use ServerRpcController.
+ if (controller != null && controller instanceof HBaseRpcController) {
+ hrc = (HBaseRpcController) controller;
+ if (!hrc.hasCallTimeout()) {
+ hrc.setCallTimeout(rpcTimeout);
+ }
+ } else {
+ hrc = new HBaseRpcControllerImpl();
+ hrc.setCallTimeout(rpcTimeout);
+ }
+ return hrc;
}
}
@@ -323,42 +554,42 @@ public abstract class AbstractRpcClient implements RpcClient {
* Blocking rpc channel that goes via hbase rpc.
*/
@VisibleForTesting
- public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
- private final InetSocketAddress isa;
- private final AbstractRpcClient rpcClient;
- private final User ticket;
- private final int channelOperationTimeout;
+ public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
+ implements BlockingRpcChannel {
- /**
- * @param channelOperationTimeout - the default timeout when no timeout is given
- */
- protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
- final ServerName sn, final User ticket, int channelOperationTimeout)
- throws UnknownHostException {
- this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
- if (this.isa.isUnresolved()) {
- throw new UnknownHostException(sn.getHostname());
- }
- this.rpcClient = rpcClient;
- this.ticket = ticket;
- this.channelOperationTimeout = channelOperationTimeout;
+ protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
+ InetSocketAddress addr, User ticket, int rpcTimeout) {
+ super(rpcClient, addr, ticket, rpcTimeout);
}
@Override
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType) throws ServiceException {
- PayloadCarryingRpcController pcrc;
- if (controller != null && controller instanceof PayloadCarryingRpcController) {
- pcrc = (PayloadCarryingRpcController) controller;
- if (!pcrc.hasCallTimeout()) {
- pcrc.setCallTimeout(channelOperationTimeout);
- }
- } else {
- pcrc = new PayloadCarryingRpcController();
- pcrc.setCallTimeout(channelOperationTimeout);
- }
+ return rpcClient.callBlockingMethod(md, configureRpcController(controller),
+ param, returnType, ticket, addr);
+ }
+ }
+
+ /**
+ * Async rpc channel that goes via hbase rpc.
+ */
+ public static class RpcChannelImplementation extends AbstractRpcChannel implements
+ RpcChannel {
- return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
+ protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
+ User ticket, int rpcTimeout) throws UnknownHostException {
+ super(rpcClient, addr, ticket, rpcTimeout);
+ }
+
+ @Override
+ public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
+ Message param, Message returnType, RpcCallback<Message> done) {
+ // This method does not throw any exceptions, so the caller must provide a
+ // HBaseRpcController which is used to pass the exceptions.
+ this.rpcClient.callMethod(md,
+ configureRpcController(Preconditions.checkNotNull(controller,
+ "RpcController can not be null for async rpc call")),
+ param, returnType, ticket, addr, done);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
deleted file mode 100644
index a5da0dc..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import io.netty.channel.EventLoop;
-import io.netty.util.concurrent.DefaultPromise;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ExceptionUtil;
-import org.apache.hadoop.ipc.RemoteException;
-
-import java.io.IOException;
-
-/**
- * Represents an Async Hbase call and its response.
- *
- * Responses are passed on to its given doneHandler and failures to the rpcController
- */
-@InterfaceAudience.Private
-public class AsyncCall extends DefaultPromise<Message> {
- private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
-
- final int id;
-
- final Descriptors.MethodDescriptor method;
- final Message param;
- final PayloadCarryingRpcController controller;
- final Message responseDefaultType;
- final long startTime;
- final long rpcTimeout;
- final MetricsConnection.CallStats callStats;
-
- /**
- * Constructor
- *
- * @param eventLoop for call
- * @param connectId connection id
- * @param md the method descriptor
- * @param param parameters to send to Server
- * @param controller controller for response
- * @param responseDefaultType the default response type
- */
- public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message
- param, PayloadCarryingRpcController controller, Message responseDefaultType,
- MetricsConnection.CallStats callStats) {
- super(eventLoop);
-
- this.id = connectId;
-
- this.method = md;
- this.param = param;
- this.controller = controller;
- this.responseDefaultType = responseDefaultType;
-
- this.startTime = EnvironmentEdgeManager.currentTime();
- this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
- this.callStats = callStats;
- }
-
- /**
- * Get the start time
- *
- * @return start time for the call
- */
- public long getStartTime() {
- return this.startTime;
- }
-
- @Override
- public String toString() {
- return "callId=" + this.id + ", method=" + this.method.getName() +
- ", rpcTimeout=" + this.rpcTimeout + ", param {" +
- (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
- }
-
- /**
- * Set success with a cellBlockScanner
- *
- * @param value to set
- * @param cellBlockScanner to set
- */
- public void setSuccess(Message value, CellScanner cellBlockScanner) {
- if (cellBlockScanner != null) {
- controller.setCellScanner(cellBlockScanner);
- }
-
- if (LOG.isTraceEnabled()) {
- long callTime = EnvironmentEdgeManager.currentTime() - startTime;
- LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + "ms");
- }
-
- this.setSuccess(value);
- }
-
- /**
- * Set failed
- *
- * @param exception to set
- */
- public void setFailed(IOException exception) {
- if (ExceptionUtil.isInterrupt(exception)) {
- exception = ExceptionUtil.asInterrupt(exception);
- }
- if (exception instanceof RemoteException) {
- exception = ((RemoteException) exception).unwrapRemoteException();
- }
-
- this.setFailure(exception);
- }
-
- /**
- * Get the rpc timeout
- *
- * @return current timeout for this call
- */
- public long getRpcTimeout() {
- return rpcTimeout;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
deleted file mode 100644
index 878d8b8..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ /dev/null
@@ -1,785 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.sasl.SaslException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
-import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
-import org.apache.hadoop.hbase.security.AuthMethod;
-import org.apache.hadoop.hbase.security.SaslClientHandler;
-import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SecurityInfo;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
-
-/**
- * Netty RPC channel
- */
-@InterfaceAudience.Private
-public class AsyncRpcChannel {
- private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());
-
- private static final int MAX_SASL_RETRIES = 5;
-
- protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
- = new HashMap<>();
-
- static {
- TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
- new AuthenticationTokenSelector());
- }
-
- final AsyncRpcClient client;
-
- // Contains the channel to work with.
- // Only exists when connected
- private Channel channel;
-
- String name;
- final User ticket;
- final String serviceName;
- final InetSocketAddress address;
-
- private int ioFailureCounter = 0;
- private int connectFailureCounter = 0;
-
- boolean useSasl;
- AuthMethod authMethod;
- private int reloginMaxBackoff;
- private Token<? extends TokenIdentifier> token;
- private String serverPrincipal;
-
- // NOTE: closed and connected flags below are only changed when a lock on pendingCalls
- private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
- private boolean connected = false;
- private boolean closed = false;
-
- private Timeout cleanupTimer;
-
- private final TimerTask timeoutTask = new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- cleanupCalls();
- }
- };
-
- /**
- * Constructor for netty RPC channel
- * @param bootstrap to construct channel on
- * @param client to connect with
- * @param ticket of user which uses connection
- * @param serviceName name of service to connect to
- * @param address to connect to
- */
- public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
- String serviceName, InetSocketAddress address) {
- this.client = client;
-
- this.ticket = ticket;
- this.serviceName = serviceName;
- this.address = address;
-
- this.channel = connect(bootstrap).channel();
-
- name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
- + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
- }
-
- /**
- * Connect to channel
- * @param bootstrap to connect to
- * @return future of connection
- */
- private ChannelFuture connect(final Bootstrap bootstrap) {
- return bootstrap.remoteAddress(address).connect()
- .addListener(new GenericFutureListener<ChannelFuture>() {
- @Override
- public void operationComplete(final ChannelFuture f) throws Exception {
- if (!f.isSuccess()) {
- if (f.cause() instanceof SocketException) {
- retryOrClose(bootstrap, connectFailureCounter++, f.cause());
- } else {
- retryOrClose(bootstrap, ioFailureCounter++, f.cause());
- }
- return;
- }
- channel = f.channel();
-
- setupAuthorization();
-
- ByteBuf b = channel.alloc().directBuffer(6);
- createPreamble(b, authMethod);
- channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- if (useSasl) {
- UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
- if (authMethod == AuthMethod.KERBEROS) {
- if (ticket != null && ticket.getRealUser() != null) {
- ticket = ticket.getRealUser();
- }
- }
- SaslClientHandler saslHandler;
- if (ticket == null) {
- throw new FatalConnectionException("ticket/user is null");
- }
- final UserGroupInformation realTicket = ticket;
- saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
- @Override
- public SaslClientHandler run() throws IOException {
- return getSaslHandler(realTicket, bootstrap);
- }
- });
- if (saslHandler != null) {
- // Sasl connect is successful. Let's set up Sasl channel handler
- channel.pipeline().addFirst(saslHandler);
- } else {
- // fall back to simple auth because server told us so.
- authMethod = AuthMethod.SIMPLE;
- useSasl = false;
- }
- } else {
- startHBaseConnection(f.channel());
- }
- }
- });
- }
-
- /**
- * Start HBase connection
- * @param ch channel to start connection on
- */
- private void startHBaseConnection(Channel ch) {
- ch.pipeline().addLast("frameDecoder",
- new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
- ch.pipeline().addLast(new AsyncServerResponseHandler(this));
- try {
- writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- close(future.cause());
- return;
- }
- List<AsyncCall> callsToWrite;
- synchronized (pendingCalls) {
- connected = true;
- callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
- }
- for (AsyncCall call : callsToWrite) {
- writeRequest(call);
- }
- }
- });
- } catch (IOException e) {
- close(e);
- }
- }
-
- /**
- * Start HBase connection with sasl encryption
- * @param ch channel to start connection on
- */
- private void startConnectionWithEncryption(Channel ch) {
- // for rpc encryption, the order of ChannelInboundHandler should be:
- // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder
- // Don't skip the first 4 bytes for length in beforeUnwrapDecoder,
- // SaslClientHandler will handler this
- ch.pipeline().addFirst("beforeUnwrapDecoder",
- new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
- ch.pipeline().addLast("afterUnwrapDecoder",
- new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
- ch.pipeline().addLast(new AsyncServerResponseHandler(this));
- List<AsyncCall> callsToWrite;
- synchronized (pendingCalls) {
- connected = true;
- callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
- }
- for (AsyncCall call : callsToWrite) {
- writeRequest(call);
- }
- }
-
- /**
- * Get SASL handler
- * @param bootstrap to reconnect to
- * @return new SASL handler
- * @throws java.io.IOException if handler failed to create
- */
- private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
- final Bootstrap bootstrap) throws IOException {
- return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
- client.fallbackAllowed,
- client.conf.get("hbase.rpc.protection",
- SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
- getChannelHeaderBytes(authMethod),
- new SaslClientHandler.SaslExceptionHandler() {
- @Override
- public void handle(int retryCount, Random random, Throwable cause) {
- try {
- // Handle Sasl failure. Try to potentially get new credentials
- handleSaslConnectionFailure(retryCount, cause, realTicket);
-
- // Try to reconnect
- client.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- connect(bootstrap);
- }
- }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
- } catch (IOException | InterruptedException e) {
- close(e);
- }
- }
- }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
- @Override
- public void onSuccess(Channel channel) {
- startHBaseConnection(channel);
- }
-
- @Override
- public void onSaslProtectionSucess(Channel channel) {
- startConnectionWithEncryption(channel);
- }
- });
- }
-
- /**
- * Retry to connect or close
- *
- * @param bootstrap to connect with
- * @param connectCounter amount of tries
- * @param e exception of fail
- */
- private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
- if (connectCounter < client.maxRetries) {
- client.newTimeout(new TimerTask() {
- @Override public void run(Timeout timeout) throws Exception {
- connect(bootstrap);
- }
- }, client.failureSleep, TimeUnit.MILLISECONDS);
- } else {
- client.failedServers.addToFailedServers(address);
- close(e);
- }
- }
-
- /**
- * Calls method on channel
- * @param method to call
- * @param controller to run call with
- * @param request to send
- * @param responsePrototype to construct response with
- */
- public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
- final PayloadCarryingRpcController controller, final Message request,
- final Message responsePrototype, MetricsConnection.CallStats callStats) {
- final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(),
- method, request, controller, responsePrototype, callStats);
- controller.notifyOnCancel(new RpcCallback<Object>() {
- @Override
- public void run(Object parameter) {
- // TODO: do not need to call AsyncCall.setFailed?
- synchronized (pendingCalls) {
- pendingCalls.remove(call.id);
- }
- }
- });
- // TODO: this should be handled by PayloadCarryingRpcController.
- if (controller.isCanceled()) {
- // To finish if the call was cancelled before we set the notification (race condition)
- call.cancel(true);
- return call;
- }
-
- synchronized (pendingCalls) {
- if (closed) {
- Promise<Message> promise = channel.eventLoop().newPromise();
- promise.setFailure(new ConnectException());
- return promise;
- }
- pendingCalls.put(call.id, call);
- // Add timeout for cleanup if none is present
- if (cleanupTimer == null && call.getRpcTimeout() > 0) {
- cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
- }
- if (!connected) {
- return call;
- }
- }
- writeRequest(call);
- return call;
- }
-
- AsyncCall removePendingCall(int id) {
- synchronized (pendingCalls) {
- return pendingCalls.remove(id);
- }
- }
-
- /**
- * Write the channel header
- * @param channel to write to
- * @return future of write
- * @throws java.io.IOException on failure to write
- */
- private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
- RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
- int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
-
- ByteBuf b = channel.alloc().directBuffer(totalSize);
-
- b.writeInt(header.getSerializedSize());
- b.writeBytes(header.toByteArray());
-
- return channel.writeAndFlush(b);
- }
-
- private byte[] getChannelHeaderBytes(AuthMethod authMethod) {
- RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
- ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4);
- b.putInt(header.getSerializedSize());
- b.put(header.toByteArray());
- return b.array();
- }
-
- private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) {
- RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
- .setServiceName(serviceName);
-
- RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
- if (userInfoPB != null) {
- headerBuilder.setUserInfo(userInfoPB);
- }
-
- if (client.codec != null) {
- headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
- }
- if (client.compressor != null) {
- headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
- }
-
- headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
- return headerBuilder.build();
- }
-
- /**
- * Write request to channel
- * @param call to write
- */
- private void writeRequest(final AsyncCall call) {
- try {
- final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
- .newBuilder();
- requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
- .setRequestParam(call.param != null);
-
- if (Trace.isTracing()) {
- Span s = Trace.currentSpan();
- requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
- .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
- }
-
- ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
- if (cellBlock != null) {
- final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
- .newBuilder();
- cellBlockBuilder.setLength(cellBlock.limit());
- requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
- }
- // Only pass priority if there one. Let zero be same as no priority.
- if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
- requestHeaderBuilder.setPriority(call.controller.getPriority());
- }
- requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?
- Integer.MAX_VALUE : (int)call.rpcTimeout);
- RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
-
- int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
- if (cellBlock != null) {
- totalSize += cellBlock.remaining();
- }
-
- ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
- try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
- call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
- }
-
- channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
- } catch (IOException e) {
- close(e);
- }
- }
-
- /**
- * Set up server authorization
- * @throws java.io.IOException if auth setup failed
- */
- private void setupAuthorization() throws IOException {
- SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
- this.useSasl = client.userProvider.isHBaseSecurityEnabled();
-
- this.token = null;
- if (useSasl && securityInfo != null) {
- AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
- if (tokenKind != null) {
- TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
- if (tokenSelector != null) {
- token = tokenSelector.selectToken(new Text(client.clusterId),
- ticket.getUGI().getTokens());
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("No token selector found for type " + tokenKind);
- }
- }
- String serverKey = securityInfo.getServerPrincipal();
- if (serverKey == null) {
- throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
- }
- this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
- address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT));
- if (LOG.isDebugEnabled()) {
- LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
- + serverPrincipal);
- }
- }
-
- if (!useSasl) {
- authMethod = AuthMethod.SIMPLE;
- } else if (token != null) {
- authMethod = AuthMethod.DIGEST;
- } else {
- authMethod = AuthMethod.KERBEROS;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
- }
- reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
- }
-
- /**
- * Build the user information
- * @param ugi User Group Information
- * @param authMethod Authorization method
- * @return UserInformation protobuf
- */
- private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
- if (ugi == null || authMethod == AuthMethod.DIGEST) {
- // Don't send user for token auth
- return null;
- }
- RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
- if (authMethod == AuthMethod.KERBEROS) {
- // Send effective user for Kerberos auth
- userInfoPB.setEffectiveUser(ugi.getUserName());
- } else if (authMethod == AuthMethod.SIMPLE) {
- // Send both effective user and real user for simple auth
- userInfoPB.setEffectiveUser(ugi.getUserName());
- if (ugi.getRealUser() != null) {
- userInfoPB.setRealUser(ugi.getRealUser().getUserName());
- }
- }
- return userInfoPB.build();
- }
-
- /**
- * Create connection preamble
- * @param byteBuf to write to
- * @param authMethod to write
- */
- private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
- byteBuf.writeBytes(HConstants.RPC_HEADER);
- byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
- byteBuf.writeByte(authMethod.code);
- }
-
- private void close0(Throwable e) {
- List<AsyncCall> toCleanup;
- synchronized (pendingCalls) {
- if (closed) {
- return;
- }
- closed = true;
- toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
- pendingCalls.clear();
- }
- IOException closeException = null;
- if (e != null) {
- if (e instanceof IOException) {
- closeException = (IOException) e;
- } else {
- closeException = new IOException(e);
- }
- }
- // log the info
- if (LOG.isDebugEnabled() && closeException != null) {
- LOG.debug(name + ": closing ipc connection to " + address, closeException);
- }
- if (cleanupTimer != null) {
- cleanupTimer.cancel();
- cleanupTimer = null;
- }
- for (AsyncCall call : toCleanup) {
- call.setFailed(closeException != null ? closeException
- : new ConnectionClosingException(
- "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
- }
- channel.disconnect().addListener(ChannelFutureListener.CLOSE);
- if (LOG.isDebugEnabled()) {
- LOG.debug(name + ": closed");
- }
- }
-
- /**
- * Close connection
- * @param e exception on close
- */
- public void close(final Throwable e) {
- client.removeConnection(this);
-
- // Move closing from the requesting thread to the channel thread
- if (channel.eventLoop().inEventLoop()) {
- close0(e);
- } else {
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- close0(e);
- }
- });
- }
- }
-
- /**
- * Clean up calls.
- */
- private void cleanupCalls() {
- List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
- long currentTime = EnvironmentEdgeManager.currentTime();
- long nextCleanupTaskDelay = -1L;
- synchronized (pendingCalls) {
- for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
- AsyncCall call = iter.next();
- long timeout = call.getRpcTimeout();
- if (timeout > 0) {
- if (currentTime - call.getStartTime() >= timeout) {
- iter.remove();
- toCleanup.add(call);
- } else {
- if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
- nextCleanupTaskDelay = timeout;
- }
- }
- }
- }
- if (nextCleanupTaskDelay > 0) {
- cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
- } else {
- cleanupTimer = null;
- }
- }
- for (AsyncCall call : toCleanup) {
- call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
- + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
- }
- }
-
- /**
- * Check if the connection is alive
- * @return true if alive
- */
- public boolean isAlive() {
- return channel.isOpen();
- }
-
- /**
- * Check if user should authenticate over Kerberos
- * @return true if should be authenticated over Kerberos
- * @throws java.io.IOException on failure of check
- */
- private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
- UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- UserGroupInformation realUser = currentUser.getRealUser();
- return authMethod == AuthMethod.KERBEROS && loginUser != null &&
- // Make sure user logged in using Kerberos either keytab or TGT
- loginUser.hasKerberosCredentials() &&
- // relogin only in case it is the login user (e.g. JT)
- // or superuser (like oozie).
- (loginUser.equals(currentUser) || loginUser.equals(realUser));
- }
-
- /**
- * If multiple clients with the same principal try to connect to the same server at the same time,
- * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
- * work around this, what is done is that the client backs off randomly and tries to initiate the
- * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
- * attempted.
- * <p>
- * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
- * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
- * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
- * underlying authentication implementation, so there is no retry from other high level (for eg,
- * HCM or HBaseAdmin).
- * </p>
- * @param currRetries retry count
- * @param ex exception describing fail
- * @param user which is trying to connect
- * @throws java.io.IOException if IO fail
- * @throws InterruptedException if thread is interrupted
- */
- private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
- final UserGroupInformation user) throws IOException, InterruptedException {
- user.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws IOException, InterruptedException {
- if (shouldAuthenticateOverKrb()) {
- if (currRetries < MAX_SASL_RETRIES) {
- LOG.debug("Exception encountered while connecting to the server : " + ex);
- // try re-login
- if (UserGroupInformation.isLoginKeytabBased()) {
- UserGroupInformation.getLoginUser().reloginFromKeytab();
- } else {
- UserGroupInformation.getLoginUser().reloginFromTicketCache();
- }
-
- // Should reconnect
- return null;
- } else {
- String msg = "Couldn't setup connection for "
- + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
- LOG.warn(msg);
- throw (IOException) new IOException(msg).initCause(ex);
- }
- } else {
- LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
- }
- if (ex instanceof RemoteException) {
- throw (RemoteException) ex;
- }
- if (ex instanceof SaslException) {
- String msg = "SASL authentication failed."
- + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
- LOG.fatal(msg, ex);
- throw new RuntimeException(msg, ex);
- }
- throw new IOException(ex);
- }
- });
- }
-
- public int getConnectionHashCode() {
- return ConnectionId.hashCode(ticket, serviceName, address);
- }
-
- @Override
- public int hashCode() {
- return getConnectionHashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof AsyncRpcChannel) {
- AsyncRpcChannel channel = (AsyncRpcChannel) obj;
- return channel.hashCode() == obj.hashCode();
- }
- return false;
- }
-
- @Override
- public String toString() {
- return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
- }
-
- /**
- * Listens to call writes and fails if write failed
- */
- private static final class CallWriteListener implements ChannelFutureListener {
- private final AsyncRpcChannel rpcChannel;
- private final int id;
-
- public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
- this.rpcChannel = asyncRpcChannel;
- this.id = id;
- }
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- AsyncCall call = rpcChannel.removePendingCall(id);
- if (call != null) {
- if (future.cause() instanceof IOException) {
- call.setFailed((IOException) future.cause());
- } else {
- call.setFailed(new IOException(future.cause()));
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
deleted file mode 100644
index e12e298..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.JVM;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.PoolMap;
-import org.apache.hadoop.hbase.util.Threads;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
-/**
- * Netty client for the requests and responses
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class AsyncRpcClient extends AbstractRpcClient {
-
- private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
-
- public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max";
- public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
- public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";
-
- private static final HashedWheelTimer WHEEL_TIMER =
- new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"),
- 100, TimeUnit.MILLISECONDS);
-
- private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER =
- new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //empty initializer
- }
- };
-
- protected final AtomicInteger callIdCnt = new AtomicInteger();
-
- private final PoolMap<Integer, AsyncRpcChannel> connections;
-
- final FailedServers failedServers;
-
- @VisibleForTesting
- final Bootstrap bootstrap;
-
- private final boolean useGlobalEventLoopGroup;
-
- @VisibleForTesting
- static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;
-
- private synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
- getGlobalEventLoopGroup(Configuration conf) {
- if (GLOBAL_EVENT_LOOP_GROUP == null) {
- GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Create global event loop group "
- + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName());
- }
- }
- return GLOBAL_EVENT_LOOP_GROUP;
- }
-
- private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup(
- Configuration conf) {
- // Max amount of threads to use. 0 lets Netty decide based on amount of cores
- int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0);
-
- // Config to enable native transport. Does not seem to be stable at time of implementation
- // although it is not extensively tested.
- boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false);
-
- // Use the faster native epoll transport mechanism on linux if enabled
- if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads);
- }
- return new Pair<EventLoopGroup, Class<? extends Channel>>(new EpollEventLoopGroup(maxThreads,
- Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads);
- }
- return new Pair<EventLoopGroup, Class<? extends Channel>>(new NioEventLoopGroup(maxThreads,
- Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class);
- }
- }
-
- /**
- * Constructor for tests
- *
- * @param configuration to HBase
- * @param clusterId for the cluster
- * @param localAddress local address to connect to
- * @param metrics the connection metrics
- * @param channelInitializer for custom channel handlers
- */
- protected AsyncRpcClient(Configuration configuration, String clusterId,
- SocketAddress localAddress, MetricsConnection metrics,
- ChannelInitializer<SocketChannel> channelInitializer) {
- super(configuration, clusterId, localAddress, metrics);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Starting async Hbase RPC client");
- }
-
- Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass;
- this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true);
- if (useGlobalEventLoopGroup) {
- eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration);
- } else {
- eventLoopGroupAndChannelClass = createEventLoopGroup(configuration);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group "
- + eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName());
- }
-
- this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration));
- this.failedServers = new FailedServers(configuration);
-
- int operationTimeout = configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-
- // Configure the default bootstrap.
- this.bootstrap = new Bootstrap();
- bootstrap.group(eventLoopGroupAndChannelClass.getFirst())
- .channel(eventLoopGroupAndChannelClass.getSecond())
- .option(ChannelOption.TCP_NODELAY, tcpNoDelay)
- .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, operationTimeout);
- if (channelInitializer == null) {
- channelInitializer = DEFAULT_CHANNEL_INITIALIZER;
- }
- bootstrap.handler(channelInitializer);
- if (localAddress != null) {
- bootstrap.localAddress(localAddress);
- }
- }
-
- /** Used in test only. */
- AsyncRpcClient(Configuration configuration) {
- this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
- }
-
- /** Used in test only. */
- AsyncRpcClient(Configuration configuration,
- ChannelInitializer<SocketChannel> channelInitializer) {
- this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, channelInitializer);
- }
-
- /**
- * Constructor
- *
- * @param configuration to HBase
- * @param clusterId for the cluster
- * @param localAddress local address to connect to
- * @param metrics the connection metrics
- */
- public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
- MetricsConnection metrics) {
- this(configuration, clusterId, localAddress, metrics, null);
- }
-
- /**
- * Make a call, passing <code>param</code>, to the IPC server running at
- * <code>address</code> which is servicing the <code>protocol</code> protocol,
- * with the <code>ticket</code> credentials, returning the value.
- * Throws exceptions if there are network problems or if the remote code
- * threw an exception.
- *
- * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
- * {@link org.apache.hadoop.hbase.security.UserProvider#getCurrent()} makes a new
- * instance of User each time so will be a new Connection each time.
- * @return A pair with the Message response and the Cell data (if any).
- * @throws InterruptedException if call is interrupted
- * @throws java.io.IOException if a connection failure is encountered
- */
- @Override
- protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
- Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
- InetSocketAddress addr, MetricsConnection.CallStats callStats)
- throws IOException, InterruptedException {
- if (pcrc == null) {
- pcrc = new PayloadCarryingRpcController();
- }
- final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
-
- Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType, callStats);
- long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
- try {
- Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
- return new Pair<>(response, pcrc.cellScanner());
- } catch (ExecutionException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- } else {
- throw wrapException(addr, (Exception) e.getCause());
- }
- } catch (TimeoutException e) {
- CallTimeoutException cte = new CallTimeoutException(promise.toString());
- throw wrapException(addr, cte);
- }
- }
-
- /**
- * Call method async
- */
- private void callMethod(final Descriptors.MethodDescriptor md,
- final PayloadCarryingRpcController pcrc, final Message param, Message returnType, User ticket,
- InetSocketAddress addr, final RpcCallback<Message> done) {
- final AsyncRpcChannel connection;
- try {
- connection = createRpcChannel(md.getService().getName(), addr, ticket);
- final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
- GenericFutureListener<Future<Message>> listener =
- new GenericFutureListener<Future<Message>>() {
- @Override
- public void operationComplete(Future<Message> future) throws Exception {
- cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
- if (metrics != null) {
- metrics.updateRpc(md, param, cs);
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
- }
- if (!future.isSuccess()) {
- Throwable cause = future.cause();
- if (cause instanceof IOException) {
- pcrc.setFailed((IOException) cause);
- } else {
- pcrc.setFailed(new IOException(cause));
- }
- } else {
- try {
- done.run(future.get());
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof IOException) {
- pcrc.setFailed((IOException) cause);
- } else {
- pcrc.setFailed(new IOException(cause));
- }
- } catch (InterruptedException e) {
- pcrc.setFailed(new IOException(e));
- }
- }
- }
- };
- cs.setStartTime(EnvironmentEdgeManager.currentTime());
- connection.callMethod(md, pcrc, param, returnType, cs).addListener(listener);
- } catch (StoppedRpcClientException|FailedServerException e) {
- pcrc.setFailed(e);
- }
- }
-
- private boolean closed = false;
-
- /**
- * Close netty
- */
- public void close() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopping async HBase RPC client");
- }
-
- synchronized (connections) {
- if (closed) {
- return;
- }
- closed = true;
- for (AsyncRpcChannel conn : connections.values()) {
- conn.close(null);
- }
- }
- // do not close global EventLoopGroup.
- if (!useGlobalEventLoopGroup) {
- bootstrap.group().shutdownGracefully();
- }
- }
-
- /**
- * Create a cell scanner
- *
- * @param cellBlock to create scanner for
- * @return CellScanner
- * @throws java.io.IOException on error on creation cell scanner
- */
- public CellScanner createCellScanner(byte[] cellBlock) throws IOException {
- return ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
- }
-
- /**
- * Build cell block
- *
- * @param cells to create block with
- * @return ByteBuffer with cells
- * @throws java.io.IOException if block creation fails
- */
- public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
- return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
- }
-
- /**
- * Creates an RPC client
- *
- * @param serviceName name of servicce
- * @param location to connect to
- * @param ticket for current user
- * @return new RpcChannel
- * @throws StoppedRpcClientException when Rpc client is stopped
- * @throws FailedServerException if server failed
- */
- private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location,
- User ticket) throws StoppedRpcClientException, FailedServerException {
- // Check if server is failed
- if (this.failedServers.isFailedServer(location)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not trying to connect to " + location +
- " this server is in the failed servers list");
- }
- throw new FailedServerException(
- "This server is in the failed servers list: " + location);
- }
-
- int hashCode = ConnectionId.hashCode(ticket,serviceName,location);
-
- AsyncRpcChannel rpcChannel;
- synchronized (connections) {
- if (closed) {
- throw new StoppedRpcClientException();
- }
- rpcChannel = connections.get(hashCode);
- if (rpcChannel == null || !rpcChannel.isAlive()) {
- rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
- connections.put(hashCode, rpcChannel);
- }
- }
-
- return rpcChannel;
- }
-
- /**
- * Interrupt the connections to the given ip:port server. This should be called if the server
- * is known as actually dead. This will not prevent current operation to be retried, and,
- * depending on their own behavior, they may retry on the same server. This can be a feature,
- * for example at startup. In any case, they're likely to get connection refused (if the
- * process died) or no route to host: i.e. there next retries should be faster and with a
- * safe exception.
- *
- * @param sn server to cancel connections for
- */
- @Override
- public void cancelConnections(ServerName sn) {
- synchronized (connections) {
- for (AsyncRpcChannel rpcChannel : connections.values()) {
- if (rpcChannel.isAlive() &&
- rpcChannel.address.getPort() == sn.getPort() &&
- rpcChannel.address.getHostName().contentEquals(sn.getHostname())) {
- LOG.info("The server on " + sn.toString() +
- " is dead - stopping the connection " + rpcChannel.toString());
- rpcChannel.close(null);
- }
- }
- }
- }
-
- /**
- * Remove connection from pool
- */
- public void removeConnection(AsyncRpcChannel connection) {
- int connectionHashCode = connection.hashCode();
- synchronized (connections) {
- // we use address as cache key, so we should check here to prevent removing the
- // wrong connection
- AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode);
- if (connectionInPool != null && connectionInPool.equals(connection)) {
- this.connections.remove(connectionHashCode);
- } else if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x",
- connection.toString(), System.identityHashCode(connection),
- System.identityHashCode(connectionInPool)));
- }
- }
- }
-
- /**
- * Creates a "channel" that can be used by a protobuf service. Useful setting up
- * protobuf stubs.
- *
- * @param sn server name describing location of server
- * @param user which is to use the connection
- * @param rpcTimeout default rpc operation timeout
- *
- * @return A rpc channel that goes via this rpc client instance.
- */
- public RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
- return new RpcChannelImplementation(this, sn, user, rpcTimeout);
- }
-
- /**
- * Blocking rpc channel that goes via hbase rpc.
- */
- @VisibleForTesting
- public static class RpcChannelImplementation implements RpcChannel {
- private final InetSocketAddress isa;
- private final AsyncRpcClient rpcClient;
- private final User ticket;
- private final int channelOperationTimeout;
-
- /**
- * @param channelOperationTimeout - the default timeout when no timeout is given
- */
- protected RpcChannelImplementation(final AsyncRpcClient rpcClient,
- final ServerName sn, final User ticket, int channelOperationTimeout) {
- this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
- this.rpcClient = rpcClient;
- this.ticket = ticket;
- this.channelOperationTimeout = channelOperationTimeout;
- }
-
- @Override
- public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
- Message param, Message returnType, RpcCallback<Message> done) {
- PayloadCarryingRpcController pcrc;
- if (controller != null) {
- pcrc = (PayloadCarryingRpcController) controller;
- if (!pcrc.hasCallTimeout()) {
- pcrc.setCallTimeout(channelOperationTimeout);
- }
- } else {
- pcrc = new PayloadCarryingRpcController();
- pcrc.setCallTimeout(channelOperationTimeout);
- }
-
- this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
- }
- }
-
- Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
- return WHEEL_TIMER.newTimeout(task, delay, unit);
- }
-}