You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/08/25 00:55:26 UTC
hbase git commit: HBASE-16433 Remove AsyncRpcChannel related stuffs
Repository: hbase
Updated Branches:
refs/heads/master 8a692ff18 -> a1f760ff7
HBASE-16433 Remove AsyncRpcChannel related stuffs
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a1f760ff
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a1f760ff
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a1f760ff
Branch: refs/heads/master
Commit: a1f760ff763bacbfcfd6eb80d5076ec35e3b27e3
Parents: 8a692ff
Author: zhangduo <zh...@apache.org>
Authored: Wed Aug 17 17:03:53 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Aug 25 08:15:46 2016 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Future.java | 34 -
.../hbase/client/ResponseFutureListener.java | 30 -
.../org/apache/hadoop/hbase/ipc/AsyncCall.java | 10 +-
.../hadoop/hbase/ipc/AsyncRpcChannel.java | 728 +++++++++++++++++-
.../hadoop/hbase/ipc/AsyncRpcChannelImpl.java | 770 -------------------
.../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 28 +-
.../hbase/ipc/AsyncServerResponseHandler.java | 4 +-
.../org/apache/hadoop/hbase/ipc/Promise.java | 38 -
.../org/apache/hadoop/hbase/ipc/RpcClient.java | 20 +-
.../apache/hadoop/hbase/ipc/RpcClientImpl.java | 156 +---
.../hadoop/hbase/ipc/AbstractTestIPC.java | 146 +---
11 files changed, 730 insertions(+), 1234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java
deleted file mode 100644
index 99a8baa..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-/**
- * Promise for responses
- * @param <V> Value type
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-
-@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_SAME_SIMPLE_NAME_AS_INTERFACE",
- justification="Agree that this can be confusing but folks will pull in this and think twice "
- + "about pulling in netty; incidence of confusion should be rare in this case.")
-public interface Future<V> extends io.netty.util.concurrent.Future<V> {
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
deleted file mode 100644
index f23dc8f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import io.netty.util.concurrent.GenericFutureListener;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Specific interface for the Response future listener
- * @param <V> Value type.
- */
-@InterfaceAudience.Private
-public interface ResponseFutureListener<V>
- extends GenericFutureListener<Future<V>> {
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
index 89e6ca4..33536df 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
@@ -19,7 +19,11 @@ package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
+
+import io.netty.util.concurrent.DefaultPromise;
+
import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
@@ -39,12 +43,12 @@ import org.apache.hadoop.ipc.RemoteException;
* @param <M> Message returned in communication to be converted
*/
@InterfaceAudience.Private
-public class AsyncCall<M extends Message, T> extends Promise<T> {
+public class AsyncCall<M extends Message, T> extends DefaultPromise<T> {
private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
final int id;
- private final AsyncRpcChannelImpl channel;
+ private final AsyncRpcChannel channel;
final Descriptors.MethodDescriptor method;
final Message param;
@@ -77,7 +81,7 @@ public class AsyncCall<M extends Message, T> extends Promise<T> {
* @param priority for this request
* @param metrics MetricsConnection to which the metrics are stored for this request
*/
- public AsyncCall(AsyncRpcChannelImpl channel, int connectId, Descriptors.MethodDescriptor
+ public AsyncCall(AsyncRpcChannel channel, int connectId, Descriptors.MethodDescriptor
md, Message param, CellScanner cellScanner, M responseDefaultType, MessageConverter<M, T>
messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority,
MetricsConnection metrics) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 8cc730f..9550f2a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -20,19 +20,298 @@ package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
-import io.netty.util.concurrent.EventExecutor;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoop;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.GenericFutureListener;
+import java.io.IOException;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.security.SaslClientHandler;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
/**
- * Interface for Async Rpc Channels
+ * Netty RPC channel
*/
@InterfaceAudience.Private
-public interface AsyncRpcChannel {
+public class AsyncRpcChannel {
+ private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());
+
+ private static final int MAX_SASL_RETRIES = 5;
+
+ protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
+ = new HashMap<>();
+
+ static {
+ TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
+ new AuthenticationTokenSelector());
+ }
+
+ final AsyncRpcClient client;
+
+ // Contains the channel to work with.
+ // Only exists when connected
+ private Channel channel;
+
+ String name;
+ final User ticket;
+ final String serviceName;
+ final InetSocketAddress address;
+
+ private int failureCounter = 0;
+
+ boolean useSasl;
+ AuthMethod authMethod;
+ private int reloginMaxBackoff;
+ private Token<? extends TokenIdentifier> token;
+ private String serverPrincipal;
+
+ // NOTE: closed and connected flags below are only changed when a lock on pendingCalls
+ private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
+ private boolean connected = false;
+ private boolean closed = false;
+
+ private Timeout cleanupTimer;
+
+ private final TimerTask timeoutTask = new TimerTask() {
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ cleanupCalls();
+ }
+ };
+
+ /**
+ * Constructor for netty RPC channel
+ * @param bootstrap to construct channel on
+ * @param client to connect with
+ * @param ticket of user which uses connection
+ * @param serviceName name of service to connect to
+ * @param address to connect to
+ */
+ public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
+ String serviceName, InetSocketAddress address) {
+ this.client = client;
+
+ this.ticket = ticket;
+ this.serviceName = serviceName;
+ this.address = address;
+
+ this.channel = connect(bootstrap).channel();
+
+ name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
+ + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
+ }
+
+ /**
+ * Connect to channel
+ * @param bootstrap to connect to
+ * @return future of connection
+ */
+ private ChannelFuture connect(final Bootstrap bootstrap) {
+ return bootstrap.remoteAddress(address).connect()
+ .addListener(new GenericFutureListener<ChannelFuture>() {
+ @Override
+ public void operationComplete(final ChannelFuture f) throws Exception {
+ if (!f.isSuccess()) {
+ retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
+ return;
+ }
+ channel = f.channel();
+
+ setupAuthorization();
+
+ ByteBuf b = channel.alloc().directBuffer(6);
+ createPreamble(b, authMethod);
+ channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+ if (useSasl) {
+ UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
+ if (authMethod == AuthMethod.KERBEROS) {
+ if (ticket != null && ticket.getRealUser() != null) {
+ ticket = ticket.getRealUser();
+ }
+ }
+ SaslClientHandler saslHandler;
+ if (ticket == null) {
+ throw new FatalConnectionException("ticket/user is null");
+ }
+ final UserGroupInformation realTicket = ticket;
+ saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
+ @Override
+ public SaslClientHandler run() throws IOException {
+ return getSaslHandler(realTicket, bootstrap);
+ }
+ });
+ if (saslHandler != null) {
+ // Sasl connect is successful. Let's set up Sasl channel handler
+ channel.pipeline().addFirst(saslHandler);
+ } else {
+ // fall back to simple auth because server told us so.
+ authMethod = AuthMethod.SIMPLE;
+ useSasl = false;
+ }
+ } else {
+ startHBaseConnection(f.channel());
+ }
+ }
+ });
+ }
+
+ /**
+ * Start HBase connection
+ * @param ch channel to start connection on
+ */
+ private void startHBaseConnection(Channel ch) {
+ ch.pipeline().addLast("frameDecoder",
+ new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+ ch.pipeline().addLast(new AsyncServerResponseHandler(this));
+ try {
+ writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ close(future.cause());
+ return;
+ }
+ List<AsyncCall> callsToWrite;
+ synchronized (pendingCalls) {
+ connected = true;
+ callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
+ }
+ for (AsyncCall call : callsToWrite) {
+ writeRequest(call);
+ }
+ }
+ });
+ } catch (IOException e) {
+ close(e);
+ }
+ }
+
+ private void startConnectionWithEncryption(Channel ch) {
+ // for rpc encryption, the order of ChannelInboundHandler should be:
+ // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder
+ // Don't skip the first 4 bytes for length in beforeUnwrapDecoder,
+ // SaslClientHandler will handler this
+ ch.pipeline().addFirst("beforeUnwrapDecoder",
+ new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
+ ch.pipeline().addLast("afterUnwrapDecoder",
+ new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+ ch.pipeline().addLast(new AsyncServerResponseHandler(this));
+ List<AsyncCall> callsToWrite;
+ synchronized (pendingCalls) {
+ connected = true;
+ callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
+ }
+ for (AsyncCall call : callsToWrite) {
+ writeRequest(call);
+ }
+ }
+
+ /**
+ * Get SASL handler
+ * @param bootstrap to reconnect to
+ * @return new SASL handler
+ * @throws java.io.IOException if handler failed to create
+ */
+ private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
+ final Bootstrap bootstrap) throws IOException {
+ return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
+ client.fallbackAllowed,
+ client.conf.get("hbase.rpc.protection",
+ SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
+ getChannelHeaderBytes(authMethod),
+ new SaslClientHandler.SaslExceptionHandler() {
+ @Override
+ public void handle(int retryCount, Random random, Throwable cause) {
+ try {
+ // Handle Sasl failure. Try to potentially get new credentials
+ handleSaslConnectionFailure(retryCount, cause, realTicket);
+
+ retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
+ cause);
+ } catch (IOException | InterruptedException e) {
+ close(e);
+ }
+ }
+ }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
+ @Override
+ public void onSuccess(Channel channel) {
+ startHBaseConnection(channel);
+ }
+
+ @Override
+ public void onSaslProtectionSucess(Channel channel) {
+ startConnectionWithEncryption(channel);
+ }
+ });
+ }
+
+ /**
+ * Retry to connect or close
+ * @param bootstrap to connect with
+ * @param failureCount failure count
+ * @param e exception of fail
+ */
+ private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout,
+ Throwable e) {
+ if (failureCount < client.maxRetries) {
+ client.newTimeout(new TimerTask() {
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ connect(bootstrap);
+ }
+ }, timeout, TimeUnit.MILLISECONDS);
+ } else {
+ client.failedServers.addToFailedServers(address);
+ close(e);
+ }
+ }
/**
* Calls method on channel
@@ -40,41 +319,450 @@ public interface AsyncRpcChannel {
* @param request to send
* @param cellScanner with cells to send
* @param responsePrototype to construct response with
- * @param messageConverter for the messages to expected result
- * @param exceptionConverter for converting exceptions
* @param rpcTimeout timeout for request
* @param priority for request
* @return Promise for the response Message
*/
- <R extends Message, O> Future<O> callMethod(
+ public <R extends Message, O> io.netty.util.concurrent.Promise<O> callMethod(
final Descriptors.MethodDescriptor method,
- final Message request, final CellScanner cellScanner,
- R responsePrototype, MessageConverter<R, O> messageConverter,
- IOExceptionConverter exceptionConverter, long rpcTimeout, int priority);
+ final Message request,final CellScanner cellScanner,
+ R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
+ exceptionConverter, long rpcTimeout, int priority) {
+ final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(),
+ method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter,
+ rpcTimeout, priority, client.metrics);
+
+ synchronized (pendingCalls) {
+ if (closed) {
+ call.setFailure(new ConnectException());
+ return call;
+ }
+ pendingCalls.put(call.id, call);
+ // Add timeout for cleanup if none is present
+ if (cleanupTimer == null && call.getRpcTimeout() > 0) {
+ cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ }
+ if (!connected) {
+ return call;
+ }
+ }
+ writeRequest(call);
+ return call;
+ }
+
+ public EventLoop getEventExecutor() {
+ return this.channel.eventLoop();
+ }
+
+ AsyncCall removePendingCall(int id) {
+ synchronized (pendingCalls) {
+ return pendingCalls.remove(id);
+ }
+ }
+
+ /**
+ * Write the channel header
+ * @param channel to write to
+ * @return future of write
+ * @throws java.io.IOException on failure to write
+ */
+ private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
+ RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
+ int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
+ ByteBuf b = channel.alloc().directBuffer(totalSize);
+
+ b.writeInt(header.getSerializedSize());
+ b.writeBytes(header.toByteArray());
+ return channel.writeAndFlush(b);
+ }
+
+ private byte[] getChannelHeaderBytes(AuthMethod authMethod) {
+ RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
+ ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4);
+ b.putInt(header.getSerializedSize());
+ b.put(header.toByteArray());
+ return b.array();
+ }
+
+ private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) {
+ RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
+ .setServiceName(serviceName);
+
+ RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
+ if (userInfoPB != null) {
+ headerBuilder.setUserInfo(userInfoPB);
+ }
+
+ if (client.codec != null) {
+ headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
+ }
+ if (client.compressor != null) {
+ headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
+ }
+
+ headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
+ return headerBuilder.build();
+ }
+
+ /**
+ * Write request to channel
+ * @param call to write
+ */
+ private void writeRequest(final AsyncCall call) {
+ try {
+ final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
+ .newBuilder();
+ requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
+ .setRequestParam(call.param != null);
+
+ if (Trace.isTracing()) {
+ Span s = Trace.currentSpan();
+ requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
+ .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
+ }
+
+ ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner());
+ if (cellBlock != null) {
+ final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
+ .newBuilder();
+ cellBlockBuilder.setLength(cellBlock.limit());
+ requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
+ }
+ // Only pass priority if there one. Let zero be same as no priority.
+ if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
+ requestHeaderBuilder.setPriority(call.getPriority());
+ }
+ requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?
+ Integer.MAX_VALUE : (int)call.rpcTimeout);
+
+ RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
+
+ int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
+ if (cellBlock != null) {
+ totalSize += cellBlock.remaining();
+ }
+
+ ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
+ try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
+ call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
+ }
+
+ channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
+ } catch (IOException e) {
+ close(e);
+ }
+ }
+
+ /**
+ * Set up server authorization
+ * @throws java.io.IOException if auth setup failed
+ */
+ private void setupAuthorization() throws IOException {
+ SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
+ this.useSasl = client.userProvider.isHBaseSecurityEnabled();
+
+ this.token = null;
+ if (useSasl && securityInfo != null) {
+ AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
+ if (tokenKind != null) {
+ TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
+ if (tokenSelector != null) {
+ token = tokenSelector.selectToken(new Text(client.clusterId),
+ ticket.getUGI().getTokens());
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("No token selector found for type " + tokenKind);
+ }
+ }
+ String serverKey = securityInfo.getServerPrincipal();
+ if (serverKey == null) {
+ throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
+ }
+ this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
+ address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
+ + serverPrincipal);
+ }
+ }
+
+ if (!useSasl) {
+ authMethod = AuthMethod.SIMPLE;
+ } else if (token != null) {
+ authMethod = AuthMethod.DIGEST;
+ } else {
+ authMethod = AuthMethod.KERBEROS;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
+ }
+ reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
+ }
+
+ /**
+ * Build the user information
+ * @param ugi User Group Information
+ * @param authMethod Authorization method
+ * @return UserInformation protobuf
+ */
+ private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
+ if (ugi == null || authMethod == AuthMethod.DIGEST) {
+ // Don't send user for token auth
+ return null;
+ }
+ RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
+ if (authMethod == AuthMethod.KERBEROS) {
+ // Send effective user for Kerberos auth
+ userInfoPB.setEffectiveUser(ugi.getUserName());
+ } else if (authMethod == AuthMethod.SIMPLE) {
+ // Send both effective user and real user for simple auth
+ userInfoPB.setEffectiveUser(ugi.getUserName());
+ if (ugi.getRealUser() != null) {
+ userInfoPB.setRealUser(ugi.getRealUser().getUserName());
+ }
+ }
+ return userInfoPB.build();
+ }
/**
- * Get the EventLoop on which this channel operated
- * @return EventLoop
+ * Create connection preamble
+ * @param byteBuf to write to
+ * @param authMethod to write
*/
- EventExecutor getEventExecutor();
+ private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
+ byteBuf.writeBytes(HConstants.RPC_HEADER);
+ byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
+ byteBuf.writeByte(authMethod.code);
+ }
+
+ private void close0(Throwable e) {
+ List<AsyncCall> toCleanup;
+ synchronized (pendingCalls) {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
+ pendingCalls.clear();
+ }
+ IOException closeException = null;
+ if (e != null) {
+ if (e instanceof IOException) {
+ closeException = (IOException) e;
+ } else {
+ closeException = new IOException(e);
+ }
+ }
+ // log the info
+ if (LOG.isDebugEnabled() && closeException != null) {
+ LOG.debug(name + ": closing ipc connection to " + address, closeException);
+ }
+ if (cleanupTimer != null) {
+ cleanupTimer.cancel();
+ cleanupTimer = null;
+ }
+ for (AsyncCall call : toCleanup) {
+ call.setFailed(closeException != null ? closeException
+ : new ConnectionClosingException(
+ "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
+ }
+ channel.disconnect().addListener(ChannelFutureListener.CLOSE);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(name + ": closed");
+ }
+ }
/**
* Close connection
- * @param cause of closure.
+ * @param e exception on close
*/
- void close(Throwable cause);
+ public void close(final Throwable e) {
+ client.removeConnection(this);
+
+ // Move closing from the requesting thread to the channel thread
+ if (channel.eventLoop().inEventLoop()) {
+ close0(e);
+ } else {
+ channel.eventLoop().execute(new Runnable() {
+ @Override
+ public void run() {
+ close0(e);
+ }
+ });
+ }
+ }
+
+ /**
+ * Clean up calls.
+ */
+ private void cleanupCalls() {
+ List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
+ long currentTime = EnvironmentEdgeManager.currentTime();
+ long nextCleanupTaskDelay = -1L;
+ synchronized (pendingCalls) {
+ for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
+ AsyncCall call = iter.next();
+ long timeout = call.getRpcTimeout();
+ if (timeout > 0) {
+ if (currentTime - call.getStartTime() >= timeout) {
+ iter.remove();
+ toCleanup.add(call);
+ } else {
+ if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
+ nextCleanupTaskDelay = timeout;
+ }
+ }
+ }
+ }
+ if (nextCleanupTaskDelay > 0) {
+ cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
+ } else {
+ cleanupTimer = null;
+ }
+ }
+ for (AsyncCall call : toCleanup) {
+ call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
+ + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
+ }
+ }
/**
* Check if the connection is alive
- *
* @return true if alive
*/
- boolean isAlive();
+ public boolean isAlive() {
+ return channel.isOpen();
+ }
+
+ public InetSocketAddress getAddress() {
+ return this.address;
+ }
+
+ /**
+ * Check if user should authenticate over Kerberos
+ * @return true if should be authenticated over Kerberos
+ * @throws java.io.IOException on failure of check
+ */
+ private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
+ UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ UserGroupInformation realUser = currentUser.getRealUser();
+ return authMethod == AuthMethod.KERBEROS && loginUser != null &&
+ // Make sure user logged in using Kerberos either keytab or TGT
+ loginUser.hasKerberosCredentials() &&
+ // relogin only in case it is the login user (e.g. JT)
+ // or superuser (like oozie).
+ (loginUser.equals(currentUser) || loginUser.equals(realUser));
+ }
/**
- * Get the address on which this channel operates
- * @return InetSocketAddress
+ * If multiple clients with the same principal try to connect to the same server at the same time,
+ * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
+ * work around this, what is done is that the client backs off randomly and tries to initiate the
+ * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
+ * attempted.
+ * <p>
+ * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
+ * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
+ * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
+ * underlying authentication implementation, so there is no retry from other high level (for eg,
+ * HCM or HBaseAdmin).
+ * </p>
+ * @param currRetries retry count
+ * @param ex exception describing fail
+ * @param user which is trying to connect
+ * @throws java.io.IOException if IO fail
+ * @throws InterruptedException if thread is interrupted
*/
- InetSocketAddress getAddress();
+ private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
+ final UserGroupInformation user) throws IOException, InterruptedException {
+ user.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws IOException, InterruptedException {
+ if (shouldAuthenticateOverKrb()) {
+ if (currRetries < MAX_SASL_RETRIES) {
+ LOG.debug("Exception encountered while connecting to the server : " + ex);
+ // try re-login
+ if (UserGroupInformation.isLoginKeytabBased()) {
+ UserGroupInformation.getLoginUser().reloginFromKeytab();
+ } else {
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ }
+
+ // Should reconnect
+ return null;
+ } else {
+ String msg = "Couldn't setup connection for "
+ + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
+ LOG.warn(msg, ex);
+ throw (IOException) new IOException(msg).initCause(ex);
+ }
+ } else {
+ LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
+ }
+ if (ex instanceof RemoteException) {
+ throw (RemoteException) ex;
+ }
+ if (ex instanceof SaslException) {
+ String msg = "SASL authentication failed."
+ + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
+ LOG.fatal(msg, ex);
+ throw new RuntimeException(msg, ex);
+ }
+ throw new IOException(ex);
+ }
+ });
+ }
+
+ public int getConnectionHashCode() {
+ return ConnectionId.hashCode(ticket, serviceName, address);
+ }
+
+ @Override
+ public int hashCode() {
+ return getConnectionHashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof AsyncRpcChannel) {
+ AsyncRpcChannel channel = (AsyncRpcChannel) obj;
+ return channel.hashCode() == obj.hashCode();
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
+ }
+
+ /**
+ * Listens to call writes and fails if write failed
+ */
+ private static final class CallWriteListener implements ChannelFutureListener {
+ private final AsyncRpcChannel rpcChannel;
+ private final int id;
+
+ public CallWriteListener(AsyncRpcChannel asyncRpcChannelImpl, int id) {
+ this.rpcChannel = asyncRpcChannelImpl;
+ this.id = id;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ AsyncCall call = rpcChannel.removePendingCall(id);
+ if (call != null) {
+ if (future.cause() instanceof IOException) {
+ call.setFailed((IOException) future.cause());
+ } else {
+ call.setFailed(new IOException(future.cause()));
+ }
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
deleted file mode 100644
index 6b7dc5b..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
+++ /dev/null
@@ -1,770 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.EventLoop;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.GenericFutureListener;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Locale;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.sasl.SaslException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
-import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
-import org.apache.hadoop.hbase.security.AuthMethod;
-import org.apache.hadoop.hbase.security.SaslClientHandler;
-import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SecurityInfo;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-
-/**
- * Netty RPC channel
- */
-@InterfaceAudience.Private
-public class AsyncRpcChannelImpl implements AsyncRpcChannel {
- private static final Log LOG = LogFactory.getLog(AsyncRpcChannelImpl.class.getName());
-
- private static final int MAX_SASL_RETRIES = 5;
-
- protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
- = new HashMap<>();
-
- static {
- TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
- new AuthenticationTokenSelector());
- }
-
- final AsyncRpcClient client;
-
- // Contains the channel to work with.
- // Only exists when connected
- private Channel channel;
-
- String name;
- final User ticket;
- final String serviceName;
- final InetSocketAddress address;
-
- private int failureCounter = 0;
-
- boolean useSasl;
- AuthMethod authMethod;
- private int reloginMaxBackoff;
- private Token<? extends TokenIdentifier> token;
- private String serverPrincipal;
-
- // NOTE: closed and connected flags below are only changed when a lock on pendingCalls
- private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
- private boolean connected = false;
- private boolean closed = false;
-
- private Timeout cleanupTimer;
-
- private final TimerTask timeoutTask = new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- cleanupCalls();
- }
- };
-
- /**
- * Constructor for netty RPC channel
- * @param bootstrap to construct channel on
- * @param client to connect with
- * @param ticket of user which uses connection
- * @param serviceName name of service to connect to
- * @param address to connect to
- */
- public AsyncRpcChannelImpl(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
- String serviceName, InetSocketAddress address) {
- this.client = client;
-
- this.ticket = ticket;
- this.serviceName = serviceName;
- this.address = address;
-
- this.channel = connect(bootstrap).channel();
-
- name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
- + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
- }
-
- /**
- * Connect to channel
- * @param bootstrap to connect to
- * @return future of connection
- */
- private ChannelFuture connect(final Bootstrap bootstrap) {
- return bootstrap.remoteAddress(address).connect()
- .addListener(new GenericFutureListener<ChannelFuture>() {
- @Override
- public void operationComplete(final ChannelFuture f) throws Exception {
- if (!f.isSuccess()) {
- retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
- return;
- }
- channel = f.channel();
-
- setupAuthorization();
-
- ByteBuf b = channel.alloc().directBuffer(6);
- createPreamble(b, authMethod);
- channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- if (useSasl) {
- UserGroupInformation ticket = AsyncRpcChannelImpl.this.ticket.getUGI();
- if (authMethod == AuthMethod.KERBEROS) {
- if (ticket != null && ticket.getRealUser() != null) {
- ticket = ticket.getRealUser();
- }
- }
- SaslClientHandler saslHandler;
- if (ticket == null) {
- throw new FatalConnectionException("ticket/user is null");
- }
- final UserGroupInformation realTicket = ticket;
- saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
- @Override
- public SaslClientHandler run() throws IOException {
- return getSaslHandler(realTicket, bootstrap);
- }
- });
- if (saslHandler != null) {
- // Sasl connect is successful. Let's set up Sasl channel handler
- channel.pipeline().addFirst(saslHandler);
- } else {
- // fall back to simple auth because server told us so.
- authMethod = AuthMethod.SIMPLE;
- useSasl = false;
- }
- } else {
- startHBaseConnection(f.channel());
- }
- }
- });
- }
-
- /**
- * Start HBase connection
- * @param ch channel to start connection on
- */
- private void startHBaseConnection(Channel ch) {
- ch.pipeline().addLast("frameDecoder",
- new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
- ch.pipeline().addLast(new AsyncServerResponseHandler(this));
- try {
- writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- close(future.cause());
- return;
- }
- List<AsyncCall> callsToWrite;
- synchronized (pendingCalls) {
- connected = true;
- callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
- }
- for (AsyncCall call : callsToWrite) {
- writeRequest(call);
- }
- }
- });
- } catch (IOException e) {
- close(e);
- }
- }
-
- private void startConnectionWithEncryption(Channel ch) {
- // for rpc encryption, the order of ChannelInboundHandler should be:
- // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder
- // Don't skip the first 4 bytes for length in beforeUnwrapDecoder,
- // SaslClientHandler will handler this
- ch.pipeline().addFirst("beforeUnwrapDecoder",
- new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
- ch.pipeline().addLast("afterUnwrapDecoder",
- new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
- ch.pipeline().addLast(new AsyncServerResponseHandler(this));
- List<AsyncCall> callsToWrite;
- synchronized (pendingCalls) {
- connected = true;
- callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
- }
- for (AsyncCall call : callsToWrite) {
- writeRequest(call);
- }
- }
-
- /**
- * Get SASL handler
- * @param bootstrap to reconnect to
- * @return new SASL handler
- * @throws java.io.IOException if handler failed to create
- */
- private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
- final Bootstrap bootstrap) throws IOException {
- return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
- client.fallbackAllowed,
- client.conf.get("hbase.rpc.protection",
- SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
- getChannelHeaderBytes(authMethod),
- new SaslClientHandler.SaslExceptionHandler() {
- @Override
- public void handle(int retryCount, Random random, Throwable cause) {
- try {
- // Handle Sasl failure. Try to potentially get new credentials
- handleSaslConnectionFailure(retryCount, cause, realTicket);
-
- retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
- cause);
- } catch (IOException | InterruptedException e) {
- close(e);
- }
- }
- }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
- @Override
- public void onSuccess(Channel channel) {
- startHBaseConnection(channel);
- }
-
- @Override
- public void onSaslProtectionSucess(Channel channel) {
- startConnectionWithEncryption(channel);
- }
- });
- }
-
- /**
- * Retry to connect or close
- * @param bootstrap to connect with
- * @param failureCount failure count
- * @param e exception of fail
- */
- private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout,
- Throwable e) {
- if (failureCount < client.maxRetries) {
- client.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- connect(bootstrap);
- }
- }, timeout, TimeUnit.MILLISECONDS);
- } else {
- client.failedServers.addToFailedServers(address);
- close(e);
- }
- }
-
- /**
- * Calls method on channel
- * @param method to call
- * @param request to send
- * @param cellScanner with cells to send
- * @param responsePrototype to construct response with
- * @param rpcTimeout timeout for request
- * @param priority for request
- * @return Promise for the response Message
- */
- @Override
- public <R extends Message, O> Future<O> callMethod(
- final Descriptors.MethodDescriptor method,
- final Message request,final CellScanner cellScanner,
- R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
- exceptionConverter, long rpcTimeout, int priority) {
- final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(),
- method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter,
- rpcTimeout, priority, client.metrics);
-
- synchronized (pendingCalls) {
- if (closed) {
- call.setFailure(new ConnectException());
- return call;
- }
- pendingCalls.put(call.id, call);
- // Add timeout for cleanup if none is present
- if (cleanupTimer == null && call.getRpcTimeout() > 0) {
- cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
- }
- if (!connected) {
- return call;
- }
- }
- writeRequest(call);
- return call;
- }
-
- @Override
- public EventLoop getEventExecutor() {
- return this.channel.eventLoop();
- }
-
- AsyncCall removePendingCall(int id) {
- synchronized (pendingCalls) {
- return pendingCalls.remove(id);
- }
- }
-
- /**
- * Write the channel header
- * @param channel to write to
- * @return future of write
- * @throws java.io.IOException on failure to write
- */
- private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
- RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
- int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
- ByteBuf b = channel.alloc().directBuffer(totalSize);
-
- b.writeInt(header.getSerializedSize());
- b.writeBytes(header.toByteArray());
-
- return channel.writeAndFlush(b);
- }
-
- private byte[] getChannelHeaderBytes(AuthMethod authMethod) {
- RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
- ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4);
- b.putInt(header.getSerializedSize());
- b.put(header.toByteArray());
- return b.array();
- }
-
- private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) {
- RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
- .setServiceName(serviceName);
-
- RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
- if (userInfoPB != null) {
- headerBuilder.setUserInfo(userInfoPB);
- }
-
- if (client.codec != null) {
- headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
- }
- if (client.compressor != null) {
- headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
- }
-
- headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
- return headerBuilder.build();
- }
-
- /**
- * Write request to channel
- * @param call to write
- */
- private void writeRequest(final AsyncCall call) {
- try {
- final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
- .newBuilder();
- requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
- .setRequestParam(call.param != null);
-
- if (Trace.isTracing()) {
- Span s = Trace.currentSpan();
- requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
- .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
- }
-
- ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner());
- if (cellBlock != null) {
- final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
- .newBuilder();
- cellBlockBuilder.setLength(cellBlock.limit());
- requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
- }
- // Only pass priority if there one. Let zero be same as no priority.
- if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
- requestHeaderBuilder.setPriority(call.getPriority());
- }
- requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?
- Integer.MAX_VALUE : (int)call.rpcTimeout);
-
- RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
-
- int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
- if (cellBlock != null) {
- totalSize += cellBlock.remaining();
- }
-
- ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
- try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
- call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
- }
-
- channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
- } catch (IOException e) {
- close(e);
- }
- }
-
- /**
- * Set up server authorization
- * @throws java.io.IOException if auth setup failed
- */
- private void setupAuthorization() throws IOException {
- SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
- this.useSasl = client.userProvider.isHBaseSecurityEnabled();
-
- this.token = null;
- if (useSasl && securityInfo != null) {
- AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
- if (tokenKind != null) {
- TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
- if (tokenSelector != null) {
- token = tokenSelector.selectToken(new Text(client.clusterId),
- ticket.getUGI().getTokens());
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("No token selector found for type " + tokenKind);
- }
- }
- String serverKey = securityInfo.getServerPrincipal();
- if (serverKey == null) {
- throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
- }
- this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
- address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT));
- if (LOG.isDebugEnabled()) {
- LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
- + serverPrincipal);
- }
- }
-
- if (!useSasl) {
- authMethod = AuthMethod.SIMPLE;
- } else if (token != null) {
- authMethod = AuthMethod.DIGEST;
- } else {
- authMethod = AuthMethod.KERBEROS;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
- }
- reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
- }
-
- /**
- * Build the user information
- * @param ugi User Group Information
- * @param authMethod Authorization method
- * @return UserInformation protobuf
- */
- private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
- if (ugi == null || authMethod == AuthMethod.DIGEST) {
- // Don't send user for token auth
- return null;
- }
- RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
- if (authMethod == AuthMethod.KERBEROS) {
- // Send effective user for Kerberos auth
- userInfoPB.setEffectiveUser(ugi.getUserName());
- } else if (authMethod == AuthMethod.SIMPLE) {
- // Send both effective user and real user for simple auth
- userInfoPB.setEffectiveUser(ugi.getUserName());
- if (ugi.getRealUser() != null) {
- userInfoPB.setRealUser(ugi.getRealUser().getUserName());
- }
- }
- return userInfoPB.build();
- }
-
- /**
- * Create connection preamble
- * @param byteBuf to write to
- * @param authMethod to write
- */
- private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
- byteBuf.writeBytes(HConstants.RPC_HEADER);
- byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
- byteBuf.writeByte(authMethod.code);
- }
-
- private void close0(Throwable e) {
- List<AsyncCall> toCleanup;
- synchronized (pendingCalls) {
- if (closed) {
- return;
- }
- closed = true;
- toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
- pendingCalls.clear();
- }
- IOException closeException = null;
- if (e != null) {
- if (e instanceof IOException) {
- closeException = (IOException) e;
- } else {
- closeException = new IOException(e);
- }
- }
- // log the info
- if (LOG.isDebugEnabled() && closeException != null) {
- LOG.debug(name + ": closing ipc connection to " + address, closeException);
- }
- if (cleanupTimer != null) {
- cleanupTimer.cancel();
- cleanupTimer = null;
- }
- for (AsyncCall call : toCleanup) {
- call.setFailed(closeException != null ? closeException
- : new ConnectionClosingException(
- "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
- }
- channel.disconnect().addListener(ChannelFutureListener.CLOSE);
- if (LOG.isDebugEnabled()) {
- LOG.debug(name + ": closed");
- }
- }
-
- /**
- * Close connection
- * @param e exception on close
- */
- public void close(final Throwable e) {
- client.removeConnection(this);
-
- // Move closing from the requesting thread to the channel thread
- if (channel.eventLoop().inEventLoop()) {
- close0(e);
- } else {
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- close0(e);
- }
- });
- }
- }
-
- /**
- * Clean up calls.
- */
- private void cleanupCalls() {
- List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
- long currentTime = EnvironmentEdgeManager.currentTime();
- long nextCleanupTaskDelay = -1L;
- synchronized (pendingCalls) {
- for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
- AsyncCall call = iter.next();
- long timeout = call.getRpcTimeout();
- if (timeout > 0) {
- if (currentTime - call.getStartTime() >= timeout) {
- iter.remove();
- toCleanup.add(call);
- } else {
- if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
- nextCleanupTaskDelay = timeout;
- }
- }
- }
- }
- if (nextCleanupTaskDelay > 0) {
- cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
- } else {
- cleanupTimer = null;
- }
- }
- for (AsyncCall call : toCleanup) {
- call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
- + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
- }
- }
-
- /**
- * Check if the connection is alive
- * @return true if alive
- */
- public boolean isAlive() {
- return channel.isOpen();
- }
-
- @Override
- public InetSocketAddress getAddress() {
- return this.address;
- }
-
- /**
- * Check if user should authenticate over Kerberos
- * @return true if should be authenticated over Kerberos
- * @throws java.io.IOException on failure of check
- */
- private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
- UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- UserGroupInformation realUser = currentUser.getRealUser();
- return authMethod == AuthMethod.KERBEROS && loginUser != null &&
- // Make sure user logged in using Kerberos either keytab or TGT
- loginUser.hasKerberosCredentials() &&
- // relogin only in case it is the login user (e.g. JT)
- // or superuser (like oozie).
- (loginUser.equals(currentUser) || loginUser.equals(realUser));
- }
-
- /**
- * If multiple clients with the same principal try to connect to the same server at the same time,
- * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
- * work around this, what is done is that the client backs off randomly and tries to initiate the
- * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
- * attempted.
- * <p>
- * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
- * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
- * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
- * underlying authentication implementation, so there is no retry from other high level (for eg,
- * HCM or HBaseAdmin).
- * </p>
- * @param currRetries retry count
- * @param ex exception describing fail
- * @param user which is trying to connect
- * @throws java.io.IOException if IO fail
- * @throws InterruptedException if thread is interrupted
- */
- private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
- final UserGroupInformation user) throws IOException, InterruptedException {
- user.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws IOException, InterruptedException {
- if (shouldAuthenticateOverKrb()) {
- if (currRetries < MAX_SASL_RETRIES) {
- LOG.debug("Exception encountered while connecting to the server : " + ex);
- // try re-login
- if (UserGroupInformation.isLoginKeytabBased()) {
- UserGroupInformation.getLoginUser().reloginFromKeytab();
- } else {
- UserGroupInformation.getLoginUser().reloginFromTicketCache();
- }
-
- // Should reconnect
- return null;
- } else {
- String msg = "Couldn't setup connection for "
- + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
- LOG.warn(msg, ex);
- throw (IOException) new IOException(msg).initCause(ex);
- }
- } else {
- LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
- }
- if (ex instanceof RemoteException) {
- throw (RemoteException) ex;
- }
- if (ex instanceof SaslException) {
- String msg = "SASL authentication failed."
- + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
- LOG.fatal(msg, ex);
- throw new RuntimeException(msg, ex);
- }
- throw new IOException(ex);
- }
- });
- }
-
- public int getConnectionHashCode() {
- return ConnectionId.hashCode(ticket, serviceName, address);
- }
-
- @Override
- public int hashCode() {
- return getConnectionHashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof AsyncRpcChannelImpl) {
- AsyncRpcChannelImpl channel = (AsyncRpcChannelImpl) obj;
- return channel.hashCode() == obj.hashCode();
- }
- return false;
- }
-
- @Override
- public String toString() {
- return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
- }
-
- /**
- * Listens to call writes and fails if write failed
- */
- private static final class CallWriteListener implements ChannelFutureListener {
- private final AsyncRpcChannelImpl rpcChannel;
- private final int id;
-
- public CallWriteListener(AsyncRpcChannelImpl asyncRpcChannelImpl, int id) {
- this.rpcChannel = asyncRpcChannelImpl;
- this.id = id;
- }
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- AsyncCall call = rpcChannel.removePendingCall(id);
- if (call != null) {
- if (future.cause() instanceof IOException) {
- call.setFailed((IOException) future.cause());
- } else {
- call.setFailed(new IOException(future.cause()));
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index 723a234..3d343b4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -23,11 +23,11 @@ import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
+
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
@@ -37,6 +37,9 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -49,16 +52,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.client.ResponseFutureListener;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.Pair;
@@ -240,7 +240,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
}
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
- final Future<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType,
+ final Promise<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType,
getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(),
pcrc.getPriority());
@@ -290,8 +290,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
try {
connection = createRpcChannel(md.getService().getName(), addr, ticket);
- ResponseFutureListener<Message> listener =
- new ResponseFutureListener<Message>() {
+ FutureListener<Message> listener =
+ new FutureListener<Message>() {
@Override
public void operationComplete(Future<Message> future) throws Exception {
if (!future.isSuccess()) {
@@ -351,11 +351,6 @@ public class AsyncRpcClient extends AbstractRpcClient {
}
}
- @Override
- public EventLoop getEventExecutor() {
- return this.bootstrap.config().group().next();
- }
-
/**
* Create a cell scanner
*
@@ -378,13 +373,6 @@ public class AsyncRpcClient extends AbstractRpcClient {
return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
}
- @Override
- public AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user)
- throws StoppedRpcClientException, FailedServerException {
- return this.createRpcChannel(serviceName,
- new InetSocketAddress(sn.getHostname(), sn.getPort()), user);
- }
-
/**
* Creates an RPC client
*
@@ -420,7 +408,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
connections.remove(hashCode);
}
if (rpcChannel == null) {
- rpcChannel = new AsyncRpcChannelImpl(this.bootstrap, this, ticket, serviceName, location);
+ rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
connections.put(hashCode, rpcChannel);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index 6fcca34..7a2802f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -37,13 +37,13 @@ import org.apache.hadoop.ipc.RemoteException;
*/
@InterfaceAudience.Private
public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {
- private final AsyncRpcChannelImpl channel;
+ private final AsyncRpcChannel channel;
/**
* Constructor
* @param channel on which this response handler operates
*/
- public AsyncServerResponseHandler(AsyncRpcChannelImpl channel) {
+ public AsyncServerResponseHandler(AsyncRpcChannel channel) {
this.channel = channel;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java
deleted file mode 100644
index 0d05db8..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.EventExecutor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
-
-/**
- * Abstract response promise
- * @param <T> Type of result contained in Promise
- */
-@InterfaceAudience.Private
-public class Promise<T> extends DefaultPromise<T> implements Future<T> {
- /**
- * Constructor
- * @param eventLoop to handle events on
- */
- public Promise(EventExecutor eventLoop) {
- super(eventLoop);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 9d05c21..a8ec628 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcChannel;
-import io.netty.util.concurrent.EventExecutor;
+
import java.io.Closeable;
import java.io.IOException;
@@ -70,18 +70,6 @@ import org.apache.hadoop.hbase.security.User;
throws IOException;
/**
- * Create or fetch AsyncRpcChannel
- * @param serviceName to connect to
- * @param sn ServerName of the channel to create
- * @param user for the service
- * @return An async RPC channel fitting given parameters
- * @throws FailedServerException if server failed
- * @throws StoppedRpcClientException if the RPC client has stopped
- */
- AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user)
- throws StoppedRpcClientException, FailedServerException;
-
- /**
* Creates a "channel" that can be used by a protobuf service. Useful setting up
* protobuf stubs.
*
@@ -116,10 +104,4 @@ import org.apache.hadoop.hbase.security.User;
* supports cell blocks.
*/
boolean hasCellBlockSupport();
-
- /**
- * Get an event loop to operate on
- * @return EventLoop
- */
- EventExecutor getEventExecutor();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index dc05af1..37b9afd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -23,8 +23,6 @@ import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-import io.netty.util.concurrent.EventExecutor;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -55,6 +53,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
import javax.net.SocketFactory;
import javax.security.sasl.SaslException;
@@ -66,7 +65,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
@@ -1219,11 +1217,6 @@ public class RpcClientImpl extends AbstractRpcClient {
}
}
- @Override
- public EventExecutor getEventExecutor() {
- return AsyncRpcClient.getGlobalEventLoopGroup(this.conf).getFirst().next();
- }
-
/**
* Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code> protocol,
@@ -1336,14 +1329,8 @@ public class RpcClientImpl extends AbstractRpcClient {
}
@Override
- public org.apache.hadoop.hbase.ipc.AsyncRpcChannel createRpcChannel(String serviceName,
- ServerName sn, User user) throws StoppedRpcClientException, FailedServerException {
- return new AsyncRpcChannel(sn, user);
- }
-
- @Override
public RpcChannel createProtobufRpcChannel(ServerName sn, User user, int rpcTimeout) {
- return new RpcChannelImplementation(sn, user, rpcTimeout);
+ throw new UnsupportedOperationException();
}
/**
@@ -1392,143 +1379,4 @@ public class RpcClientImpl extends AbstractRpcClient {
return connection;
}
-
- /**
- * Simulated async call
- */
- private class RpcChannelImplementation implements RpcChannel {
- private final InetSocketAddress isa;
- private final User ticket;
- private final int channelOperationTimeout;
- private final EventExecutor executor;
-
- /**
- * @param channelOperationTimeout - the default timeout when no timeout is given
- */
- protected RpcChannelImplementation(
- final ServerName sn, final User ticket, int channelOperationTimeout) {
- this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
- this.ticket = ticket;
- this.channelOperationTimeout = channelOperationTimeout;
-
- this.executor = RpcClientImpl.this.getEventExecutor();
- }
-
- @Override
- public void callMethod(final MethodDescriptor method, RpcController controller,
- final Message request, final Message responsePrototype, final RpcCallback<Message> done) {
- final PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController(
- controller,
- channelOperationTimeout);
-
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
- cs.setStartTime(EnvironmentEdgeManager.currentTime());
- Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs);
- cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
- if (metrics != null) {
- metrics.updateRpc(method, request, cs);
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
- }
-
- done.run(call.response);
- } catch (IOException e) {
- pcrc.setFailed(e);
- } catch (InterruptedException e) {
- pcrc.startCancel();
- }
- }
- });
- }
- }
-
- /**
- * Wraps the call in an async channel.
- */
- private class AsyncRpcChannel implements org.apache.hadoop.hbase.ipc.AsyncRpcChannel {
- private final EventExecutor executor;
- private final InetSocketAddress isa;
-
- private final User ticket;
-
- /**
- * Constructor
- * @param sn servername to connect to
- * @param user to connect with
- */
- public AsyncRpcChannel(ServerName sn, User user) {
- this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
- this.executor = RpcClientImpl.this.getEventExecutor();
- this.ticket = user;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <R extends Message, O> Future<O> callMethod(final MethodDescriptor method,
- final Message request, CellScanner cellScanner, final R responsePrototype,
- final MessageConverter<R, O> messageConverter,
- final IOExceptionConverter exceptionConverter, long rpcTimeout, int priority) {
- final PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(cellScanner);
- pcrc.setPriority(priority);
- pcrc.setCallTimeout((int) rpcTimeout);
-
- final Promise<O> promise = new Promise<>(executor);
-
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
- cs.setStartTime(EnvironmentEdgeManager.currentTime());
- Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs);
- cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
- if (metrics != null) {
- metrics.updateRpc(method, request, cs);
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
- }
-
- promise.setSuccess(
- messageConverter.convert((R) call.response, call.cells)
- );
- } catch (InterruptedException e) {
- promise.cancel(true);
- } catch (IOException e) {
- if(exceptionConverter != null) {
- e = exceptionConverter.convert(e);
- }
- promise.setFailure(e);
- }
- }
- });
-
- return promise;
- }
-
- @Override
- public EventExecutor getEventExecutor() {
- return this.executor;
- }
-
- @Override
- public void close(Throwable cause) {
- this.executor.shutdownGracefully();
- }
-
- @Override
- public boolean isAlive() {
- return !this.executor.isShuttingDown() && !this.executor.isShutdown();
- }
-
- @Override
- public InetSocketAddress getAddress() {
- return isa;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 45cec78..4cfa25c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
@@ -32,9 +31,9 @@ import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
-import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
+
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
@@ -42,9 +41,7 @@ import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -56,8 +53,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
@@ -414,141 +409,4 @@ public abstract class AbstractTestIPC {
.wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
.getCause() instanceof CallTimeoutException);
}
-
- @Test
- public void testAsyncProtobufConnectionSetup() throws Exception {
- TestRpcServer rpcServer = new TestRpcServer();
- try (RpcClient client = createRpcClient(CONF)) {
- rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
- if (address == null) {
- throw new IOException("Listener channel is closed");
- }
- MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
- EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
- RpcChannel channel = client.createProtobufRpcChannel(
- ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
- User.getCurrent(), 0);
-
- final AtomicBoolean done = new AtomicBoolean(false);
-
- channel
- .callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType().toProto(),
- new com.google.protobuf.RpcCallback<Message>() {
- @Override
- public void run(Message parameter) {
- done.set(true);
- }
- });
-
- TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
- @Override
- public boolean evaluate() throws Exception {
- return done.get();
- }
- });
- } finally {
- rpcServer.stop();
- }
- }
-
- @Test
- public void testRTEDuringAsyncProtobufConnectionSetup() throws Exception {
- TestRpcServer rpcServer = new TestRpcServer();
- try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) {
- rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
- if (address == null) {
- throw new IOException("Listener channel is closed");
- }
- MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
- EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
- RpcChannel channel = client.createProtobufRpcChannel(
- ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
- User.getCurrent(), 0);
-
- final AtomicBoolean done = new AtomicBoolean(false);
-
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
- controller.notifyOnFail(new com.google.protobuf.RpcCallback<IOException>() {
- @Override
- public void run(IOException e) {
- done.set(true);
- LOG.info("Caught expected exception: " + e.toString());
- assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
- }
- });
-
- channel.callMethod(md, controller, param, md.getOutputType().toProto(),
- new com.google.protobuf.RpcCallback<Message>() {
- @Override
- public void run(Message parameter) {
- done.set(true);
- fail("Expected an exception to have been thrown!");
- }
- });
-
- TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
- @Override
- public boolean evaluate() throws Exception {
- return done.get();
- }
- });
- } finally {
- rpcServer.stop();
- }
- }
-
- @Test
- public void testAsyncConnectionSetup() throws Exception {
- TestRpcServer rpcServer = new TestRpcServer();
- try (RpcClient client = createRpcClient(CONF)) {
- rpcServer.start();
- Message msg = setupAsyncConnection(rpcServer, client);
-
- assertNotNull(msg);
- } finally {
- rpcServer.stop();
- }
- }
-
- @Test
- public void testRTEDuringAsyncConnectionSetup() throws Exception {
- TestRpcServer rpcServer = new TestRpcServer();
- try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) {
- rpcServer.start();
- setupAsyncConnection(rpcServer, client);
-
- fail("Expected an exception to have been thrown!");
- } catch (ExecutionException e) {
- assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
- } finally {
- rpcServer.stop();
- }
- }
-
- private Message setupAsyncConnection(TestRpcServer rpcServer, RpcClient client)
- throws IOException, InterruptedException, ExecutionException,
- java.util.concurrent.TimeoutException {
- InetSocketAddress address = rpcServer.getListenerAddress();
- if (address == null) {
- throw new IOException("Listener channel is closed");
- }
- MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
- EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
- ServerName serverName =
- ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis());
-
- AsyncRpcChannel channel =
- client.createRpcChannel(md.getService().getName(), serverName, User.getCurrent());
-
- final Future<Message> f = channel
- .callMethod(md, param, null, md.getOutputType().toProto(), MessageConverter.NO_CONVERTER,
- null, 1000, HConstants.NORMAL_QOS);
-
- return f.get(1, TimeUnit.SECONDS);
- }
}