You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/05/03 09:28:50 UTC
[1/2] hbase git commit: HBASE-17263 Netty based rpc server impl
Repository: hbase
Updated Branches:
refs/heads/master de78c1189 -> 4dbd025cc
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dbd025c/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index c409f6e..f771eec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -18,9 +18,6 @@
package org.apache.hadoop.hbase.ipc;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.BindException;
@@ -32,7 +29,6 @@ import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
-import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ReadableByteChannel;
@@ -40,8 +36,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.nio.channels.WritableByteChannel;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -60,9 +54,6 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -73,42 +64,26 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
-import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.TraceInfo;
@@ -165,8 +140,9 @@ public class SimpleRpcServer extends RpcServer {
justification="Can't figure why this complaint is happening... see below")
Call(int id, final BlockingService service, final MethodDescriptor md,
RequestHeader header, Message param, CellScanner cellScanner,
- Connection connection, Responder responder, long size, TraceInfo tinfo,
- final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
+ RpcServer.Connection connection, long size, TraceInfo tinfo,
+ final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup,
+ Responder responder) {
super(id, service, md, header, param, cellScanner, connection, size,
tinfo, remoteAddress, timeout, reqCleanup);
this.responder = responder;
@@ -178,6 +154,7 @@ public class SimpleRpcServer extends RpcServer {
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
justification="Presume the lock on processing request held by caller is protection enough")
+ @Override
void done() {
super.done();
this.getConnection().decRpcCount(); // Say that we're done with this call.
@@ -192,6 +169,7 @@ public class SimpleRpcServer extends RpcServer {
}
}
+ @Override
public synchronized void sendResponseIfReady() throws IOException {
// set param null to reduce memory pressure
this.param = null;
@@ -769,19 +747,6 @@ public class SimpleRpcServer extends RpcServer {
private long lastContact;
protected Socket socket;
- private ByteBuffer unwrappedData;
- // When is this set? FindBugs wants to know! Says NP
- private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
-
- private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
- null, null, this, null, 0, null, null, 0, null);
-
- private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
- 0, null, null, 0, null);
-
- private final Call setConnectionHeaderResponseCall = new Call(CONNECTION_HEADER_RESPONSE_CALLID,
- null, null, null, null, null, this, null, 0, null, null, 0, null);
-
public Connection(SocketChannel channel, long lastContact) {
super();
this.channel = channel;
@@ -804,6 +769,13 @@ public class SimpleRpcServer extends RpcServer {
socketSendBufferSize);
}
}
+ this.saslCall = new Call(SASL_CALLID, null, null, null, null, null, this,
+ 0, null, null, 0, null, responder);
+ this.setConnectionHeaderResponseCall = new Call(
+ CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null,
+ this, 0, null, null, 0, null, responder);
+ this.authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null,
+ null, null, null, this, 0, null, null, 0, null, responder);
}
public void setLastContact(long lastContact) {
@@ -829,187 +801,6 @@ public class SimpleRpcServer extends RpcServer {
rpcCount.increment();
}
- private void saslReadAndProcess(ByteBuff saslToken) throws IOException,
- InterruptedException {
- if (saslContextEstablished) {
- if (LOG.isTraceEnabled())
- LOG.trace("Have read input token of size " + saslToken.limit()
- + " for processing by saslServer.unwrap()");
-
- if (!useWrap) {
- processOneRpc(saslToken);
- } else {
- byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
- byte [] plaintextData;
- if (useCryptoAesWrap) {
- // unwrap with CryptoAES
- plaintextData = cryptoAES.unwrap(b, 0, b.length);
- } else {
- plaintextData = saslServer.unwrap(b, 0, b.length);
- }
- processUnwrappedData(plaintextData);
- }
- } else {
- byte[] replyToken;
- try {
- if (saslServer == null) {
- switch (authMethod) {
- case DIGEST:
- if (secretManager == null) {
- throw new AccessDeniedException(
- "Server is not configured to do DIGEST authentication.");
- }
- saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
- .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
- HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
- secretManager, this));
- break;
- default:
- UserGroupInformation current = UserGroupInformation.getCurrentUser();
- String fullName = current.getUserName();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Kerberos principal name is " + fullName);
- }
- final String names[] = SaslUtil.splitKerberosName(fullName);
- if (names.length != 3) {
- throw new AccessDeniedException(
- "Kerberos principal name does NOT have the expected "
- + "hostname part: " + fullName);
- }
- current.doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws SaslException {
- saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
- .getMechanismName(), names[0], names[1],
- HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
- return null;
- }
- });
- }
- if (saslServer == null)
- throw new AccessDeniedException(
- "Unable to find SASL server implementation for "
- + authMethod.getMechanismName());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Have read input token of size " + saslToken.limit()
- + " for processing by saslServer.evaluateResponse()");
- }
- replyToken = saslServer
- .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
- } catch (IOException e) {
- IOException sendToClient = e;
- Throwable cause = e;
- while (cause != null) {
- if (cause instanceof InvalidToken) {
- sendToClient = (InvalidToken) cause;
- break;
- }
- cause = cause.getCause();
- }
- doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
- sendToClient.getLocalizedMessage());
- metrics.authenticationFailure();
- String clientIP = this.toString();
- // attempting user could be null
- AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
- throw e;
- }
- if (replyToken != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Will send token of size " + replyToken.length
- + " from saslServer.");
- }
- doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
- null);
- }
- if (saslServer.isComplete()) {
- String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
- useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
- ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
- if (LOG.isDebugEnabled()) {
- LOG.debug("SASL server context established. Authenticated client: "
- + ugi + ". Negotiated QoP is "
- + saslServer.getNegotiatedProperty(Sasl.QOP));
- }
- metrics.authenticationSuccess();
- AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
- saslContextEstablished = true;
- }
- }
- }
-
- /**
- * No protobuf encoding of raw sasl messages
- */
- private void doRawSaslReply(SaslStatus status, Writable rv,
- String errorClass, String error) throws IOException {
- ByteBufferOutputStream saslResponse = null;
- DataOutputStream out = null;
- try {
- // In my testing, have noticed that sasl messages are usually
- // in the ballpark of 100-200. That's why the initial capacity is 256.
- saslResponse = new ByteBufferOutputStream(256);
- out = new DataOutputStream(saslResponse);
- out.writeInt(status.state); // write status
- if (status == SaslStatus.SUCCESS) {
- rv.write(out);
- } else {
- WritableUtils.writeString(out, errorClass);
- WritableUtils.writeString(out, error);
- }
- saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
- saslCall.responder = responder;
- saslCall.sendResponseIfReady();
- } finally {
- if (saslResponse != null) {
- saslResponse.close();
- }
- if (out != null) {
- out.close();
- }
- }
- }
-
- /**
- * Send the response for connection header
- */
- private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData) throws IOException {
- ByteBufferOutputStream response = null;
- DataOutputStream out = null;
- try {
- response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4);
- out = new DataOutputStream(response);
- out.writeInt(wrappedCipherMetaData.length);
- out.write(wrappedCipherMetaData);
-
- setConnectionHeaderResponseCall.setConnectionHeaderResponse(response.getByteBuffer());
- setConnectionHeaderResponseCall.responder = responder;
- setConnectionHeaderResponseCall.sendResponseIfReady();
- } finally {
- if (out != null) {
- out.close();
- }
- if (response != null) {
- response.close();
- }
- }
- }
-
- private void disposeSasl() {
- if (saslServer != null) {
- try {
- saslServer.dispose();
- saslServer = null;
- } catch (SaslException ignored) {
- // Ignored. This is being disposed of anyway.
- }
- }
- }
-
private int readPreamble() throws IOException {
int count;
// Check for 'HBas' magic.
@@ -1044,7 +835,7 @@ public class SimpleRpcServer extends RpcServer {
} else {
AccessDeniedException ae = new AccessDeniedException("Authentication is required");
setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
- responder.doRespond(authFailedCall);
+ authFailedCall.sendResponseIfReady();
throw ae;
}
}
@@ -1150,8 +941,8 @@ public class SimpleRpcServer extends RpcServer {
RequestHeader header = (RequestHeader) builder.build();
// Notify the client about the offending request
- Call reqTooBig = new Call(header.getCallId(), this.service, null, null, null,
- null, this, responder, 0, null, this.addr, 0, null);
+ Call reqTooBig = new Call(header.getCallId(), this.service, null,
+ null, null, null, this, 0, null, this.addr, 0, null, responder);
metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
// Make sure the client recognizes the underlying exception
// Otherwise, throw a DoNotRetryIOException.
@@ -1252,305 +1043,16 @@ public class SimpleRpcServer extends RpcServer {
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
- Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null, 0,
- null);
+ Call fakeCall = new Call(-1, null, null, null, null, null, this, -1,
+ null, null, 0, null, responder);
setupResponse(null, fakeCall, e, msg);
responder.doRespond(fakeCall);
// Returning -1 closes out the connection.
return -1;
}
- // Reads the connection header following version
- private void processConnectionHeader(ByteBuff buf) throws IOException {
- if (buf.hasArray()) {
- this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
- } else {
- CodedInputStream cis = UnsafeByteOperations
- .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
- cis.enableAliasing(true);
- this.connectionHeader = ConnectionHeader.parseFrom(cis);
- }
- String serviceName = connectionHeader.getServiceName();
- if (serviceName == null) throw new EmptyServiceNameException();
- this.service = getService(services, serviceName);
- if (this.service == null) {
- throw new UnknownServiceException(serviceName);
- }
- setupCellBlockCodecs(this.connectionHeader);
- RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
- RPCProtos.ConnectionHeaderResponse.newBuilder();
- setupCryptoCipher(this.connectionHeader, chrBuilder);
- responseConnectionHeader(chrBuilder);
- UserGroupInformation protocolUser = createUser(connectionHeader);
- if (!useSasl) {
- ugi = protocolUser;
- if (ugi != null) {
- ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
- }
- // audit logging for SASL authenticated users happens in saslReadAndProcess()
- if (authenticatedWithFallback) {
- LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
- + " connecting from " + getHostAddress());
- }
- AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
- } else {
- // user is authenticated
- ugi.setAuthenticationMethod(authMethod.authenticationMethod);
- //Now we check if this is a proxy user case. If the protocol user is
- //different from the 'user', it is a proxy user scenario. However,
- //this is not allowed if user authenticated with DIGEST.
- if ((protocolUser != null)
- && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
- if (authMethod == AuthMethod.DIGEST) {
- // Not allowed to doAs if token authentication is used
- throw new AccessDeniedException("Authenticated user (" + ugi
- + ") doesn't match what the client claims to be ("
- + protocolUser + ")");
- } else {
- // Effective user can be different from authenticated user
- // for simple auth or kerberos auth
- // The user is the real user. Now we create a proxy user
- UserGroupInformation realUser = ugi;
- ugi = UserGroupInformation.createProxyUser(protocolUser
- .getUserName(), realUser);
- // Now the user is a proxy user, set Authentication method Proxy.
- ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
- }
- }
- }
- if (connectionHeader.hasVersionInfo()) {
- // see if this connection will support RetryImmediatelyException
- retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
-
- AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
- + " with version info: "
- + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
- } else {
- AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
- + " with unknown version info");
- }
- }
-
- private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
- throws FatalConnectionException {
- // Response the connection header if Crypto AES is enabled
- if (!chrBuilder.hasCryptoCipherMeta()) return;
- try {
- byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
- // encrypt the Crypto AES cipher meta data with sasl server, and send to client
- byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
- Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
- Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
-
- doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length));
- } catch (IOException ex) {
- throw new UnsupportedCryptoException(ex.getMessage(), ex);
- }
- }
-
- private void processUnwrappedData(byte[] inBuf) throws IOException,
- InterruptedException {
- ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
- // Read all RPCs contained in the inBuf, even partial ones
- while (true) {
- int count;
- if (unwrappedDataLengthBuffer.remaining() > 0) {
- count = channelRead(ch, unwrappedDataLengthBuffer);
- if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
- return;
- }
-
- if (unwrappedData == null) {
- unwrappedDataLengthBuffer.flip();
- int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
- if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
- if (LOG.isDebugEnabled())
- LOG.debug("Received ping message");
- unwrappedDataLengthBuffer.clear();
- continue; // ping message
- }
- unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
- }
-
- count = channelRead(ch, unwrappedData);
- if (count <= 0 || unwrappedData.remaining() > 0)
- return;
-
- if (unwrappedData.remaining() == 0) {
- unwrappedDataLengthBuffer.clear();
- unwrappedData.flip();
- processOneRpc(new SingleByteBuff(unwrappedData));
- unwrappedData = null;
- }
- }
- }
-
- private void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
- if (connectionHeaderRead) {
- processRequest(buf);
- } else {
- processConnectionHeader(buf);
- this.connectionHeaderRead = true;
- if (!authorizeConnection()) {
- // Throw FatalConnectionException wrapping ACE so client does right thing and closes
- // down the connection instead of trying to read non-existent retun.
- throw new AccessDeniedException("Connection from " + this + " for service " +
- connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
- }
- this.user = userProvider.create(this.ugi);
- }
- }
-
- /**
- * @param buf Has the request header and the request param and optionally encoded data buffer
- * all in this one array.
- * @throws IOException
- * @throws InterruptedException
- */
- protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
- long totalRequestSize = buf.limit();
- int offset = 0;
- // Here we read in the header. We avoid having pb
- // do its default 4k allocation for CodedInputStream. We force it to use backing array.
- CodedInputStream cis;
- if (buf.hasArray()) {
- cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()).newCodedInput();
- } else {
- cis = UnsafeByteOperations
- .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
- }
- cis.enableAliasing(true);
- int headerSize = cis.readRawVarint32();
- offset = cis.getTotalBytesRead();
- Message.Builder builder = RequestHeader.newBuilder();
- ProtobufUtil.mergeFrom(builder, cis, headerSize);
- RequestHeader header = (RequestHeader) builder.build();
- offset += headerSize;
- int id = header.getCallId();
- if (LOG.isTraceEnabled()) {
- LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
- " totalRequestSize: " + totalRequestSize + " bytes");
- }
- // Enforcing the call queue size, this triggers a retry in the client
- // This is a bit late to be doing this check - we have already read in the total request.
- if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
- final Call callTooBig =
- new Call(id, this.service, null, null, null, null, this,
- responder, totalRequestSize, null, null, 0, this.callCleanup);
- ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
- setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + server.getServerName() +
- ", is hbase.ipc.server.max.callqueue.size too small?");
- responder.doRespond(callTooBig);
- return;
- }
- MethodDescriptor md = null;
- Message param = null;
- CellScanner cellScanner = null;
- try {
- if (header.hasRequestParam() && header.getRequestParam()) {
- md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
- if (md == null) throw new UnsupportedOperationException(header.getMethodName());
- builder = this.service.getRequestPrototype(md).newBuilderForType();
- cis.resetSizeCounter();
- int paramSize = cis.readRawVarint32();
- offset += cis.getTotalBytesRead();
- if (builder != null) {
- ProtobufUtil.mergeFrom(builder, cis, paramSize);
- param = builder.build();
- }
- offset += paramSize;
- } else {
- // currently header must have request param, so we directly throw exception here
- String msg = "Invalid request header: " + TextFormat.shortDebugString(header)
- + ", should have param set in it";
- LOG.warn(msg);
- throw new DoNotRetryIOException(msg);
- }
- if (header.hasCellBlockMeta()) {
- buf.position(offset);
- ByteBuff dup = buf.duplicate();
- dup.limit(offset + header.getCellBlockMeta().getLength());
- cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
- this.compressionCodec, dup);
- }
- } catch (Throwable t) {
- InetSocketAddress address = getListenerAddress();
- String msg = (address != null ? address : "(channel closed)") +
- " is unable to read call parameter from client " + getHostAddress();
- LOG.warn(msg, t);
-
- metrics.exception(t);
-
- // probably the hbase hadoop version does not match the running hadoop version
- if (t instanceof LinkageError) {
- t = new DoNotRetryIOException(t);
- }
- // If the method is not present on the server, do not retry.
- if (t instanceof UnsupportedOperationException) {
- t = new DoNotRetryIOException(t);
- }
-
- final Call readParamsFailedCall =
- new Call(id, this.service, null, null, null, null, this,
- responder, totalRequestSize, null, null, 0, this.callCleanup);
- ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- setupResponse(responseBuffer, readParamsFailedCall, t,
- msg + "; " + t.getMessage());
- responder.doRespond(readParamsFailedCall);
- return;
- }
-
- TraceInfo traceInfo = header.hasTraceInfo()
- ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
- : null;
- int timeout = 0;
- if (header.hasTimeout() && header.getTimeout() > 0){
- timeout = Math.max(minClientRequestTimeout, header.getTimeout());
- }
- Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
- totalRequestSize, traceInfo, this.addr, timeout, this.callCleanup);
-
- if (!scheduler.dispatch(new CallRunner(SimpleRpcServer.this, call))) {
- callQueueSizeInBytes.add(-1 * call.getSize());
-
- ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
- setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + server.getServerName() +
- ", too many items queued ?");
- responder.doRespond(call);
- }
- }
-
- private boolean authorizeConnection() throws IOException {
- try {
- // If auth method is DIGEST, the token was obtained by the
- // real user for the effective user, therefore not required to
- // authorize real user. doAs is allowed only for simple or kerberos
- // authentication
- if (ugi != null && ugi.getRealUser() != null
- && (authMethod != AuthMethod.DIGEST)) {
- ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
- }
- authorize(ugi, connectionHeader, getHostInetAddress());
- metrics.authorizationSuccess();
- } catch (AuthorizationException ae) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
- }
- metrics.authorizationFailure();
- setupResponse(authFailedResponse, authFailedCall,
- new AccessDeniedException(ae), ae.getMessage());
- responder.doRespond(authFailedCall);
- return false;
- }
- return true;
- }
-
- protected synchronized void close() {
+ @Override
+ public synchronized void close() {
disposeSasl();
data = null;
callCleanup = null;
@@ -1577,6 +1079,16 @@ public class SimpleRpcServer extends RpcServer {
public boolean isConnectionOpen() {
return channel.isOpen();
}
+
+ @Override
+ public RpcServer.Call createCall(int id, final BlockingService service,
+ final MethodDescriptor md, RequestHeader header, Message param,
+ CellScanner cellScanner, RpcServer.Connection connection, long size,
+ TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
+ CallCleanup reqCleanup) {
+ return new Call(id, service, md, header, param, cellScanner, connection,
+ size, tinfo, remoteAddress, timeout, reqCleanup, responder);
+ }
}
@@ -1621,20 +1133,6 @@ public class SimpleRpcServer extends RpcServer {
return new Connection(channel, time);
}
- /**
- * Setup response for the RPC Call.
- *
- * @param response buffer to serialize the response into
- * @param call {@link Call} to which we are setting up the response
- * @param error error message, if the call failed
- * @throws IOException
- */
- private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
- throws IOException {
- if (response != null) response.reset();
- call.setResponse(null, null, t, error);
- }
-
protected void closeConnection(Connection connection) {
connectionManager.close(connection);
}
@@ -1704,30 +1202,25 @@ public class SimpleRpcServer extends RpcServer {
return listener.getAddress();
}
+ @Override
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {
return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0);
}
- public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Message param,
- CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime,
- int timeout)
+ @Override
+ public Pair<Message, CellScanner> call(BlockingService service,
+ MethodDescriptor md, Message param, CellScanner cellScanner,
+ long receiveTime, MonitoredRPCHandler status, long startTime, int timeout)
throws IOException {
- Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null, null, -1, null, null, timeout,
- null);
+ Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null,
+ -1, null, null, timeout, null, null);
fakeCall.setReceiveTime(receiveTime);
return call(fakeCall, status);
}
/**
- * When the read or write buffer size is larger than this limit, i/o will be
- * done in chunks of this size. Most RPC requests and responses would be
- * be smaller.
- */
- private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
-
- /**
* This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
* If the amount of data is large, it writes to channel in smaller chunks.
* This is to avoid jdk from creating many direct buffers as the size of
@@ -1749,70 +1242,6 @@ public class SimpleRpcServer extends RpcServer {
}
/**
- * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
- * If the amount of data is large, it writes to channel in smaller chunks.
- * This is to avoid jdk from creating many direct buffers as the size of
- * ByteBuffer increases. There should not be any performance degredation.
- *
- * @param channel writable byte channel to write on
- * @param buffer buffer to write
- * @return number of bytes written
- * @throws java.io.IOException e
- * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
- */
- protected int channelRead(ReadableByteChannel channel,
- ByteBuffer buffer) throws IOException {
-
- int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
- channel.read(buffer) : channelIO(channel, null, buffer);
- if (count > 0) {
- metrics.receivedBytes(count);
- }
- return count;
- }
-
- /**
- * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
- * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
- * one of readCh or writeCh should be non-null.
- *
- * @param readCh read channel
- * @param writeCh write channel
- * @param buf buffer to read or write into/out of
- * @return bytes written
- * @throws java.io.IOException e
- * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
- * @see #channelWrite(GatheringByteChannel, BufferChain)
- */
- protected static int channelIO(ReadableByteChannel readCh,
- WritableByteChannel writeCh,
- ByteBuffer buf) throws IOException {
-
- int originalLimit = buf.limit();
- int initialRemaining = buf.remaining();
- int ret = 0;
-
- while (buf.remaining() > 0) {
- try {
- int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
- buf.limit(buf.position() + ioSize);
-
- ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
-
- if (ret < ioSize) {
- break;
- }
-
- } finally {
- buf.limit(originalLimit);
- }
- }
-
- int nBytes = initialRemaining - buf.remaining();
- return (nBytes > 0) ? nBytes : ret;
- }
-
- /**
* A convenience method to bind to a given address and report
* better exceptions if the address is not a valid host.
* @param socket the socket to bind
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dbd025c/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index a1a73c1..581e50e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -321,7 +321,7 @@ public abstract class AbstractTestIPC {
}
@Override
- protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
+ public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dbd025c/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
new file mode 100644
index 0000000..81be74d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ RPCTests.class, SmallTests.class })
+public class TestNettyRpcServer {
+ @Rule
+ public TestName name = new TestName();
+ private static HBaseTestingUtility TEST_UTIL;
+
+ private static TableName TABLE;
+ private static byte[] FAMILY = Bytes.toBytes("f1");
+ private static byte[] PRIVATE_COL = Bytes.toBytes("private");
+ private static byte[] PUBLIC_COL = Bytes.toBytes("public");
+
+ @Before
+ public void setup() {
+ TABLE = TableName.valueOf(name.getMethodName());
+ }
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ TEST_UTIL = new HBaseTestingUtility();
+ TEST_UTIL.getConfiguration().set(
+ RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
+ NettyRpcServer.class.getName());
+ TEST_UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test(timeout = 180000)
+ public void testNettyRpcServer() throws Exception {
+ final Table table = TEST_UTIL.createTable(TABLE, FAMILY);
+ try {
+ // put some test data
+ List<Put> puts = new ArrayList<Put>(100);
+ for (int i = 0; i < 100; i++) {
+ Put p = new Put(Bytes.toBytes(i));
+ p.addColumn(FAMILY, PRIVATE_COL, Bytes.toBytes("secret " + i));
+ p.addColumn(FAMILY, PUBLIC_COL, Bytes.toBytes("info " + i));
+ puts.add(p);
+ }
+ table.put(puts);
+
+ // read to verify it.
+ Scan scan = new Scan();
+ scan.setCaching(16);
+ ResultScanner rs = table.getScanner(scan);
+ int rowcnt = 0;
+ for (Result r : rs) {
+ rowcnt++;
+ int rownum = Bytes.toInt(r.getRow());
+ assertTrue(r.containsColumn(FAMILY, PRIVATE_COL));
+ assertEquals("secret " + rownum,
+ Bytes.toString(r.getValue(FAMILY, PRIVATE_COL)));
+ assertTrue(r.containsColumn(FAMILY, PUBLIC_COL));
+ assertEquals("info " + rownum,
+ Bytes.toString(r.getValue(FAMILY, PUBLIC_COL)));
+ }
+ assertEquals("Expected 100 rows returned", 100, rowcnt);
+ } finally {
+ table.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dbd025c/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
index b039003..f21359c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -40,6 +42,10 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
@@ -48,6 +54,7 @@ import com.google.common.collect.Lists;
* of types in <code>src/test/protobuf/test.proto</code> and protobuf service definition from
* <code>src/test/protobuf/test_rpc_service.proto</code>
*/
+@RunWith(Parameterized.class)
@Category({ RPCTests.class, MediumTests.class })
public class TestProtoBufRpc {
public final static String ADDRESS = "localhost";
@@ -56,9 +63,20 @@ public class TestProtoBufRpc {
private Configuration conf;
private RpcServerInterface server;
+ @Parameters(name = "{index}: rpcServerImpl={0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() },
+ new Object[] { NettyRpcServer.class.getName() });
+ }
+
+ @Parameter(0)
+ public String rpcServerImpl;
+
@Before
public void setUp() throws IOException { // Setup server for both protocols
this.conf = HBaseConfiguration.create();
+ this.conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
+ rpcServerImpl);
Logger log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer");
log.setLevel(Level.DEBUG);
log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer.trace");
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dbd025c/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
index 449899f..c12331f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -23,11 +23,14 @@ import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.testclassification.RPCTests;
@@ -35,11 +38,14 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
-
+@RunWith(Parameterized.class)
@Category({ RPCTests.class, SmallTests.class })
public class TestRpcHandlerException {
@@ -64,6 +70,15 @@ public class TestRpcHandlerException {
}
}
+ @Parameters(name = "{index}: rpcServerImpl={0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() },
+ new Object[] { NettyRpcServer.class.getName() });
+ }
+
+ @Parameter(0)
+ public String rpcServerImpl;
+
/*
* This is a unit test to make sure to abort region server when the number of Rpc handler thread
* caught errors exceeds the threshold. Client will hang when RS aborts.
@@ -73,6 +88,7 @@ public class TestRpcHandlerException {
public void testRpcScheduler() throws IOException, InterruptedException {
PriorityFunction qosFunction = mock(PriorityFunction.class);
Abortable abortable = new AbortServer();
+ CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0);
RpcServer rpcServer = RpcServerFactory.createRpcServer(null, "testRpcServer",
Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)),
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dbd025c/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
index c848250..85a14f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import javax.security.sasl.SaslException;
@@ -46,11 +47,14 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.NettyRpcClient;
+import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
@@ -72,7 +76,6 @@ import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
@RunWith(Parameterized.class)
@Category({ SecurityTests.class, SmallTests.class })
@@ -96,15 +99,27 @@ public class TestSecureIPC {
@Rule
public ExpectedException exception = ExpectedException.none();
- @Parameters(name = "{index}: rpcClientImpl={0}")
+ @Parameters(name = "{index}: rpcClientImpl={0}, rpcServerImpl={1}")
public static Collection<Object[]> parameters() {
- return Arrays.asList(new Object[]{BlockingRpcClient.class.getName()},
- new Object[]{NettyRpcClient.class.getName()});
+ List<Object[]> params = new ArrayList<>();
+ List<String> rpcClientImpls = Arrays.asList(
+ BlockingRpcClient.class.getName(), NettyRpcClient.class.getName());
+ List<String> rpcServerImpls = Arrays.asList(
+ SimpleRpcServer.class.getName(), NettyRpcServer.class.getName());
+ for (String rpcClientImpl : rpcClientImpls) {
+ for (String rpcServerImpl : rpcServerImpls) {
+ params.add(new Object[] { rpcClientImpl, rpcServerImpl });
+ }
+ }
+ return params;
}
- @Parameter
+ @Parameter(0)
public String rpcClientImpl;
+ @Parameter(1)
+ public String rpcServerImpl;
+
@BeforeClass
public static void setUp() throws Exception {
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
@@ -129,6 +144,8 @@ public class TestSecureIPC {
clientConf = getSecuredConfiguration();
clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
serverConf = getSecuredConfiguration();
+ serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
+ rpcServerImpl);
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dbd025c/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index 0324359..3ec0495 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -27,6 +27,8 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -49,11 +51,13 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -80,10 +84,14 @@ import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController;
@@ -97,6 +105,7 @@ import com.google.protobuf.ServiceException;
// RpcServer is all about shaded protobuf whereas the Token Service is a CPEP which does non-shaded
// protobufs. Since hbase-2.0.0, we added convertion from shaded to non-shaded so this test keeps
// working.
+@RunWith(Parameterized.class)
@Category({SecurityTests.class, MediumTests.class})
public class TestTokenAuthentication {
static {
@@ -116,6 +125,7 @@ public class TestTokenAuthentication {
AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server {
private static final Log LOG = LogFactory.getLog(TokenServer.class);
private Configuration conf;
+ private HBaseTestingUtility TEST_UTIL;
private RpcServerInterface rpcServer;
private InetSocketAddress isa;
private ZooKeeperWatcher zookeeper;
@@ -125,8 +135,10 @@ public class TestTokenAuthentication {
private boolean stopped = false;
private long startcode;
- public TokenServer(Configuration conf) throws IOException {
+ public TokenServer(Configuration conf, HBaseTestingUtility TEST_UTIL)
+ throws IOException {
this.conf = conf;
+ this.TEST_UTIL = TEST_UTIL;
this.startcode = EnvironmentEdgeManager.currentTime();
// Server to handle client requests.
String hostname =
@@ -391,14 +403,23 @@ public class TestTokenAuthentication {
}
}
- private static HBaseTestingUtility TEST_UTIL;
- private static TokenServer server;
- private static Thread serverThread;
- private static AuthenticationTokenSecretManager secretManager;
- private static ClusterId clusterId = new ClusterId();
+ @Parameters(name = "{index}: rpcServerImpl={0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() },
+ new Object[] { NettyRpcServer.class.getName() });
+ }
+
+ @Parameter(0)
+ public String rpcServerImpl;
+
+ private HBaseTestingUtility TEST_UTIL;
+ private TokenServer server;
+ private Thread serverThread;
+ private AuthenticationTokenSecretManager secretManager;
+ private ClusterId clusterId = new ClusterId();
- @BeforeClass
- public static void setupBeforeClass() throws Exception {
+ @Before
+ public void setUp() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
TEST_UTIL.startMiniZKCluster();
// register token type for protocol
@@ -410,7 +431,8 @@ public class TestTokenAuthentication {
conf.set("hadoop.security.authentication", "kerberos");
conf.set("hbase.security.authentication", "kerberos");
conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
- server = new TokenServer(conf);
+ conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
+ server = new TokenServer(conf, TEST_UTIL);
serverThread = new Thread(server);
Threads.setDaemonThreadRunning(serverThread, "TokenServer:"+server.getServerName().toString());
// wait for startup
@@ -432,8 +454,8 @@ public class TestTokenAuthentication {
}
}
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
+ @After
+ public void tearDown() throws Exception {
server.stop("Test complete");
Threads.shutdown(serverThread);
TEST_UTIL.shutdownMiniZKCluster();
[2/2] hbase git commit: HBASE-17263 Netty based rpc server impl
Posted by zh...@apache.org.
HBASE-17263 Netty based rpc server impl
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4dbd025c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4dbd025c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4dbd025c
Branch: refs/heads/master
Commit: 4dbd025ccf0b6962a863b53c9dbf1c2c42e679d4
Parents: de78c11
Author: binlijin <bi...@gmail.com>
Authored: Wed May 3 17:23:26 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed May 3 17:23:26 2017 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/ipc/BufferChain.java | 10 +
.../apache/hadoop/hbase/ipc/NettyRpcServer.java | 540 ++++++++++++++++
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 614 +++++++++++++++++-
.../hadoop/hbase/ipc/SimpleRpcServer.java | 643 ++-----------------
.../hadoop/hbase/ipc/AbstractTestIPC.java | 2 +-
.../hadoop/hbase/ipc/TestNettyRpcServer.java | 109 ++++
.../hadoop/hbase/ipc/TestProtoBufRpc.java | 18 +
.../hbase/ipc/TestRpcHandlerException.java | 20 +-
.../hadoop/hbase/security/TestSecureIPC.java | 29 +-
.../security/token/TestTokenAuthentication.java | 48 +-
10 files changed, 1396 insertions(+), 637 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dbd025c/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
index 26bc56c..bd0515a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
@@ -32,11 +32,13 @@ class BufferChain {
private final ByteBuffer[] buffers;
private int remaining = 0;
private int bufferOffset = 0;
+ private int size;
BufferChain(ByteBuffer[] buffers) {
for (ByteBuffer b : buffers) {
this.remaining += b.remaining();
}
+ this.size = remaining;
this.buffers = buffers;
}
@@ -108,4 +110,12 @@ class BufferChain {
}
}
}
+
+ int size() {
+ return size;
+ }
+
+ ByteBuffer[] getBuffers() {
+ return this.buffers;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dbd025c/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
new file mode 100644
index 0000000..be55378
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.security.HBasePolicyProvider;
+import org.apache.hadoop.hbase.security.SaslStatus;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVM;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.htrace.TraceInfo;
+
+/**
+ * An RPC server with Netty4 implementation.
+ *
+ */
+public class NettyRpcServer extends RpcServer {
+
+ public static final Log LOG = LogFactory.getLog(NettyRpcServer.class);
+
+ protected final InetSocketAddress bindAddress;
+
+ private final CountDownLatch closed = new CountDownLatch(1);
+ private final Channel serverChannel;
+ private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);;
+
+ public NettyRpcServer(final Server server, final String name,
+ final List<BlockingServiceAndInterface> services,
+ final InetSocketAddress bindAddress, Configuration conf,
+ RpcScheduler scheduler) throws IOException {
+ super(server, name, services, bindAddress, conf, scheduler);
+ this.bindAddress = bindAddress;
+ boolean useEpoll = useEpoll(conf);
+ int workerCount = conf.getInt("hbase.netty.rpc.server.worker.count",
+ Runtime.getRuntime().availableProcessors() / 4);
+ EventLoopGroup bossGroup = null;
+ EventLoopGroup workerGroup = null;
+ if (useEpoll) {
+ bossGroup = new EpollEventLoopGroup(1);
+ workerGroup = new EpollEventLoopGroup(workerCount);
+ } else {
+ bossGroup = new NioEventLoopGroup(1);
+ workerGroup = new NioEventLoopGroup(workerCount);
+ }
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(bossGroup, workerGroup);
+ if (useEpoll) {
+ bootstrap.channel(EpollServerSocketChannel.class);
+ } else {
+ bootstrap.channel(NioServerSocketChannel.class);
+ }
+ bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay);
+ bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
+ bootstrap.childOption(ChannelOption.ALLOCATOR,
+ PooledByteBufAllocator.DEFAULT);
+ bootstrap.childHandler(new Initializer(maxRequestSize));
+
+ try {
+ serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
+ LOG.info("NettyRpcServer bind to address=" + serverChannel.localAddress()
+ + ", hbase.netty.rpc.server.worker.count=" + workerCount
+ + ", useEpoll=" + useEpoll);
+ allChannels.add(serverChannel);
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.getMessage());
+ }
+ initReconfigurable(conf);
+ this.scheduler.init(new RpcSchedulerContext(this));
+ }
+
+ private static boolean useEpoll(Configuration conf) {
+ // Config to enable native transport.
+ boolean epollEnabled = conf.getBoolean("hbase.rpc.server.nativetransport",
+ true);
+ // Use the faster native epoll transport mechanism on linux if enabled
+ return epollEnabled && JVM.isLinux() && JVM.isAmd64();
+ }
+
+ @Override
+ public synchronized void start() {
+ if (started) {
+ return;
+ }
+ authTokenSecretMgr = createSecretManager();
+ if (authTokenSecretMgr != null) {
+ setSecretManager(authTokenSecretMgr);
+ authTokenSecretMgr.start();
+ }
+ this.authManager = new ServiceAuthorizationManager();
+ HBasePolicyProvider.init(conf, authManager);
+ scheduler.start();
+ started = true;
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (!running) {
+ return;
+ }
+ LOG.info("Stopping server on " + this.bindAddress.getPort());
+ if (authTokenSecretMgr != null) {
+ authTokenSecretMgr.stop();
+ authTokenSecretMgr = null;
+ }
+ allChannels.close().awaitUninterruptibly();
+ serverChannel.close();
+ scheduler.stop();
+ closed.countDown();
+ running = false;
+ }
+
+ @Override
+ public synchronized void join() throws InterruptedException {
+ closed.await();
+ }
+
+ @Override
+ public synchronized InetSocketAddress getListenerAddress() {
+ return ((InetSocketAddress) serverChannel.localAddress());
+ }
+
+ public class NettyConnection extends RpcServer.Connection {
+
+ protected Channel channel;
+
+ NettyConnection(Channel channel) {
+ super();
+ this.channel = channel;
+ InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress());
+ this.addr = inetSocketAddress.getAddress();
+ if (addr == null) {
+ this.hostAddress = "*Unknown*";
+ } else {
+ this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
+ }
+ this.remotePort = inetSocketAddress.getPort();
+ this.saslCall = new Call(SASL_CALLID, null, null, null, null, null, this,
+ 0, null, null, 0, null);
+ this.setConnectionHeaderResponseCall = new Call(
+ CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null,
+ this, 0, null, null, 0, null);
+ this.authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null,
+ null, null, null, this, 0, null, null, 0, null);
+ }
+
+ void readPreamble(ByteBuf buffer) throws IOException {
+ byte[] rpcHead =
+ { buffer.readByte(), buffer.readByte(), buffer.readByte(), buffer.readByte() };
+ if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) {
+ doBadPreambleHandling("Expected HEADER="
+ + Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER="
+ + Bytes.toStringBinary(rpcHead) + " from " + toString());
+ return;
+ }
+ // Now read the next two bytes, the version and the auth to use.
+ int version = buffer.readByte();
+ byte authbyte = buffer.readByte();
+ this.authMethod = AuthMethod.valueOf(authbyte);
+ if (version != CURRENT_VERSION) {
+ String msg = getFatalConnectionString(version, authbyte);
+ doBadPreambleHandling(msg, new WrongVersionException(msg));
+ return;
+ }
+ if (authMethod == null) {
+ String msg = getFatalConnectionString(version, authbyte);
+ doBadPreambleHandling(msg, new BadAuthException(msg));
+ return;
+ }
+ if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+ if (allowFallbackToSimpleAuth) {
+ metrics.authenticationFallback();
+ authenticatedWithFallback = true;
+ } else {
+ AccessDeniedException ae = new AccessDeniedException(
+ "Authentication is required");
+ setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
+ ((Call) authFailedCall)
+ .sendResponseIfReady(ChannelFutureListener.CLOSE);
+ return;
+ }
+ }
+ if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+ doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
+ null);
+ authMethod = AuthMethod.SIMPLE;
+ // client has already sent the initial Sasl message and we
+ // should ignore it. Both client and server should fall back
+ // to simple auth from now on.
+ skipInitialSaslHandshake = true;
+ }
+ if (authMethod != AuthMethod.SIMPLE) {
+ useSasl = true;
+ }
+ connectionPreambleRead = true;
+ }
+
+ private void doBadPreambleHandling(final String msg) throws IOException {
+ doBadPreambleHandling(msg, new FatalConnectionException(msg));
+ }
+
+ private void doBadPreambleHandling(final String msg, final Exception e) throws IOException {
+ LOG.warn(msg);
+ Call fakeCall = new Call(-1, null, null, null, null, null, this, -1,
+ null, null, 0, null);
+ setupResponse(null, fakeCall, e, msg);
+ // closes out the connection.
+ fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
+ }
+
+ void process(final ByteBuf buf) throws IOException, InterruptedException {
+ if (connectionHeaderRead) {
+ this.callCleanup = new RpcServer.CallCleanup() {
+ @Override
+ public void run() {
+ buf.release();
+ }
+ };
+ process(new SingleByteBuff(buf.nioBuffer()));
+ } else {
+ byte[] data = new byte[buf.readableBytes()];
+ buf.readBytes(data, 0, data.length);
+ ByteBuffer connectionHeader = ByteBuffer.wrap(data);
+ buf.release();
+ process(connectionHeader);
+ }
+ }
+
+ void process(ByteBuffer buf) throws IOException, InterruptedException {
+ process(new SingleByteBuff(buf));
+ }
+
+ void process(ByteBuff buf) throws IOException, InterruptedException {
+ try {
+ if (skipInitialSaslHandshake) {
+ skipInitialSaslHandshake = false;
+ if (callCleanup != null) {
+ callCleanup.run();
+ }
+ return;
+ }
+
+ if (useSasl) {
+ saslReadAndProcess(buf);
+ } else {
+ processOneRpc(buf);
+ }
+ } catch (Exception e) {
+ if (callCleanup != null) {
+ callCleanup.run();
+ }
+ throw e;
+ } finally {
+ this.callCleanup = null;
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ disposeSasl();
+ channel.close();
+ callCleanup = null;
+ }
+
+ @Override
+ public boolean isConnectionOpen() {
+ return channel.isOpen();
+ }
+
+ @Override
+ public RpcServer.Call createCall(int id, final BlockingService service,
+ final MethodDescriptor md, RequestHeader header, Message param,
+ CellScanner cellScanner, RpcServer.Connection connection, long size,
+ TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
+ CallCleanup reqCleanup) {
+ return new Call(id, service, md, header, param, cellScanner, connection,
+ size, tinfo, remoteAddress, timeout, reqCleanup);
+ }
+ }
+
+ /**
+ * Datastructure that holds all necessary to a method invocation and then afterward, carries the
+ * result.
+ */
+ @InterfaceStability.Evolving
+ public class Call extends RpcServer.Call {
+
+ Call(int id, final BlockingService service, final MethodDescriptor md,
+ RequestHeader header, Message param, CellScanner cellScanner,
+ RpcServer.Connection connection, long size, TraceInfo tinfo,
+ final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
+ super(id, service, md, header, param, cellScanner,
+ connection, size, tinfo, remoteAddress, timeout, reqCleanup);
+ }
+
+ @Override
+ public long disconnectSince() {
+ if (!getConnection().isConnectionOpen()) {
+ return System.currentTimeMillis() - timestamp;
+ } else {
+ return -1L;
+ }
+ }
+
+ NettyConnection getConnection() {
+ return (NettyConnection) this.connection;
+ }
+
+ /**
+ * If we have a response, and delay is not set, then respond immediately. Otherwise, do not
+ * respond to client. This is called by the RPC code in the context of the Handler thread.
+ */
+ @Override
+ public synchronized void sendResponseIfReady() throws IOException {
+ getConnection().channel.writeAndFlush(this);
+ }
+
+ public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException {
+ getConnection().channel.writeAndFlush(this).addListener(listener);
+ }
+
+ }
+
+ private class Initializer extends ChannelInitializer<SocketChannel> {
+
+ final int maxRequestSize;
+
+ Initializer(int maxRequestSize) {
+ this.maxRequestSize = maxRequestSize;
+ }
+
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast("header", new ConnectionHeaderHandler());
+ pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
+ maxRequestSize, 0, 4, 0, 4, true));
+ pipeline.addLast("decoder", new MessageDecoder());
+ pipeline.addLast("encoder", new MessageEncoder());
+ }
+
+ }
+
+ private class ConnectionHeaderHandler extends ByteToMessageDecoder {
+ private NettyConnection connection;
+
+ ConnectionHeaderHandler() {
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf,
+ List<Object> out) throws Exception {
+ if (byteBuf.readableBytes() < 6) {
+ return;
+ }
+ connection = new NettyConnection(ctx.channel());
+ connection.readPreamble(byteBuf);
+ ((MessageDecoder) ctx.pipeline().get("decoder"))
+ .setConnection(connection);
+ ctx.pipeline().remove(this);
+ }
+
+ }
+
+ private class MessageDecoder extends ChannelInboundHandlerAdapter {
+
+ private NettyConnection connection;
+
+ void setConnection(NettyConnection connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ allChannels.add(ctx.channel());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connection from " + ctx.channel().remoteAddress()
+ + "; # active connections: " + getNumOpenConnections());
+ }
+ super.channelActive(ctx);
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
+ throws Exception {
+ ByteBuf input = (ByteBuf) msg;
+ // 4 bytes length field
+ metrics.receivedBytes(input.readableBytes() + 4);
+ connection.process(input);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ allChannels.remove(ctx.channel());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Disconnecting client: " + ctx.channel().remoteAddress()
+ + ". Number of active connections: " + getNumOpenConnections());
+ }
+ super.channelInactive(ctx);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
+ allChannels.remove(ctx.channel());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connection from " + ctx.channel().remoteAddress()
+ + " catch unexpected exception from downstream.", e.getCause());
+ }
+ ctx.channel().close();
+ }
+
+ }
+
+ private class MessageEncoder extends ChannelOutboundHandlerAdapter {
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+ final Call call = (Call) msg;
+ ByteBuf response = Unpooled.wrappedBuffer(call.response.getBuffers());
+ ctx.write(response, promise).addListener(new CallWriteListener(call));
+ }
+
+ }
+
+ private class CallWriteListener implements ChannelFutureListener {
+
+ private Call call;
+
+ CallWriteListener(Call call) {
+ this.call = call;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ call.done();
+ if (future.isSuccess()) {
+ metrics.sentBytes(call.response.size());
+ }
+ }
+
+ }
+
+ @Override
+ public void setSocketSendBufSize(int size) {
+ }
+
+ @Override
+ public int getNumOpenConnections() {
+ // allChannels also contains the server channel, so exclude that from the count.
+ return allChannels.size() - 1;
+ }
+
+ @Override
+ public Pair<Message, CellScanner> call(BlockingService service,
+ MethodDescriptor md, Message param, CellScanner cellScanner,
+ long receiveTime, MonitoredRPCHandler status) throws IOException {
+ return call(service, md, param, cellScanner, receiveTime, status,
+ System.currentTimeMillis(), 0);
+ }
+
+ @Override
+ public Pair<Message, CellScanner> call(BlockingService service,
+ MethodDescriptor md, Message param, CellScanner cellScanner,
+ long receiveTime, MonitoredRPCHandler status, long startTime, int timeout)
+ throws IOException {
+ Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null,
+ -1, null, null, timeout, null);
+ fakeCall.setReceiveTime(receiveTime);
+ return call(fakeCall, status);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dbd025c/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 4b0c974..ebae1fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -20,12 +20,20 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
import java.security.GeneralSecurityException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -34,6 +42,7 @@ import java.util.Properties;
import java.util.concurrent.atomic.LongAdder;
import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
@@ -50,11 +59,13 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
@@ -66,6 +77,9 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
+import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
@@ -73,11 +87,13 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
@@ -91,12 +107,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformati
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.TraceInfo;
@@ -459,7 +481,7 @@ public abstract class RpcServer implements RpcServerInterface,
}
}
- private void setExceptionResponse(Throwable t, String errorMsg,
+ protected void setExceptionResponse(Throwable t, String errorMsg,
ResponseHeader.Builder headerBuilder) {
ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
exceptionBuilder.setExceptionClassName(t.getClass().getName());
@@ -477,7 +499,7 @@ public abstract class RpcServer implements RpcServerInterface,
headerBuilder.setException(exceptionBuilder.build());
}
- private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
+ protected ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
@@ -550,7 +572,7 @@ public abstract class RpcServer implements RpcServerInterface,
return pbBuf;
}
- private BufferChain wrapWithSasl(BufferChain bc)
+ protected BufferChain wrapWithSasl(BufferChain bc)
throws IOException {
if (!this.connection.useSasl) return bc;
// Looks like no way around this; saslserver wants a byte array. I have to make it one.
@@ -721,7 +743,7 @@ public abstract class RpcServer implements RpcServerInterface,
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="VO_VOLATILE_INCREMENT",
justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
- public abstract class Connection {
+ public abstract class Connection implements Closeable {
// If initial preamble with version and magic has been read or not.
protected boolean connectionPreambleRead = false;
// If the connection header has been read or not.
@@ -749,7 +771,9 @@ public abstract class RpcServer implements RpcServerInterface,
protected AuthMethod authMethod;
protected boolean saslContextEstablished;
protected boolean skipInitialSaslHandshake;
-
+ private ByteBuffer unwrappedData;
+ // When is this set? FindBugs wants to know! Says NP
+ private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
protected boolean useSasl;
protected SaslServer saslServer;
protected CryptoAES cryptoAES;
@@ -757,14 +781,15 @@ public abstract class RpcServer implements RpcServerInterface,
protected boolean useCryptoAesWrap = false;
// Fake 'call' for failed authorization response
protected static final int AUTHORIZATION_FAILED_CALLID = -1;
-
+ protected Call authFailedCall;
protected ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
protected static final int SASL_CALLID = -33;
-
+ protected Call saslCall;
// Fake 'call' for connection header response
protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
+ protected Call setConnectionHeaderResponseCall;
// was authentication allowed with a fallback to simple auth
protected boolean authenticatedWithFallback;
@@ -955,8 +980,496 @@ public abstract class RpcServer implements RpcServerInterface,
return ugi;
}
+ protected void disposeSasl() {
+ if (saslServer != null) {
+ try {
+ saslServer.dispose();
+ saslServer = null;
+ } catch (SaslException ignored) {
+ // Ignored. This is being disposed of anyway.
+ }
+ }
+ }
+
+ /**
+ * No protobuf encoding of raw sasl messages
+ */
+ protected void doRawSaslReply(SaslStatus status, Writable rv,
+ String errorClass, String error) throws IOException {
+ ByteBufferOutputStream saslResponse = null;
+ DataOutputStream out = null;
+ try {
+ // In my testing, have noticed that sasl messages are usually
+ // in the ballpark of 100-200. That's why the initial capacity is 256.
+ saslResponse = new ByteBufferOutputStream(256);
+ out = new DataOutputStream(saslResponse);
+ out.writeInt(status.state); // write status
+ if (status == SaslStatus.SUCCESS) {
+ rv.write(out);
+ } else {
+ WritableUtils.writeString(out, errorClass);
+ WritableUtils.writeString(out, error);
+ }
+ saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
+ saslCall.sendResponseIfReady();
+ } finally {
+ if (saslResponse != null) {
+ saslResponse.close();
+ }
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
+ public void saslReadAndProcess(ByteBuff saslToken) throws IOException,
+ InterruptedException {
+ if (saslContextEstablished) {
+ if (LOG.isTraceEnabled())
+ LOG.trace("Have read input token of size " + saslToken.limit()
+ + " for processing by saslServer.unwrap()");
+
+ if (!useWrap) {
+ processOneRpc(saslToken);
+ } else {
+ byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
+ byte [] plaintextData;
+ if (useCryptoAesWrap) {
+ // unwrap with CryptoAES
+ plaintextData = cryptoAES.unwrap(b, 0, b.length);
+ } else {
+ plaintextData = saslServer.unwrap(b, 0, b.length);
+ }
+ processUnwrappedData(plaintextData);
+ }
+ } else {
+ byte[] replyToken;
+ try {
+ if (saslServer == null) {
+ switch (authMethod) {
+ case DIGEST:
+ if (secretManager == null) {
+ throw new AccessDeniedException(
+ "Server is not configured to do DIGEST authentication.");
+ }
+ saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
+ .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
+ HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
+ secretManager, this));
+ break;
+ default:
+ UserGroupInformation current = UserGroupInformation.getCurrentUser();
+ String fullName = current.getUserName();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Kerberos principal name is " + fullName);
+ }
+ final String names[] = SaslUtil.splitKerberosName(fullName);
+ if (names.length != 3) {
+ throw new AccessDeniedException(
+ "Kerberos principal name does NOT have the expected "
+ + "hostname part: " + fullName);
+ }
+ current.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws SaslException {
+ saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
+ .getMechanismName(), names[0], names[1],
+ HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
+ return null;
+ }
+ });
+ }
+ if (saslServer == null)
+ throw new AccessDeniedException(
+ "Unable to find SASL server implementation for "
+ + authMethod.getMechanismName());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Have read input token of size " + saslToken.limit()
+ + " for processing by saslServer.evaluateResponse()");
+ }
+ replyToken = saslServer
+ .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
+ } catch (IOException e) {
+ IOException sendToClient = e;
+ Throwable cause = e;
+ while (cause != null) {
+ if (cause instanceof InvalidToken) {
+ sendToClient = (InvalidToken) cause;
+ break;
+ }
+ cause = cause.getCause();
+ }
+ doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
+ sendToClient.getLocalizedMessage());
+ metrics.authenticationFailure();
+ String clientIP = this.toString();
+ // attempting user could be null
+ AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
+ throw e;
+ }
+ if (replyToken != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Will send token of size " + replyToken.length
+ + " from saslServer.");
+ }
+ doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
+ null);
+ }
+ if (saslServer.isComplete()) {
+ String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+ useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
+ ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL server context established. Authenticated client: "
+ + ugi + ". Negotiated QoP is "
+ + saslServer.getNegotiatedProperty(Sasl.QOP));
+ }
+ metrics.authenticationSuccess();
+ AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
+ saslContextEstablished = true;
+ }
+ }
+ }
+
+ private void processUnwrappedData(byte[] inBuf) throws IOException,
+ InterruptedException {
+ ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
+ // Read all RPCs contained in the inBuf, even partial ones
+ while (true) {
+ int count;
+ if (unwrappedDataLengthBuffer.remaining() > 0) {
+ count = channelRead(ch, unwrappedDataLengthBuffer);
+ if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
+ return;
+ }
+
+ if (unwrappedData == null) {
+ unwrappedDataLengthBuffer.flip();
+ int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
+
+ if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Received ping message");
+ unwrappedDataLengthBuffer.clear();
+ continue; // ping message
+ }
+ unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
+ }
+
+ count = channelRead(ch, unwrappedData);
+ if (count <= 0 || unwrappedData.remaining() > 0)
+ return;
+
+ if (unwrappedData.remaining() == 0) {
+ unwrappedDataLengthBuffer.clear();
+ unwrappedData.flip();
+ processOneRpc(new SingleByteBuff(unwrappedData));
+ unwrappedData = null;
+ }
+ }
+ }
+
+ public void processOneRpc(ByteBuff buf) throws IOException,
+ InterruptedException {
+ if (connectionHeaderRead) {
+ processRequest(buf);
+ } else {
+ processConnectionHeader(buf);
+ this.connectionHeaderRead = true;
+ if (!authorizeConnection()) {
+ // Throw FatalConnectionException wrapping ACE so client does right thing and closes
+ // down the connection instead of trying to read non-existent retun.
+ throw new AccessDeniedException("Connection from " + this + " for service " +
+ connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
+ }
+ this.user = userProvider.create(this.ugi);
+ }
+ }
+
+ protected boolean authorizeConnection() throws IOException {
+ try {
+ // If auth method is DIGEST, the token was obtained by the
+ // real user for the effective user, therefore not required to
+ // authorize real user. doAs is allowed only for simple or kerberos
+ // authentication
+ if (ugi != null && ugi.getRealUser() != null
+ && (authMethod != AuthMethod.DIGEST)) {
+ ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
+ }
+ authorize(ugi, connectionHeader, getHostInetAddress());
+ metrics.authorizationSuccess();
+ } catch (AuthorizationException ae) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
+ }
+ metrics.authorizationFailure();
+ setupResponse(authFailedResponse, authFailedCall,
+ new AccessDeniedException(ae), ae.getMessage());
+ authFailedCall.sendResponseIfReady();
+ return false;
+ }
+ return true;
+ }
+
+ // Reads the connection header following version
+ protected void processConnectionHeader(ByteBuff buf) throws IOException {
+ if (buf.hasArray()) {
+ this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
+ } else {
+ CodedInputStream cis = UnsafeByteOperations
+ .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
+ cis.enableAliasing(true);
+ this.connectionHeader = ConnectionHeader.parseFrom(cis);
+ }
+ String serviceName = connectionHeader.getServiceName();
+ if (serviceName == null) throw new EmptyServiceNameException();
+ this.service = getService(services, serviceName);
+ if (this.service == null) throw new UnknownServiceException(serviceName);
+ setupCellBlockCodecs(this.connectionHeader);
+ RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
+ RPCProtos.ConnectionHeaderResponse.newBuilder();
+ setupCryptoCipher(this.connectionHeader, chrBuilder);
+ responseConnectionHeader(chrBuilder);
+ UserGroupInformation protocolUser = createUser(connectionHeader);
+ if (!useSasl) {
+ ugi = protocolUser;
+ if (ugi != null) {
+ ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
+ }
+ // audit logging for SASL authenticated users happens in saslReadAndProcess()
+ if (authenticatedWithFallback) {
+ LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
+ + " connecting from " + getHostAddress());
+ }
+ AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
+ } else {
+ // user is authenticated
+ ugi.setAuthenticationMethod(authMethod.authenticationMethod);
+ //Now we check if this is a proxy user case. If the protocol user is
+ //different from the 'user', it is a proxy user scenario. However,
+ //this is not allowed if user authenticated with DIGEST.
+ if ((protocolUser != null)
+ && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
+ if (authMethod == AuthMethod.DIGEST) {
+ // Not allowed to doAs if token authentication is used
+ throw new AccessDeniedException("Authenticated user (" + ugi
+ + ") doesn't match what the client claims to be ("
+ + protocolUser + ")");
+ } else {
+ // Effective user can be different from authenticated user
+ // for simple auth or kerberos auth
+ // The user is the real user. Now we create a proxy user
+ UserGroupInformation realUser = ugi;
+ ugi = UserGroupInformation.createProxyUser(protocolUser
+ .getUserName(), realUser);
+ // Now the user is a proxy user, set Authentication method Proxy.
+ ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
+ }
+ }
+ }
+ if (connectionHeader.hasVersionInfo()) {
+ // see if this connection will support RetryImmediatelyException
+ retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
+
+ AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+ + " with version info: "
+ + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
+ } else {
+ AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+ + " with unknown version info");
+ }
+ }
+
+ private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
+ throws FatalConnectionException {
+ // Response the connection header if Crypto AES is enabled
+ if (!chrBuilder.hasCryptoCipherMeta()) return;
+ try {
+ byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
+ // encrypt the Crypto AES cipher meta data with sasl server, and send to client
+ byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
+ Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
+ Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
+
+ doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length));
+ } catch (IOException ex) {
+ throw new UnsupportedCryptoException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Send the response for connection header
+ */
+ private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData)
+ throws IOException {
+ ByteBufferOutputStream response = null;
+ DataOutputStream out = null;
+ try {
+ response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4);
+ out = new DataOutputStream(response);
+ out.writeInt(wrappedCipherMetaData.length);
+ out.write(wrappedCipherMetaData);
+
+ setConnectionHeaderResponseCall.setConnectionHeaderResponse(response
+ .getByteBuffer());
+ setConnectionHeaderResponseCall.sendResponseIfReady();
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ if (response != null) {
+ response.close();
+ }
+ }
+ }
+
+ /**
+ * @param buf
+ * Has the request header and the request param and optionally
+ * encoded data buffer all in this one array.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected void processRequest(ByteBuff buf) throws IOException,
+ InterruptedException {
+ long totalRequestSize = buf.limit();
+ int offset = 0;
+ // Here we read in the header. We avoid having pb
+ // do its default 4k allocation for CodedInputStream. We force it to use
+ // backing array.
+ CodedInputStream cis;
+ if (buf.hasArray()) {
+ cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit())
+ .newCodedInput();
+ } else {
+ cis = UnsafeByteOperations.unsafeWrap(
+ new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit())
+ .newCodedInput();
+ }
+ cis.enableAliasing(true);
+ int headerSize = cis.readRawVarint32();
+ offset = cis.getTotalBytesRead();
+ Message.Builder builder = RequestHeader.newBuilder();
+ ProtobufUtil.mergeFrom(builder, cis, headerSize);
+ RequestHeader header = (RequestHeader) builder.build();
+ offset += headerSize;
+ int id = header.getCallId();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
+ + " totalRequestSize: " + totalRequestSize + " bytes");
+ }
+ // Enforcing the call queue size, this triggers a retry in the client
+ // This is a bit late to be doing this check - we have already read in the
+ // total request.
+ if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
+ final RpcServer.Call callTooBig = createCall(id, this.service, null,
+ null, null, null, this, totalRequestSize, null, null, 0,
+ this.callCleanup);
+ ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+ metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
+ setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
+ "Call queue is full on " + server.getServerName()
+ + ", is hbase.ipc.server.max.callqueue.size too small?");
+ callTooBig.sendResponseIfReady();
+ return;
+ }
+ MethodDescriptor md = null;
+ Message param = null;
+ CellScanner cellScanner = null;
+ try {
+ if (header.hasRequestParam() && header.getRequestParam()) {
+ md = this.service.getDescriptorForType().findMethodByName(
+ header.getMethodName());
+ if (md == null)
+ throw new UnsupportedOperationException(header.getMethodName());
+ builder = this.service.getRequestPrototype(md).newBuilderForType();
+ cis.resetSizeCounter();
+ int paramSize = cis.readRawVarint32();
+ offset += cis.getTotalBytesRead();
+ if (builder != null) {
+ ProtobufUtil.mergeFrom(builder, cis, paramSize);
+ param = builder.build();
+ }
+ offset += paramSize;
+ } else {
+ // currently header must have request param, so we directly throw
+ // exception here
+ String msg = "Invalid request header: "
+ + TextFormat.shortDebugString(header)
+ + ", should have param set in it";
+ LOG.warn(msg);
+ throw new DoNotRetryIOException(msg);
+ }
+ if (header.hasCellBlockMeta()) {
+ buf.position(offset);
+ ByteBuff dup = buf.duplicate();
+ dup.limit(offset + header.getCellBlockMeta().getLength());
+ cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(
+ this.codec, this.compressionCodec, dup);
+ }
+ } catch (Throwable t) {
+ InetSocketAddress address = getListenerAddress();
+ String msg = (address != null ? address : "(channel closed)")
+ + " is unable to read call parameter from client "
+ + getHostAddress();
+ LOG.warn(msg, t);
+
+ metrics.exception(t);
+
+ // probably the hbase hadoop version does not match the running hadoop
+ // version
+ if (t instanceof LinkageError) {
+ t = new DoNotRetryIOException(t);
+ }
+ // If the method is not present on the server, do not retry.
+ if (t instanceof UnsupportedOperationException) {
+ t = new DoNotRetryIOException(t);
+ }
+
+ final RpcServer.Call readParamsFailedCall = createCall(id,
+ this.service, null, null, null, null, this, totalRequestSize, null,
+ null, 0, this.callCleanup);
+ ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+ setupResponse(responseBuffer, readParamsFailedCall, t,
+ msg + "; " + t.getMessage());
+ readParamsFailedCall.sendResponseIfReady();
+ return;
+ }
+
+ TraceInfo traceInfo = header.hasTraceInfo() ? new TraceInfo(header
+ .getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
+ : null;
+ int timeout = 0;
+ if (header.hasTimeout() && header.getTimeout() > 0) {
+ timeout = Math.max(minClientRequestTimeout, header.getTimeout());
+ }
+ RpcServer.Call call = createCall(id, this.service, md, header, param,
+ cellScanner, this, totalRequestSize, traceInfo, this.addr, timeout,
+ this.callCleanup);
+
+ if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
+ callQueueSizeInBytes.add(-1 * call.getSize());
+
+ ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+ metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
+ setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
+ "Call queue is full on " + server.getServerName()
+ + ", too many items queued ?");
+ call.sendResponseIfReady();
+ }
+ }
+
public abstract boolean isConnectionOpen();
+ public abstract Call createCall(int id, final BlockingService service,
+ final MethodDescriptor md, RequestHeader header, Message param,
+ CellScanner cellScanner, Connection connection, long size,
+ TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
+ CallCleanup reqCleanup);
}
/**
@@ -1079,6 +1592,20 @@ public abstract class RpcServer implements RpcServerInterface,
}
}
+ /**
+ * Setup response for the RPC Call.
+ *
+ * @param response buffer to serialize the response into
+ * @param call {@link Call} to which we are setting up the response
+ * @param error error message, if the call failed
+ * @throws IOException
+ */
+ protected void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
+ throws IOException {
+ if (response != null) response.reset();
+ call.setResponse(null, null, t, error);
+ }
+
Configuration getConf() {
return conf;
}
@@ -1285,6 +1812,77 @@ public abstract class RpcServer implements RpcServerInterface,
}
/**
+ * When the read or write buffer size is larger than this limit, i/o will be
+ * done in chunks of this size. Most RPC requests and responses would be
+ * be smaller.
+ */
+ protected static final int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
+
+ /**
+ * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
+ * If the amount of data is large, it writes to channel in smaller chunks.
+ * This is to avoid jdk from creating many direct buffers as the size of
+ * ByteBuffer increases. There should not be any performance degredation.
+ *
+ * @param channel writable byte channel to write on
+ * @param buffer buffer to write
+ * @return number of bytes written
+ * @throws java.io.IOException e
+ * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
+ */
+ protected int channelRead(ReadableByteChannel channel,
+ ByteBuffer buffer) throws IOException {
+
+ int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+ channel.read(buffer) : channelIO(channel, null, buffer);
+ if (count > 0) {
+ metrics.receivedBytes(count);
+ }
+ return count;
+ }
+
+ /**
+ * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
+ * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
+ * one of readCh or writeCh should be non-null.
+ *
+ * @param readCh read channel
+ * @param writeCh write channel
+ * @param buf buffer to read or write into/out of
+ * @return bytes written
+ * @throws java.io.IOException e
+ * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
+ * @see #channelWrite(GatheringByteChannel, BufferChain)
+ */
+ private static int channelIO(ReadableByteChannel readCh,
+ WritableByteChannel writeCh,
+ ByteBuffer buf) throws IOException {
+
+ int originalLimit = buf.limit();
+ int initialRemaining = buf.remaining();
+ int ret = 0;
+
+ while (buf.remaining() > 0) {
+ try {
+ int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+ buf.limit(buf.position() + ioSize);
+
+ ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
+
+ if (ret < ioSize) {
+ break;
+ }
+
+ } finally {
+ buf.limit(originalLimit);
+ }
+ }
+
+ int nBytes = initialRemaining - buf.remaining();
+ return (nBytes > 0) ? nBytes : ret;
+ }
+
+ /**
* This is extracted to a static method for better unit testing. We try to get buffer(s) from pool
* as much as possible.
*
@@ -1494,4 +2092,4 @@ public abstract class RpcServer implements RpcServerInterface,
return this.length;
}
}
-}
\ No newline at end of file
+}