You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/09/08 12:33:21 UTC
[5/6] hbase git commit: HBASE-16445 Refactor and reimplement RpcClient
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
deleted file mode 100644
index 5f4d2f4..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.JVM;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.PoolMap;
-import org.apache.hadoop.hbase.util.Threads;
-
-/**
- * Netty client for the requests and responses
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class AsyncRpcClient extends AbstractRpcClient {
-
- private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
-
- public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max";
- public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
- public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";
-
- private static final HashedWheelTimer WHEEL_TIMER =
- new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"),
- 100, TimeUnit.MILLISECONDS);
-
- private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER =
- new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- //empty initializer
- }
- };
-
- protected final AtomicInteger callIdCnt = new AtomicInteger();
-
- private final PoolMap<Integer, AsyncRpcChannel> connections;
-
- final FailedServers failedServers;
-
- @VisibleForTesting
- final Bootstrap bootstrap;
-
- private final boolean useGlobalEventLoopGroup;
-
- @VisibleForTesting
- static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;
-
- synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
- getGlobalEventLoopGroup(Configuration conf) {
- if (GLOBAL_EVENT_LOOP_GROUP == null) {
- GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Create global event loop group "
- + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName());
- }
- }
- return GLOBAL_EVENT_LOOP_GROUP;
- }
-
- private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup(
- Configuration conf) {
- // Max amount of threads to use. 0 lets Netty decide based on amount of cores
- int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0);
-
- // Config to enable native transport. Does not seem to be stable at time of implementation
- // although it is not extensively tested.
- boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false);
-
- // Use the faster native epoll transport mechanism on linux if enabled
- if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads);
- }
- return new Pair<EventLoopGroup, Class<? extends Channel>>(new EpollEventLoopGroup(maxThreads,
- Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads);
- }
- return new Pair<EventLoopGroup, Class<? extends Channel>>(new NioEventLoopGroup(maxThreads,
- Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class);
- }
- }
-
- /**
- * Constructor for tests
- *
- * @param configuration to HBase
- * @param clusterId for the cluster
- * @param localAddress local address to connect to
- * @param metrics the connection metrics
- * @param channelInitializer for custom channel handlers
- */
- protected AsyncRpcClient(Configuration configuration, String clusterId,
- SocketAddress localAddress, MetricsConnection metrics,
- ChannelInitializer<SocketChannel> channelInitializer) {
- super(configuration, clusterId, localAddress, metrics);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Starting async Hbase RPC client");
- }
-
- Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass;
- this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true);
- if (useGlobalEventLoopGroup) {
- eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration);
- } else {
- eventLoopGroupAndChannelClass = createEventLoopGroup(configuration);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group "
- + eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName());
- }
-
- this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration));
- this.failedServers = new FailedServers(configuration);
-
- int operationTimeout = configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-
- // Configure the default bootstrap.
- this.bootstrap = new Bootstrap();
- bootstrap.group(eventLoopGroupAndChannelClass.getFirst())
- .channel(eventLoopGroupAndChannelClass.getSecond())
- .option(ChannelOption.TCP_NODELAY, tcpNoDelay)
- .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, operationTimeout);
- if (channelInitializer == null) {
- channelInitializer = DEFAULT_CHANNEL_INITIALIZER;
- }
- bootstrap.handler(channelInitializer);
- if (localAddress != null) {
- bootstrap.localAddress(localAddress);
- }
- }
-
- /** Used in test only. */
- AsyncRpcClient(Configuration configuration) {
- this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
- }
-
- /** Used in test only. */
- AsyncRpcClient(Configuration configuration,
- ChannelInitializer<SocketChannel> channelInitializer) {
- this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, channelInitializer);
- }
-
- /**
- * Constructor
- *
- * @param configuration to HBase
- * @param clusterId for the cluster
- * @param localAddress local address to connect to
- * @param metrics the connection metrics
- */
- public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
- MetricsConnection metrics) {
- this(configuration, clusterId, localAddress, metrics, null);
- }
-
- /**
- * Make a call, passing <code>param</code>, to the IPC server running at
- * <code>address</code> which is servicing the <code>protocol</code> protocol,
- * with the <code>ticket</code> credentials, returning the value.
- * Throws exceptions if there are network problems or if the remote code
- * threw an exception.
- *
- * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
- * {@link org.apache.hadoop.hbase.security.UserProvider#getCurrent()} makes a new
- * instance of User each time so will be a new Connection each time.
- * @return A pair with the Message response and the Cell data (if any).
- * @throws InterruptedException if call is interrupted
- * @throws java.io.IOException if a connection failure is encountered
- */
- @Override
- protected Pair<Message, CellScanner> call(HBaseRpcController pcrc,
- Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
- InetSocketAddress addr, MetricsConnection.CallStats callStats)
- throws IOException, InterruptedException {
- if (pcrc == null) {
- pcrc = new HBaseRpcControllerImpl();
- }
- final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
-
- final Promise<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType,
- getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(),
- pcrc.getPriority());
-
- pcrc.notifyOnCancel(new RpcCallback<Object>() {
- @Override
- public void run(Object parameter) {
- // Will automatically fail the promise with CancellationException
- promise.cancel(true);
- }
- });
-
- long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
- try {
- Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
- return new Pair<>(response, pcrc.cellScanner());
- } catch (ExecutionException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- } else {
- throw IPCUtil.wrapException(addr, (Exception) e.getCause());
- }
- } catch (TimeoutException e) {
- CallTimeoutException cte = new CallTimeoutException(promise.toString());
- throw IPCUtil.wrapException(addr, cte);
- }
- }
-
- private MessageConverter<Message, Message> getMessageConverterWithRpcController(
- final HBaseRpcController pcrc) {
- return new
- MessageConverter<Message, Message>() {
- @Override
- public Message convert(Message msg, CellScanner cellScanner) {
- pcrc.setCellScanner(cellScanner);
- return msg;
- }
- };
- }
-
- /**
- * Call method async
- */
- private void callMethod(final Descriptors.MethodDescriptor md,
- final HBaseRpcController pcrc, final Message param, Message returnType, User ticket,
- InetSocketAddress addr, final RpcCallback<Message> done) {
- final AsyncRpcChannel connection;
- try {
- connection = createRpcChannel(md.getService().getName(), addr, ticket);
-
- FutureListener<Message> listener =
- new FutureListener<Message>() {
- @Override
- public void operationComplete(Future<Message> future) throws Exception {
- if (!future.isSuccess()) {
- Throwable cause = future.cause();
- if (cause instanceof IOException) {
- pcrc.setFailed((IOException) cause);
- } else {
- pcrc.setFailed(new IOException(cause));
- }
- } else {
- try {
- done.run(future.get());
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof IOException) {
- pcrc.setFailed((IOException) cause);
- } else {
- pcrc.setFailed(new IOException(cause));
- }
- } catch (InterruptedException e) {
- pcrc.setFailed(new IOException(e));
- }
- }
- }
- };
- connection.callMethod(md, param, pcrc.cellScanner(), returnType,
- getMessageConverterWithRpcController(pcrc), null,
- pcrc.getCallTimeout(), pcrc.getPriority())
- .addListener(listener);
- } catch (StoppedRpcClientException|FailedServerException e) {
- pcrc.setFailed(e);
- }
- }
-
- private boolean closed = false;
-
- /**
- * Close netty
- */
- public void close() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopping async HBase RPC client");
- }
-
- synchronized (connections) {
- if (closed) {
- return;
- }
- closed = true;
- for (AsyncRpcChannel conn : connections.values()) {
- conn.close(null);
- }
- }
- // do not close global EventLoopGroup.
- if (!useGlobalEventLoopGroup) {
- bootstrap.config().group().shutdownGracefully();
- }
- }
-
- /**
- * Create a cell scanner
- *
- * @param cellBlock to create scanner for
- * @return CellScanner
- * @throws java.io.IOException on error on creation cell scanner
- */
- public CellScanner createCellScanner(byte[] cellBlock) throws IOException {
- return cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
- }
-
- /**
- * Build cell block
- *
- * @param cells to create block with
- * @return ByteBuffer with cells
- * @throws java.io.IOException if block creation fails
- */
- public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
- return cellBlockBuilder.buildCellBlock(this.codec, this.compressor, cells);
- }
-
- /**
- * Creates an RPC client
- *
- * @param serviceName name of service
- * @param location to connect to
- * @param ticket for current user
- * @return new RpcChannel
- * @throws StoppedRpcClientException when Rpc client is stopped
- * @throws FailedServerException if server failed
- */
- private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location,
- User ticket) throws StoppedRpcClientException, FailedServerException {
- // Check if server is failed
- if (this.failedServers.isFailedServer(location)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not trying to connect to " + location +
- " this server is in the failed servers list");
- }
- throw new FailedServerException(
- "This server is in the failed servers list: " + location);
- }
-
- int hashCode = ConnectionId.hashCode(ticket,serviceName,location);
-
- AsyncRpcChannel rpcChannel;
- synchronized (connections) {
- if (closed) {
- throw new StoppedRpcClientException();
- }
- rpcChannel = connections.get(hashCode);
- if (rpcChannel != null && !rpcChannel.isAlive()) {
- LOG.debug("Removing dead channel from server="+rpcChannel.getAddress().toString());
- connections.remove(hashCode);
- }
- if (rpcChannel == null) {
- rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
- connections.put(hashCode, rpcChannel);
- }
- }
-
- return rpcChannel;
- }
-
- /**
- * Interrupt the connections to the given ip:port server. This should be called if the server
- * is known as actually dead. This will not prevent current operation to be retried, and,
- * depending on their own behavior, they may retry on the same server. This can be a feature,
- * for example at startup. In any case, they're likely to get connection refused (if the
- * process died) or no route to host: i.e. there next retries should be faster and with a
- * safe exception.
- *
- * @param sn server to cancel connections for
- */
- @Override
- public void cancelConnections(ServerName sn) {
- synchronized (connections) {
- for (AsyncRpcChannel rpcChannel : connections.values()) {
- if (rpcChannel.isAlive() &&
- rpcChannel.getAddress().getPort() == sn.getPort() &&
- rpcChannel.getAddress().getHostName().contentEquals(sn.getHostname())) {
- LOG.info("The server on " + sn.toString() +
- " is dead - stopping the connection " + rpcChannel.toString());
- rpcChannel.close(null);
- }
- }
- }
- }
-
- /**
- * Remove connection from pool
- * @param connection to remove
- */
- public void removeConnection(AsyncRpcChannel connection) {
- int connectionHashCode = connection.hashCode();
- synchronized (connections) {
- // we use address as cache key, so we should check here to prevent removing the
- // wrong connection
- AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode);
- if (connectionInPool != null && connectionInPool.equals(connection)) {
- this.connections.remove(connectionHashCode);
- } else if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x",
- connection.toString(), System.identityHashCode(connection),
- System.identityHashCode(connectionInPool)));
- }
- }
- }
-
- @Override
- public RpcChannel createProtobufRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
- return new RpcChannelImplementation(this, sn, user, rpcTimeout);
- }
-
- /**
- * Blocking rpc channel that goes via hbase rpc.
- */
- @VisibleForTesting
- public static class RpcChannelImplementation implements RpcChannel {
- private final InetSocketAddress isa;
- private final AsyncRpcClient rpcClient;
- private final User ticket;
- private final int channelOperationTimeout;
-
- /**
- * @param channelOperationTimeout - the default timeout when no timeout is given
- */
- protected RpcChannelImplementation(final AsyncRpcClient rpcClient,
- final ServerName sn, final User ticket, int channelOperationTimeout) {
- this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
- this.rpcClient = rpcClient;
- this.ticket = ticket;
- this.channelOperationTimeout = channelOperationTimeout;
- }
-
- @Override
- public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
- Message param, Message returnType, RpcCallback<Message> done) {
- HBaseRpcController pcrc =
- configurePayloadCarryingRpcController(controller, channelOperationTimeout);
-
- this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
- }
- }
-
- /**
- * Get a new timeout on this RPC client
- * @param task to run at timeout
- * @param delay for the timeout
- * @param unit time unit for the timeout
- * @return Timeout
- */
- Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
- return WHEEL_TIMER.newTimeout(task, delay, unit);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
deleted file mode 100644
index 7a2802f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.Message;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
-import org.apache.hadoop.ipc.RemoteException;
-
-/**
- * Handles Hbase responses
- */
-@InterfaceAudience.Private
-public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {
- private final AsyncRpcChannel channel;
-
- /**
- * Constructor
- * @param channel on which this response handler operates
- */
- public AsyncServerResponseHandler(AsyncRpcChannel channel) {
- this.channel = channel;
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf inBuffer) throws Exception {
- ByteBufInputStream in = new ByteBufInputStream(inBuffer);
- int totalSize = inBuffer.readableBytes();
- // Read the header
- RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
- int id = responseHeader.getCallId();
- AsyncCall call = channel.removePendingCall(id);
- if (call == null) {
- // So we got a response for which we have no corresponding 'call' here on the client-side.
- // We probably timed out waiting, cleaned up all references, and now the server decides
- // to return a response. There is nothing we can do w/ the response at this stage. Clean
- // out the wire of the response so its out of the way and we can get other responses on
- // this connection.
- int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
- int whatIsLeftToRead = totalSize - readSoFar;
-
- // This is done through a Netty ByteBuf which has different behavior than InputStream.
- // It does not return number of bytes read but will update pointer internally and throws an
- // exception when too many bytes are to be skipped.
- inBuffer.skipBytes(whatIsLeftToRead);
- return;
- }
-
- if (responseHeader.hasException()) {
- RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException();
- RemoteException re = createRemoteException(exceptionResponse);
- if (exceptionResponse.getExceptionClassName()
- .equals(FatalConnectionException.class.getName())) {
- channel.close(re);
- } else {
- call.setFailed(re);
- }
- } else {
- Message value = null;
- // Call may be null because it may have timedout and been cleaned up on this side already
- if (call.responseDefaultType != null) {
- Message.Builder builder = call.responseDefaultType.newBuilderForType();
- ProtobufUtil.mergeDelimitedFrom(builder, in);
- value = builder.build();
- }
- CellScanner cellBlockScanner = null;
- if (responseHeader.hasCellBlockMeta()) {
- int size = responseHeader.getCellBlockMeta().getLength();
- byte[] cellBlock = new byte[size];
- inBuffer.readBytes(cellBlock, 0, cellBlock.length);
- cellBlockScanner = channel.client.createCellScanner(cellBlock);
- }
- call.setSuccess(value, cellBlockScanner);
- call.callStats.setResponseSizeBytes(totalSize);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- channel.close(cause);
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- channel.close(new IOException("connection closed"));
- }
-
- /**
- * @param e Proto exception
- * @return RemoteException made from passed <code>e</code>
- */
- private RemoteException createRemoteException(final RPCProtos.ExceptionResponse e) {
- String innerExceptionClassName = e.getExceptionClassName();
- boolean doNotRetry = e.getDoNotRetry();
- return e.hasHostname() ?
- // If a hostname then add it to the RemoteWithExtrasException
- new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
- e.getPort(), doNotRetry)
- : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
index 0475e58..523ca55 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
@@ -46,7 +46,7 @@ public class BlockingRpcCallback<R> implements RpcCallback<R> {
synchronized (this) {
result = parameter;
resultSet = true;
- this.notify();
+ this.notifyAll();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
new file mode 100644
index 0000000..d27602e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.net.NetUtils;
+
+/**
+ * Does RPC against a cluster. Manages connections per regionserver in the cluster.
+ * <p>
+ * See HBaseServer
+ */
+@InterfaceAudience.Private
+public class BlockingRpcClient extends AbstractRpcClient<BlockingRpcConnection> {
+
+ protected final SocketFactory socketFactory; // how to create sockets
+
+ /**
+ * Used in test only. Constructs an IPC client for {@link HConstants#CLUSTER_ID_DEFAULT} with the
+ * default SocketFactory, passing {@code null} for both the bind address and the metrics.
+ */
+ @VisibleForTesting
+ BlockingRpcClient(Configuration conf) {
+ this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null);
+ }
+
+ /**
+ * Constructs an IPC client for the cluster {@code clusterId} with the default SocketFactory. This
+ * constructor is called with reflection by the RpcClientFactory to create an instance.
+ * @param conf configuration
+ * @param clusterId the cluster id
+ * @param localAddr client socket bind address.
+ * @param metrics the connection metrics
+ */
+ public BlockingRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
+ MetricsConnection metrics) {
+ super(conf, clusterId, localAddr, metrics);
+ this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+ }
+
+ /**
+ * Creates a connection. Can be overridden by a subclass for testing.
+ * @param remoteId - the ConnectionId to use for the connection creation.
+ */
+ protected BlockingRpcConnection createConnection(ConnectionId remoteId) throws IOException {
+ return new BlockingRpcConnection(this, remoteId);
+ }
+
+ @Override
+ protected void closeInternal() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
new file mode 100644
index 0000000..c8b366d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -0,0 +1,725 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.createRemoteException;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.RpcCallback;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
+import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+/**
+ * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
+ * remote address. Calls are multiplexed through this socket: responses may be delivered out of
+ * order.
+ */
+@InterfaceAudience.Private
+class BlockingRpcConnection extends RpcConnection implements Runnable {
+
+ private static final Log LOG = LogFactory.getLog(BlockingRpcConnection.class);
+
+ private final BlockingRpcClient rpcClient;
+
+ private final String threadName;
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+ justification = "We are always under lock actually")
+ private Thread thread;
+
+ // connected socket. protected for writing UT.
+ protected Socket socket = null;
+ private DataInputStream in;
+ private DataOutputStream out;
+
+ private HBaseSaslRpcClient saslRpcClient;
+
+ // currently active calls
+ private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();
+
+ private final CallSender callSender;
+
+ private boolean closed = false;
+
+ private byte[] connectionHeaderPreamble;
+
+ private byte[] connectionHeaderWithLength;
+
+ /**
+ * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
+ * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
+ * use a different thread for writing. This way, on interruptions, we either cancel the writes or
+ * ignore the answer if the write is already done, but we don't stop the write in the middle. This
+ * adds a thread per region server in the client, so it's kept as an option.
+ * <p>
+ * The implementation is simple: the client threads add their calls to the queue, and then wait
+ * for an answer. The CallSender blocks on the queue, and writes the calls one after the other. On
+ * interruption, the client cancels its call. The CallSender checks that the call has not been
+ * canceled before writing it.
+ * </p>
+ * <p>
+ * When the connection closes, all the calls not yet sent are dismissed. The client thread is
+ * notified with an appropriate exception, as if the call was already sent but the answer not yet
+ * received.
+ * </p>
+ * <p>
+ * The queue is a plain {@link ArrayDeque} and the writer loop waits on the monitor of the
+ * enclosing connection, so callers are expected to hold that monitor when touching the queue.
+ * </p>
+ */
+ private class CallSender extends Thread {
+
+ private final Queue<Call> callsToWrite;
+
+ private final int maxQueueSize;
+
+ /**
+ * @param name base name of the connection thread; " - writer" is appended for this thread
+ * @param conf configuration, read for "hbase.ipc.client.write.queueSize" (default 1000)
+ */
+ public CallSender(String name, Configuration conf) {
+ int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
+ callsToWrite = new ArrayDeque<>(queueSize);
+ this.maxQueueSize = queueSize;
+ setDaemon(true);
+ setName(name + " - writer");
+ }
+
+ /**
+ * Queues a call for the writer thread and wakes it up.
+ * @param call the call to write
+ * @throws IOException if the write queue already holds {@code maxQueueSize} calls
+ */
+ public void sendCall(final Call call) throws IOException {
+ if (callsToWrite.size() >= maxQueueSize) {
+ throw new IOException("Can't add the call " + call.id
+ + " to the write queue. callsToWrite.size()=" + callsToWrite.size());
+ }
+ callsToWrite.offer(call);
+ // wake up the writer loop, which waits on the connection monitor in run().
+ BlockingRpcConnection.this.notifyAll();
+ }
+
+ /**
+ * Cancels a call: drops it from the write queue (if it is still there) and from the pending
+ * calls map, then completes it with a {@link CallCancelledException}.
+ * @param call the call being cancelled
+ */
+ public void remove(Call call) {
+ // Remove this specific call. The no-arg remove() would drop whatever call is at the head of
+ // the queue (and throw NoSuchElementException on an empty queue) instead of the cancelled one.
+ callsToWrite.remove(call);
+ // By removing the call from the expected call list, we make the list smaller, but
+ // it means as well that we don't know how many calls we cancelled.
+ calls.remove(call.id);
+ call.setException(new CallCancelledException("Call id=" + call.id + ", waitTime="
+ + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
+ + call.timeout));
+ }
+
+ /**
+ * Reads the calls from the queue and writes them on the socket, until the connection is marked
+ * as closed.
+ */
+ @Override
+ public void run() {
+ synchronized (BlockingRpcConnection.this) {
+ while (!closed) {
+ if (callsToWrite.isEmpty()) {
+ // We should use another monitor object here for better performance since the read
+ // thread also uses BlockingRpcConnection.this. But this makes the locking schema more
+ // complicated, can do it later as an optimization.
+ try {
+ BlockingRpcConnection.this.wait();
+ } catch (InterruptedException e) {
+ // deliberately ignored: shutdown() sets 'closed' before interrupting this thread, and
+ // the loop condition is re-checked right below.
+ }
+ // check if we need to quit, so continue the main loop instead of fallback.
+ continue;
+ }
+ Call call = callsToWrite.poll();
+ if (call.isDone()) {
+ // already completed (e.g. cancelled or timed out) before we got to write it.
+ continue;
+ }
+ try {
+ tracedWriteRequest(call);
+ } catch (IOException e) {
+ // exception here means the call has not been added to the pending calls map yet, so we
+ // need to fail it by our own.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("call write error for call #" + call.id, e);
+ }
+ call.setException(e);
+ closeConn(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Fails every call not yet sent with a {@link ConnectionClosingException} and empties the
+ * queue. Called when the connection is being closed.
+ * @param e the reason the connection is closing; currently not propagated to the calls
+ */
+ public void cleanup(IOException e) {
+ IOException ie = new ConnectionClosingException(
+ "Connection to " + remoteId.address + " is closing.");
+ for (Call call : callsToWrite) {
+ call.setException(ie);
+ }
+ callsToWrite.clear();
+ }
+ }
+
+ /**
+ * Prepares a connection to {@code remoteId} without opening the socket: the preamble and the
+ * length-prefixed connection header are serialized up front, the reader thread name is computed,
+ * and the optional writer thread is started when the client is configured to use one.
+ * @param rpcClient the owning client, source of configuration, codec and socket factory
+ * @param remoteId identifies the remote address and the user ticket
+ * @throws IOException if the remote address is unresolved or the header cannot be serialized
+ */
+ BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
+ super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
+ rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
+ this.rpcClient = rpcClient;
+ if (remoteId.getAddress().isUnresolved()) {
+ throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
+ }
+
+ this.connectionHeaderPreamble = getConnectionHeaderPreamble();
+ // Frame the connection header as <4-byte length><header bytes>. The buffer is sized exactly,
+ // so its backing array can be kept as-is.
+ ConnectionHeader connHeader = getConnectionHeader();
+ int framedSize = 4 + connHeader.getSerializedSize();
+ ByteArrayOutputStream framed = new ByteArrayOutputStream(framedSize);
+ DataOutputStream framer = new DataOutputStream(framed);
+ framer.writeInt(connHeader.getSerializedSize());
+ connHeader.writeTo(framer);
+ assert framed.size() == framedSize;
+ this.connectionHeaderWithLength = framed.getBuffer();
+
+ UserGroupInformation ugi = remoteId.ticket.getUGI();
+ String userPart;
+ if (ugi != null) {
+ userPart = " from " + ugi.getUserName();
+ } else {
+ userPart = " from an unknown user";
+ }
+ this.threadName = "IPC Client (" + this.rpcClient.socketFactory.hashCode() + ") connection to "
+ + remoteId.getAddress().toString() + userPart;
+
+ boolean dedicatedWriter =
+ this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false);
+ if (!dedicatedWriter) {
+ callSender = null;
+ } else {
+ callSender = new CallSender(threadName, this.rpcClient.conf);
+ callSender.start();
+ }
+ }
+
+ /**
+ * Opens and configures the socket to the remote address, retrying on failure. Timeouts and
+ * other I/O failures are counted separately, each against {@code rpcClient.maxRetries}. Protected
+ * so that unit tests can override it.
+ * @throws IOException when {@link #handleConnectionFailure} decides to stop retrying
+ */
+ protected void setupConnection() throws IOException {
+ short ioAttempts = 0;
+ short timeoutAttempts = 0;
+ for (;;) {
+ try {
+ // assign the field first so that a failure below lets closeSocket() release the socket.
+ this.socket = this.rpcClient.socketFactory.createSocket();
+ this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
+ this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
+ if (this.rpcClient.localAddr != null) {
+ this.socket.bind(this.rpcClient.localAddr);
+ }
+ NetUtils.connect(this.socket, remoteId.getAddress(), this.rpcClient.connectTO);
+ this.socket.setSoTimeout(this.rpcClient.readTO);
+ return;
+ } catch (SocketTimeoutException timedOut) {
+ /*
+ * The max number of retries is 45, which amounts to 20s*45 = 15 minutes retries.
+ */
+ int attempt = timeoutAttempts;
+ timeoutAttempts++;
+ handleConnectionFailure(attempt, this.rpcClient.maxRetries, timedOut);
+ } catch (IOException failed) {
+ int attempt = ioAttempts;
+ ioAttempts++;
+ handleConnectionFailure(attempt, this.rpcClient.maxRetries, failed);
+ }
+ }
+ }
+
+ /**
+ * Handles one failed connection attempt. The socket is closed first. If the number of retries
+ * already made has reached the maximum, or the failure is an interruption, the exception is
+ * rethrown; otherwise the thread sleeps for {@code rpcClient.failureSleep} ms so that the caller
+ * ({@link #setupConnection()}) can try again. The sleep happens on the calling thread, with
+ * whatever locks that thread holds.
+ * @param curRetries current number of retries
+ * @param maxRetries max number of retries allowed
+ * @param ioe failure reason
+ * @throws IOException if max number of retries is reached or the thread was interrupted
+ */
+ private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
+ throws IOException {
+ closeSocket();
+
+ // out of retries: surface the failure to the caller
+ if (curRetries >= maxRetries) {
+ throw ioe;
+ }
+ // an interruption is never retried
+ if (ExceptionUtil.isInterrupt(ioe)) {
+ throw ioe;
+ }
+
+ // otherwise back off before the next attempt
+ try {
+ Thread.sleep(this.rpcClient.failureSleep);
+ } catch (InterruptedException interrupted) {
+ ExceptionUtil.rethrowIfInterrupt(interrupted);
+ }
+
+ String retryMessage = "Retrying connect to server: " + remoteId.getAddress()
+ + " after sleeping " + this.rpcClient.failureSleep + "ms. Already tried " + curRetries
+ + " time(s).";
+ LOG.info(retryMessage);
+ }
+
+ /*
+ * wait till someone signals us to start reading RPC response or it is idle too long, it is marked
+ * as to be closed, or the client is marked as not running.
+ * @return true if it is time to read a response; false otherwise.
+ */
+ private synchronized boolean waitForWork() {
+ // beware of the concurrent access to the calls list: we can add calls, but as well
+ // remove them.
+ long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
+ for (;;) {
+ if (thread == null) {
+ return false;
+ }
+ if (!calls.isEmpty()) {
+ return true;
+ }
+ if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
+ closeConn(
+ new IOException("idle connection closed with " + calls.size() + " pending request(s)"));
+ return false;
+ }
+ try {
+ wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ /**
+ * Reader thread body: reads one response each time {@link #waitForWork()} reports pending
+ * calls, and exits as soon as it reports there is nothing more to do.
+ */
+ @Override
+ public void run() {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(threadName + ": starting, connections " + this.rpcClient.connections.size());
+ }
+ while (true) {
+ if (!waitForWork()) {
+ break;
+ }
+ readResponse();
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(threadName + ": stopped, connections " + this.rpcClient.connections.size());
+ }
+ }
+
+ /**
+ * Disposes the current SASL client, if any, and forgets it.
+ */
+ private void disposeSasl() {
+ if (saslRpcClient == null) {
+ return;
+ }
+ saslRpcClient.dispose();
+ saslRpcClient = null;
+ }
+
+ /**
+ * Creates a fresh SASL client for this connection and runs the SASL handshake over the given
+ * streams. The quality of protection comes from "hbase.rpc.protection" and defaults to
+ * authentication only.
+ * @param in2 stream to read the server's SASL messages from
+ * @param out2 stream to write the client's SASL messages to
+ * @return the result of {@code saslConnect}; the caller falls back to simple auth on false
+ * @throws IOException if the handshake fails
+ */
+ private boolean setupSaslConnection(final InputStream in2, final OutputStream out2)
+ throws IOException {
+ String defaultProtection = QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT);
+ String rpcProtection = this.rpcClient.conf.get("hbase.rpc.protection", defaultProtection);
+ saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal,
+ this.rpcClient.fallbackAllowed, rpcProtection);
+ return saslRpcClient.saslConnect(in2, out2);
+ }
+
+ /**
+ * If multiple clients with the same principal try to connect to the same server at the same time,
+ * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
+ * work around this, what is done is that the client backs off randomly and tries to initiate the
+ * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
+ * attempted.
+ * <p>
+ * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
+ * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
+ * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
+ * underlying authentication implementation, so there is no retry from other high level (for eg,
+ * HCM or HBaseAdmin).
+ * </p>
+ * <p>
+ * The method returns normally only when a retry should be attempted; in every other case it
+ * throws. The socket is always closed before anything else is done.
+ * </p>
+ * @param currRetries number of SASL retries already made
+ * @param maxRetries maximum number of SASL retries
+ * @param ex the failure raised by the SASL handshake
+ * @param user the user to run the relogin/backoff logic as
+ * @throws IOException if retries are exhausted or the failure is not retriable
+ * @throws InterruptedException if interrupted during the backoff sleep
+ */
+ private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
+ final Exception ex, final UserGroupInformation user)
+ throws IOException, InterruptedException {
+ closeSocket();
+ user.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws IOException, InterruptedException {
+ if (shouldAuthenticateOverKrb()) {
+ if (currRetries < maxRetries) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exception encountered while connecting to " + "the server : " + ex);
+ }
+ // try re-login
+ relogin();
+ disposeSasl();
+ // have granularity of milliseconds
+ // we are sleeping with the Connection lock held but since this
+ // connection instance is being used for connecting to the server
+ // in question, it is okay
+ Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
+ // returning normally tells the caller to retry the handshake
+ return null;
+ } else {
+ // kerberos retries exhausted: give up, keeping the original failure as the cause
+ String msg = "Couldn't setup connection for "
+ + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
+ LOG.warn(msg, ex);
+ throw (IOException) new IOException(msg).initCause(ex);
+ }
+ } else {
+ LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
+ }
+ // not authenticating over kerberos: never retry, just map the failure to an exception
+ if (ex instanceof RemoteException) {
+ throw (RemoteException) ex;
+ }
+ if (ex instanceof SaslException) {
+ // unchecked on purpose, so that higher layers do not retry (see the method comment)
+ String msg = "SASL authentication failed."
+ + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
+ LOG.fatal(msg, ex);
+ throw new RuntimeException(msg, ex);
+ }
+ throw new IOException(ex);
+ }
+ });
+ }
+
+ /**
+ * Establishes the connection if there is none yet: opens the socket, writes the preamble, runs
+ * the SASL handshake when {@code useSasl} is set (retrying up to {@code MAX_RETRIES} times via
+ * {@link #handleSaslConnectionFailure}), wraps the streams, writes the connection header and
+ * finally starts the reader thread. Does nothing if the socket is already set.
+ * <p>
+ * On any failure the socket is closed. Unless the failure is an interruption, the server is also
+ * added to the client's failed servers list.
+ * </p>
+ * @throws IOException if the server is in the failed servers list or the setup fails; a
+ * {@link LinkageError} is reported as a {@link DoNotRetryIOException}
+ */
+ private void setupIOstreams() throws IOException {
+ if (socket != null) {
+ // The connection is already available. Perfect.
+ return;
+ }
+
+ if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not trying to connect to " + remoteId.address
+ + " this server is in the failed servers list");
+ }
+ throw new FailedServerException(
+ "This server is in the failed servers list: " + remoteId.address);
+ }
+
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to " + remoteId.address);
+ }
+
+ // counts SASL handshake retries only; socket-level retries are handled in setupConnection()
+ short numRetries = 0;
+ final short MAX_RETRIES = 5;
+ while (true) {
+ setupConnection();
+ InputStream inStream = NetUtils.getInputStream(socket);
+ // This creates a socket with a write timeout. This timeout cannot be changed.
+ OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
+ // Write out the preamble -- MAGIC, version, and auth to use.
+ writeConnectionHeaderPreamble(outStream);
+ if (useSasl) {
+ final InputStream in2 = inStream;
+ final OutputStream out2 = outStream;
+ UserGroupInformation ticket = getUGI();
+ boolean continueSasl;
+ if (ticket == null) {
+ throw new FatalConnectionException("ticket/user is null");
+ }
+ try {
+ continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
+ @Override
+ public Boolean run() throws IOException {
+ return setupSaslConnection(in2, out2);
+ }
+ });
+ } catch (Exception ex) {
+ ExceptionUtil.rethrowIfInterrupt(ex);
+ // either throws, or returns after a backoff so that we reconnect from scratch
+ handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, ticket);
+ continue;
+ }
+ if (continueSasl) {
+ // Sasl connect is successful. Let's set up Sasl i/o streams.
+ inStream = saslRpcClient.getInputStream(inStream);
+ outStream = saslRpcClient.getOutputStream(outStream);
+ } else {
+ // fall back to simple auth because server told us so.
+ // do not change authMethod and useSasl here, we should start from secure when
+ // reconnecting because regionserver may change its sasl config after restart.
+ }
+ }
+ this.in = new DataInputStream(new BufferedInputStream(inStream));
+ this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+ // Now write out the connection header
+ writeConnectionHeader();
+ break;
+ }
+ } catch (Throwable t) {
+ closeSocket();
+ // asInterrupt yields null when t is not an interruption (see the null check below)
+ IOException e = ExceptionUtil.asInterrupt(t);
+ if (e == null) {
+ this.rpcClient.failedServers.addToFailedServers(remoteId.address);
+ if (t instanceof LinkageError) {
+ // probably the hbase hadoop version does not match the running hadoop version
+ e = new DoNotRetryIOException(t);
+ } else if (t instanceof IOException) {
+ e = (IOException) t;
+ } else {
+ e = new IOException("Could not set up IO Streams to " + remoteId.address, t);
+ }
+ }
+ throw e;
+ }
+
+ // start the receiver thread after the socket connection has been set up
+ thread = new Thread(this, threadName);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
+ * and flush it. The bytes were prepared once in the constructor.
+ * @param out the raw socket output stream (the buffered {@code this.out} is not set up yet)
+ * @throws IOException if the write or the flush fails
+ */
+ private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
+ out.write(connectionHeaderPreamble);
+ out.flush();
+ }
+
+ /**
+ * Write the connection header, already serialized with its 4-byte length prefix in the
+ * constructor, to {@code this.out} and flush it.
+ * @throws IOException if the write or the flush fails
+ */
+ private void writeConnectionHeader() throws IOException {
+ this.out.write(connectionHeaderWithLength);
+ this.out.flush();
+ }
+
+ /**
+ * Runs {@link #writeRequest(Call)} inside a trace span that continues the span captured by the
+ * call. The span is closed when the write returns or throws.
+ * @param call the call to write
+ * @throws IOException propagated from {@link #writeRequest(Call)}
+ */
+ private void tracedWriteRequest(Call call) throws IOException {
+ // NOTE(review): the span name still carries the former class name (RpcClientImpl).
+ try (TraceScope ignored = Trace.startSpan("RpcClientImpl.tracedWriteRequest", call.span)) {
+ writeRequest(call);
+ }
+ }
+
+ /**
+ * Initiates a call by sending the parameter to the remote server. Note: this is not called from
+ * the Connection thread, but by other threads.
+ * <p>
+ * Failures before the call is registered in the pending calls map (building the request,
+ * connecting, interruption) are thrown to the caller. Once the call is registered, a write
+ * failure closes the connection instead, which fails the call through the map.
+ * </p>
+ * <p>
+ * This method calls {@code notifyAll()} on this connection, so the caller must hold the
+ * connection monitor.
+ * </p>
+ * @param call the call to send
+ * @throws IOException if the request cannot be built, the connection cannot be set up, or the
+ * calling thread was interrupted
+ * @see #readResponse()
+ */
+ private void writeRequest(Call call) throws IOException {
+ ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec,
+ this.compressor, call.cells);
+ CellBlockMeta cellBlockMeta;
+ if (cellBlock != null) {
+ cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
+ } else {
+ cellBlockMeta = null;
+ }
+ RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
+
+ // connects lazily; a no-op when the socket is already set up
+ setupIOstreams();
+
+ // Now we're going to write the call. We check that the calling thread has not been
+ // interrupted, and, if so we do the write to the socket. If the write fails, we don't
+ // know where we stand, we have to close the connection.
+ if (Thread.interrupted()) {
+ throw new InterruptedIOException();
+ }
+
+ calls.put(call.id, call); // We put first as we don't want the connection to become idle.
+ // from here, we do not throw any exception to upper layer as the call has been tracked in the
+ // pending calls map.
+ try {
+ call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
+ } catch (IOException e) {
+ closeConn(e);
+ return;
+ }
+ // wake up the reader thread, which may be waiting in waitForWork()
+ notifyAll();
+ }
+
+ /*
+ * Receive a response. Because only one receiver, so no synchronization on in.
+ *
+ * Reads the total size and the response header, looks up (and removes) the matching pending
+ * call, then either skips the rest of the response (unknown or already completed call),
+ * completes the call with the remote exception, or completes it with the decoded message and
+ * optional cell block. Any IOException other than a SocketTimeoutException closes the
+ * connection.
+ */
+ private void readResponse() {
+ Call call = null;
+ boolean expectedCall = false;
+ try {
+ // See HBaseServer.Call.setResponse for where we write out the response.
+ // Total size of the response. Used for the stats and to skip responses nobody waits for.
+ int totalSize = in.readInt();
+
+ // Read the header
+ ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
+ int id = responseHeader.getCallId();
+ call = calls.remove(id); // call.done have to be set before leaving this method
+ expectedCall = (call != null && !call.isDone());
+ if (!expectedCall) {
+ // So we got a response for which we have no corresponding 'call' here on the client-side.
+ // We probably timed out waiting, cleaned up all references, and now the server decides
+ // to return a response. There is nothing we can do w/ the response at this stage. Clean
+ // out the wire of the response so its out of the way and we can get other responses on
+ // this connection.
+ int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader);
+ int whatIsLeftToRead = totalSize - readSoFar;
+ IOUtils.skipFully(in, whatIsLeftToRead);
+ if (call != null) {
+ // the call was still in the map but already done: record the stats anyway
+ call.callStats.setResponseSizeBytes(totalSize);
+ call.callStats
+ .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+ }
+ return;
+ }
+ if (responseHeader.hasException()) {
+ ExceptionResponse exceptionResponse = responseHeader.getException();
+ RemoteException re = createRemoteException(exceptionResponse);
+ call.setException(re);
+ call.callStats.setResponseSizeBytes(totalSize);
+ call.callStats
+ .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+ if (isFatalConnectionException(exceptionResponse)) {
+ // the server considers the connection unusable: close it, failing the other calls too
+ synchronized (this) {
+ closeConn(re);
+ }
+ }
+ } else {
+ Message value = null;
+ if (call.responseDefaultType != null) {
+ Builder builder = call.responseDefaultType.newBuilderForType();
+ ProtobufUtil.mergeDelimitedFrom(builder, in);
+ value = builder.build();
+ }
+ CellScanner cellBlockScanner = null;
+ if (responseHeader.hasCellBlockMeta()) {
+ int size = responseHeader.getCellBlockMeta().getLength();
+ byte[] cellBlock = new byte[size];
+ IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
+ cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec,
+ this.compressor, cellBlock);
+ }
+ call.setResponse(value, cellBlockScanner);
+ call.callStats.setResponseSizeBytes(totalSize);
+ call.callStats
+ .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+ }
+ } catch (IOException e) {
+ if (expectedCall) {
+ // the failure happened while reading the response of this call: fail it explicitly
+ call.setException(e);
+ }
+ if (e instanceof SocketTimeoutException) {
+ // Clean up open calls but don't treat this as a fatal condition,
+ // since we expect certain responses to not make it by the specified
+ // {@link ConnectionId#rpcTimeout}.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("ignored", e);
+ }
+ } else {
+ synchronized (this) {
+ closeConn(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Stops tracking a call that timed out by removing it from the pending calls map. A response
+ * arriving later for this id is skipped by {@link #readResponse()}.
+ * @param call the call that timed out
+ */
+ @Override
+ protected synchronized void callTimeout(Call call) {
+ // NOTE(review): the call is not removed from the CallSender write queue here; the writer
+ // loop skips calls that are already done.
+ calls.remove(call.id);
+ }
+
+ // Closes the output stream, the input stream and the socket, then clears the three fields so
+ // that setupIOstreams() sees the connection as not established. Pending calls are not touched.
+ private void closeSocket() {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+ IOUtils.closeSocket(socket);
+ out = null;
+ in = null;
+ socket = null;
+ }
+
+ // close socket, reader, and clean up all pending calls.
+ private void closeConn(IOException e) {
+ if (thread == null) {
+ return;
+ }
+ thread.interrupt();
+ thread = null;
+ closeSocket();
+ if (callSender != null) {
+ callSender.cleanup(e);
+ }
+ for (Call call : calls.values()) {
+ call.setException(e);
+ }
+ calls.clear();
+ }
+
+ // release all resources, the connection will not be used any more.
+ // 'closed' is set before interrupting the writer thread so that its loop exits when it wakes
+ // up; the reader thread and the pending calls are then handled by closeConn().
+ @Override
+ public synchronized void shutdown() {
+ closed = true;
+ if (callSender != null) {
+ callSender.interrupt();
+ }
+ closeConn(new IOException("connection to " + remoteId.address + " closed"));
+ }
+
+ /**
+ * Sends a call over this connection. Two callbacks are registered on the controller: the first
+ * runs if the call is cancelled later and removes it from this connection; the second receives
+ * whether the call was already cancelled and, if not, schedules its timeout and either queues
+ * it for the writer thread or writes it directly.
+ * @param call the call to send
+ * @param pcrc the controller of the call, used for cancellation
+ * @throws IOException if the call cannot be queued or written
+ */
+ @Override
+ public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
+ throws IOException {
+ pcrc.notifyOnCancel(new RpcCallback<Object>() {
+
+ @Override
+ public void run(Object parameter) {
+ setCancelled(call);
+ // take the connection lock explicitly: this callback is presumably invoked from whichever
+ // thread cancels the call — TODO confirm
+ synchronized (BlockingRpcConnection.this) {
+ if (callSender != null) {
+ callSender.remove(call);
+ } else {
+ calls.remove(call.id);
+ }
+ }
+ }
+ }, new CancellationCallback() {
+
+ @Override
+ public void run(boolean cancelled) throws IOException {
+ if (cancelled) {
+ // cancelled before we could send anything: just complete the call
+ setCancelled(call);
+ return;
+ }
+ scheduleTimeoutTask(call);
+ if (callSender != null) {
+ callSender.sendCall(call);
+ } else {
+ tracedWriteRequest(call);
+ }
+ }
+ });
+ }
+
+ /**
+ * @return true while a reader thread exists, i.e. after the connection has been set up and
+ * before it has been closed
+ */
+ @Override
+ public synchronized boolean isActive() {
+ return thread != null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
new file mode 100644
index 0000000..573ddd5
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * We will expose the connection to upper layer before initialized, so we need to buffer the calls
+ * passed in and write them out once the connection is established.
+ */
+@InterfaceAudience.Private
+class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
+
+ /** What to do with the buffered calls once the connection setup has finished. */
+ private enum BufferCallAction {
+ FLUSH, FAIL
+ }
+
+ /**
+ * User event that ends the buffering phase: either write out the buffered calls (success) or
+ * fail them all with the given error.
+ */
+ public static final class BufferCallEvent {
+
+ public final BufferCallAction action;
+
+ /** The failure to report to the buffered calls; null for the success event. */
+ public final IOException error;
+
+ private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action,
+ IOException error) {
+ this.action = action;
+ this.error = error;
+ }
+
+ /** @return the shared event asking to flush the buffered calls */
+ public static BufferCallBeforeInitHandler.BufferCallEvent success() {
+ return SUCCESS_EVENT;
+ }
+
+ /**
+ * @param error the reason the connection could not be initialized
+ * @return an event asking to fail the buffered calls with {@code error}
+ */
+ public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) {
+ return new BufferCallEvent(BufferCallAction.FAIL, error);
+ }
+ }
+
+ private static final BufferCallEvent SUCCESS_EVENT = new BufferCallEvent(BufferCallAction.FLUSH,
+ null);
+
+ // calls received before the connection is initialized, keyed by call id
+ private final Map<Integer, Call> id2Call = new HashMap<Integer, Call>();
+
+ /**
+ * Buffers {@link Call} messages instead of writing them; anything else is passed down the
+ * pipeline untouched.
+ */
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+ if (msg instanceof Call) {
+ Call call = (Call) msg;
+ id2Call.put(call.id, call);
+ // The call is now tracked here, so complete the write promise; otherwise listeners on the
+ // write future would never be notified. If the call can not be written out later, it is
+ // failed directly through Call.setException.
+ promise.trySuccess();
+ } else {
+ ctx.write(msg, promise);
+ }
+ }
+
+ /**
+ * Handles the end of the buffering phase and call removals; other events are forwarded.
+ */
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof BufferCallEvent) {
+ BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt;
+ switch (bcEvt.action) {
+ case FLUSH:
+ for (Call call : id2Call.values()) {
+ ctx.write(call);
+ }
+ break;
+ case FAIL:
+ for (Call call : id2Call.values()) {
+ call.setException(bcEvt.error);
+ }
+ break;
+ }
+ ctx.flush();
+ // buffering is over: from now on calls go straight to the next handler
+ ctx.pipeline().remove(this);
+ } else if (evt instanceof CallEvent) {
+ // just remove the call for now until we add other call event other than timeout and cancel.
+ id2Call.remove(((CallEvent) evt).call.id);
+ } else {
+ ctx.fireUserEventTriggered(evt);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index 73bc0e2..a6203d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+
+import io.netty.util.Timeout;
import java.io.IOException;
@@ -27,29 +30,39 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
/** A call waiting for a value. */
@InterfaceAudience.Private
-public class Call {
- final int id; // call id
- final Message param; // rpc request method param object
+class Call {
+ final int id; // call id
+ final Message param; // rpc request method param object
/**
- * Optionally has cells when making call. Optionally has cells set on response. Used
- * passing cells to the rpc and receiving the response.
+ * Optionally has cells when making call. Optionally has cells set on response. Used passing cells
+ * to the rpc and receiving the response.
*/
CellScanner cells;
- Message response; // value, null if error
- // The return type. Used to create shell into which we deserialize the response if any.
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+ justification = "Direct access is only allowed after done")
+ Message response; // value, null if error
+ // The return type. Used to create shell into which we deserialize the response if any.
Message responseDefaultType;
- IOException error; // exception, null if value
- volatile boolean done; // true when call is done
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+ justification = "Direct access is only allowed after done")
+ IOException error; // exception, null if value
+ private boolean done; // true when call is done
final Descriptors.MethodDescriptor md;
final int timeout; // timeout in millisecond for this call; 0 means infinite.
+ final int priority;
final MetricsConnection.CallStats callStats;
+ final RpcCallback<Call> callback;
+ final Span span;
+ Timeout timeoutTask;
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
- final CellScanner cells, final Message responseDefaultType, int timeout,
- MetricsConnection.CallStats callStats) {
+ final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
+ RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
this.param = param;
this.md = md;
this.cells = cells;
@@ -58,73 +71,74 @@ public class Call {
this.responseDefaultType = responseDefaultType;
this.id = id;
this.timeout = timeout;
+ this.priority = priority;
+ this.callback = callback;
+ this.span = Trace.currentSpan();
+ }
+
+ @Override
+ public String toString() {
+ return "callId: " + this.id + " methodName: " + this.md.getName() + " param {"
+ + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
}
/**
- * Check if the call did timeout. Set an exception (includes a notify) if it's the case.
- * @return true if the call is on timeout, false otherwise.
+ * Called from timeoutTask; skips cancelling the timeout task to prevent self cancel.
*/
- public boolean checkAndSetTimeout() {
- if (timeout == 0){
- return false;
- }
-
- long waitTime = EnvironmentEdgeManager.currentTime() - getStartTime();
- if (waitTime >= timeout) {
- IOException ie = new CallTimeoutException("Call id=" + id +
- ", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired.");
- setException(ie); // includes a notify
- return true;
- } else {
- return false;
+ public void setTimeout(IOException error) {
+ synchronized (this) {
+ if (done) {
+ return;
+ }
+ this.done = true;
+ this.error = error;
}
+ callback.run(this);
}
- public int remainingTime() {
- if (timeout == 0) {
- return Integer.MAX_VALUE;
+ private void callComplete() {
+ if (timeoutTask != null) {
+ timeoutTask.cancel();
}
-
- int remaining = timeout - (int) (EnvironmentEdgeManager.currentTime() - getStartTime());
- return remaining > 0 ? remaining : 0;
- }
-
- @Override
- public String toString() {
- return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" +
- (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}";
+ callback.run(this);
}
- /** Indicate when the call is complete and the
- * value or error are available. Notifies by default. */
- protected synchronized void callComplete() {
- this.done = true;
- notify(); // notify caller
- }
-
- /** Set the exception when there is an error.
- * Notify the caller the call is done.
- *
+ /**
+ * Set the exception when there is an error. Notify the caller the call is done.
* @param error exception thrown by the call; either local or remote
*/
public void setException(IOException error) {
- this.error = error;
+ synchronized (this) {
+ if (done) {
+ return;
+ }
+ this.done = true;
+ this.error = error;
+ }
callComplete();
}
/**
- * Set the return value when there is no error.
- * Notify the caller the call is done.
- *
+ * Set the return value when there is no error. Notify the caller the call is done.
* @param response return value of the call.
* @param cells Can be null
*/
public void setResponse(Message response, final CellScanner cells) {
- this.response = response;
- this.cells = cells;
+ synchronized (this) {
+ if (done) {
+ return;
+ }
+ this.done = true;
+ this.response = response;
+ this.cells = cells;
+ }
callComplete();
}
+ public synchronized boolean isDone() {
+ return done;
+ }
+
public long getStartTime() {
return this.callStats.getStartTime();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java
new file mode 100644
index 0000000..a6777c0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Indicates that a call was cancelled on the client side.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CallCancelledException extends HBaseIOException {
+
+ private static final long serialVersionUID = 309775809470318208L;
+
+ public CallCancelledException(String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java
new file mode 100644
index 0000000..1c2ea32
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Used to tell the netty handler that a call has been cancelled or has timed out.
+ */
+@InterfaceAudience.Private
+class CallEvent {
+
+ public enum Type {
+ TIMEOUT, CANCELLED
+ }
+
+ final Type type;
+
+ final Call call;
+
+ CallEvent(Type type, Call call) {
+ this.type = type;
+ this.call = call;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
index 1e31f72..db8c34a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
@@ -17,8 +17,7 @@
*/
package org.apache.hadoop.hbase.ipc;
-import java.io.IOException;
-
+import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -28,7 +27,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class CallTimeoutException extends IOException {
+public class CallTimeoutException extends HBaseIOException {
+
public CallTimeoutException(final String msg) {
super(msg);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
index 072a490..fb2cafa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.ipc;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufOutputStream;
+
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
@@ -46,12 +50,13 @@ import org.apache.hadoop.io.compress.Decompressor;
* Helper class for building cell block.
*/
@InterfaceAudience.Private
-public class CellBlockBuilder {
+class CellBlockBuilder {
// LOG is being used in TestCellBlockBuilder
static final Log LOG = LogFactory.getLog(CellBlockBuilder.class);
private final Configuration conf;
+
/**
* How much we think the decompressor will expand the original compressed content.
*/
@@ -59,7 +64,7 @@ public class CellBlockBuilder {
private final int cellBlockBuildingInitialBufferSize;
- public CellBlockBuilder(final Configuration conf) {
+ public CellBlockBuilder(Configuration conf) {
this.conf = conf;
this.cellBlockDecompressionMultiplier = conf
.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
@@ -70,44 +75,104 @@ public class CellBlockBuilder {
.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
}
+ private interface OutputStreamSupplier {
+
+ OutputStream get(int expectedSize);
+
+ int size();
+ }
+
+ private static final class ByteBufferOutputStreamSupplier implements OutputStreamSupplier {
+
+ private ByteBufferOutputStream baos;
+
+ @Override
+ public OutputStream get(int expectedSize) {
+ baos = new ByteBufferOutputStream(expectedSize);
+ return baos;
+ }
+
+ @Override
+ public int size() {
+ return baos.size();
+ }
+ }
+
/**
* Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
* <code>compressor</code>.
- * @param codec to use for encoding
- * @param compressor to use for encoding
- * @param cellScanner to encode
+ * @param codec
+ * @param compressor
+ * @param cellScanner
* @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
* been flipped and is ready for reading. Use limit to find total size.
- * @throws IOException if encoding the cells fail
+ * @throws IOException
*/
public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
final CellScanner cellScanner) throws IOException {
- if (cellScanner == null) {
+ ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier();
+ if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
+ ByteBuffer bb = supplier.baos.getByteBuffer();
+ // If no cells, don't mess around. Just return null (could be a bunch of existence checking
+ // gets or something -- stuff that does not return a cell).
+ return bb.hasRemaining() ? bb : null;
+ } else {
return null;
}
+ }
+
+ private static final class ByteBufOutputStreamSupplier implements OutputStreamSupplier {
+
+ private final ByteBufAllocator alloc;
+
+ private ByteBuf buf;
+
+ public ByteBufOutputStreamSupplier(ByteBufAllocator alloc) {
+ this.alloc = alloc;
+ }
+
+ @Override
+ public OutputStream get(int expectedSize) {
+ buf = alloc.buffer(expectedSize);
+ return new ByteBufOutputStream(buf);
+ }
+
+ @Override
+ public int size() {
+ return buf.writerIndex();
+ }
+ }
+
+ public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, CellScanner cellScanner,
+ ByteBufAllocator alloc) throws IOException {
+ ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc);
+ if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
+ return supplier.buf;
+ } else {
+ return null;
+ }
+ }
+
+ private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor,
+ final CellScanner cellScanner, OutputStreamSupplier supplier) throws IOException {
+ if (cellScanner == null) {
+ return false;
+ }
if (codec == null) {
throw new CellScannerButNoCodecException();
}
- int bufferSize = this.cellBlockBuildingInitialBufferSize;
- ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
- encodeCellsTo(baos, cellScanner, codec, compressor);
- if (LOG.isTraceEnabled()) {
- if (bufferSize < baos.size()) {
- LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size()
- + "; up hbase.ipc.cellblock.building.initial.buffersize?");
- }
+ int bufferSize = cellBlockBuildingInitialBufferSize;
+ encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor);
+ if (LOG.isTraceEnabled() && bufferSize < supplier.size()) {
+ LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + supplier.size()
+ + "; up hbase.ipc.cellblock.building.initial.buffersize?");
}
- ByteBuffer bb = baos.getByteBuffer();
- // If no cells, don't mess around. Just return null (could be a bunch of existence checking
- // gets or something -- stuff that does not return a cell).
- if (!bb.hasRemaining()) return null;
- return bb;
+ return true;
}
- private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec,
+ private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec,
CompressionCodec compressor) throws IOException {
- OutputStream os = bbos;
Compressor poolCompressor = null;
try {
if (compressor != null) {
@@ -122,7 +187,7 @@ public class CellBlockBuilder {
encoder.write(cellScanner.current());
}
encoder.flush();
- } catch (BufferOverflowException e) {
+ } catch (BufferOverflowException | IndexOutOfBoundsException e) {
throw new DoNotRetryIOException(e);
} finally {
os.close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
index 08f8171..1b837d8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.security.User;
* to servers are uniquely identified by <remoteAddress, ticket, serviceName>
*/
@InterfaceAudience.Private
-public class ConnectionId {
+class ConnectionId {
private static final int PRIME = 16777619;
final User ticket;
final String serviceName;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java
new file mode 100644
index 0000000..c7c0f32
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * The default netty event loop config: a NioEventLoopGroup paired with NioSocketChannel.
+ */
+@InterfaceAudience.Private
+class DefaultNettyEventLoopConfig {
+
+ public static final Pair<EventLoopGroup, Class<? extends Channel>> GROUP_AND_CHANNEL_CLASS = Pair
+ .<EventLoopGroup, Class<? extends Channel>> newPair(new NioEventLoopGroup(),
+ NioSocketChannel.class);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
new file mode 100644
index 0000000..721148b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Indicates that the rpc server asked the client to fall back to simple auth, but the client is
+ * not allowed to do so.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FallbackDisallowedException extends HBaseIOException {
+
+ private static final long serialVersionUID = -6942845066279358253L;
+
+ public FallbackDisallowedException() {
+ super("Server asks us to fall back to SIMPLE auth, "
+ + "but this client is configured to only allow secure connections.");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java
deleted file mode 100644
index 09dda09..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.IOException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Converts exceptions to other exceptions
- */
-@InterfaceAudience.Private
-public interface IOExceptionConverter {
- /**
- * Converts given IOException
- * @param e exception to convert
- * @return converted IOException
- */
- IOException convert(IOException e);
-}