You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/06 04:58:44 UTC
hbase git commit: Rebase with master fixup
Repository: hbase
Updated Branches:
refs/heads/HBASE-14614 b4c5e2df4 -> 62d4eecac
Rebase with master fixup
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/62d4eeca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/62d4eeca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/62d4eeca
Branch: refs/heads/HBASE-14614
Commit: 62d4eecac0c2abb3a4dc1c6829a33ea11d2f6371
Parents: b4c5e2d
Author: Michael Stack <st...@apache.org>
Authored: Fri May 5 21:58:22 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Fri May 5 21:58:22 2017 -0700
----------------------------------------------------------------------
.../hadoop/hbase/ipc/SimpleRpcServer.java | 292 +------------------
1 file changed, 1 insertion(+), 291 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/62d4eeca/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index a22a85f..e5ba5f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -1051,298 +1051,8 @@ public class SimpleRpcServer extends RpcServer {
return -1;
}
- // Reads the connection header following version
- private void processConnectionHeader(ByteBuff buf) throws IOException {
- if (buf.hasArray()) {
- this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
- } else {
- CodedInputStream cis = UnsafeByteOperations
- .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
- cis.enableAliasing(true);
- this.connectionHeader = ConnectionHeader.parseFrom(cis);
- }
- String serviceName = connectionHeader.getServiceName();
- if (serviceName == null) throw new EmptyServiceNameException();
- this.service = getService(services, serviceName);
- if (this.service == null) {
- throw new UnknownServiceException(serviceName);
- }
- setupCellBlockCodecs(this.connectionHeader);
- RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
- RPCProtos.ConnectionHeaderResponse.newBuilder();
- setupCryptoCipher(this.connectionHeader, chrBuilder);
- responseConnectionHeader(chrBuilder);
- UserGroupInformation protocolUser = createUser(connectionHeader);
- if (!useSasl) {
- ugi = protocolUser;
- if (ugi != null) {
- ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
- }
- // audit logging for SASL authenticated users happens in saslReadAndProcess()
- if (authenticatedWithFallback) {
- LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
- + " connecting from " + getHostAddress());
- }
- AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
- } else {
- // user is authenticated
- ugi.setAuthenticationMethod(authMethod.authenticationMethod);
- //Now we check if this is a proxy user case. If the protocol user is
- //different from the 'user', it is a proxy user scenario. However,
- //this is not allowed if user authenticated with DIGEST.
- if ((protocolUser != null)
- && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
- if (authMethod == AuthMethod.DIGEST) {
- // Not allowed to doAs if token authentication is used
- throw new AccessDeniedException("Authenticated user (" + ugi
- + ") doesn't match what the client claims to be ("
- + protocolUser + ")");
- } else {
- // Effective user can be different from authenticated user
- // for simple auth or kerberos auth
- // The user is the real user. Now we create a proxy user
- UserGroupInformation realUser = ugi;
- ugi = UserGroupInformation.createProxyUser(protocolUser
- .getUserName(), realUser);
- // Now the user is a proxy user, set Authentication method Proxy.
- ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
- }
- }
- }
- if (connectionHeader.hasVersionInfo()) {
- // see if this connection will support RetryImmediatelyException
- retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
-
- AUDITLOG.info("Connection from " + this.hostAddress + ", port: " + this.remotePort
- + ", "
- + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
- } else {
- AUDITLOG.info("Connection from " + this.hostAddress + ", port: " + this.remotePort
- + ", UNKNOWN version info");
- }
- }
-
- private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
- throws FatalConnectionException {
- // Respond with the connection header if Crypto AES is enabled
- if (!chrBuilder.hasCryptoCipherMeta()) return;
- try {
- byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
- // encrypt the Crypto AES cipher meta data with sasl server, and send to client
- byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
- Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
- Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
-
- doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length));
- } catch (IOException ex) {
- throw new UnsupportedCryptoException(ex.getMessage(), ex);
- }
- }
-
- private void processUnwrappedData(byte[] inBuf) throws IOException,
- InterruptedException {
- ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
- // Read all RPCs contained in the inBuf, even partial ones
- while (true) {
- int count;
- if (unwrappedDataLengthBuffer.remaining() > 0) {
- count = channelRead(ch, unwrappedDataLengthBuffer);
- if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
- return;
- }
-
- if (unwrappedData == null) {
- unwrappedDataLengthBuffer.flip();
- int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
- if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
- if (LOG.isDebugEnabled())
- LOG.debug("Received ping message");
- unwrappedDataLengthBuffer.clear();
- continue; // ping message
- }
- unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
- }
-
- count = channelRead(ch, unwrappedData);
- if (count <= 0 || unwrappedData.remaining() > 0)
- return;
-
- if (unwrappedData.remaining() == 0) {
- unwrappedDataLengthBuffer.clear();
- unwrappedData.flip();
- processOneRpc(new SingleByteBuff(unwrappedData));
- unwrappedData = null;
- }
- }
- }
-
- private void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
- if (connectionHeaderRead) {
- processRequest(buf);
- } else {
- processConnectionHeader(buf);
- this.connectionHeaderRead = true;
- if (!authorizeConnection()) {
- // Throw FatalConnectionException wrapping ACE so client does right thing and closes
- // down the connection instead of trying to read non-existent return.
- throw new AccessDeniedException("Connection from " + this + " for service " +
- connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
- }
- this.user = userProvider.create(this.ugi);
- }
- }
-
- /**
- * @param buf Has the request header and the request param and optionally encoded data buffer
- * all in this one array.
- * @throws IOException
- * @throws InterruptedException
- */
- protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
- long totalRequestSize = buf.limit();
- int offset = 0;
- // Here we read in the header. We avoid having pb
- // do its default 4k allocation for CodedInputStream. We force it to use backing array.
- CodedInputStream cis;
- if (buf.hasArray()) {
- cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()).newCodedInput();
- } else {
- cis = UnsafeByteOperations
- .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
- }
- cis.enableAliasing(true);
- int headerSize = cis.readRawVarint32();
- offset = cis.getTotalBytesRead();
- Message.Builder builder = RequestHeader.newBuilder();
- ProtobufUtil.mergeFrom(builder, cis, headerSize);
- RequestHeader header = (RequestHeader) builder.build();
- offset += headerSize;
- int id = header.getCallId();
- if (LOG.isTraceEnabled()) {
- LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
- " totalRequestSize: " + totalRequestSize + " bytes");
- }
- // Enforcing the call queue size, this triggers a retry in the client
- // This is a bit late to be doing this check - we have already read in the total request.
- if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
- final Call callTooBig =
- new Call(id, this.service, null, null, null, null, this,
- responder, totalRequestSize, null, null, 0, this.callCleanup);
- ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
- setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + server.getServerName() +
- ", is hbase.ipc.server.max.callqueue.size too small?");
- responder.doRespond(callTooBig);
- return;
- }
- MethodDescriptor md = null;
- Message param = null;
- CellScanner cellScanner = null;
- try {
- if (header.hasRequestParam() && header.getRequestParam()) {
- md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
- if (md == null) throw new UnsupportedOperationException(header.getMethodName());
- builder = this.service.getRequestPrototype(md).newBuilderForType();
- cis.resetSizeCounter();
- int paramSize = cis.readRawVarint32();
- offset += cis.getTotalBytesRead();
- if (builder != null) {
- ProtobufUtil.mergeFrom(builder, cis, paramSize);
- param = builder.build();
- }
- offset += paramSize;
- } else {
- // currently header must have request param, so we directly throw exception here
- String msg = "Invalid request header: " + TextFormat.shortDebugString(header)
- + ", should have param set in it";
- LOG.warn(msg);
- throw new DoNotRetryIOException(msg);
- }
- if (header.hasCellBlockMeta()) {
- buf.position(offset);
- ByteBuff dup = buf.duplicate();
- dup.limit(offset + header.getCellBlockMeta().getLength());
- cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
- this.compressionCodec, dup);
- }
- } catch (Throwable t) {
- InetSocketAddress address = getListenerAddress();
- String msg = (address != null ? address : "(channel closed)") +
- " is unable to read call parameter from client " + getHostAddress();
- LOG.warn(msg, t);
-
- metrics.exception(t);
-
- // probably the hbase hadoop version does not match the running hadoop version
- if (t instanceof LinkageError) {
- t = new DoNotRetryIOException(t);
- }
- // If the method is not present on the server, do not retry.
- if (t instanceof UnsupportedOperationException) {
- t = new DoNotRetryIOException(t);
- }
-
- final Call readParamsFailedCall =
- new Call(id, this.service, null, null, null, null, this,
- responder, totalRequestSize, null, null, 0, this.callCleanup);
- ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- setupResponse(responseBuffer, readParamsFailedCall, t,
- msg + "; " + t.getMessage());
- responder.doRespond(readParamsFailedCall);
- return;
- }
-
- TraceInfo traceInfo = header.hasTraceInfo()
- ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
- : null;
- int timeout = 0;
- if (header.hasTimeout() && header.getTimeout() > 0){
- timeout = Math.max(minClientRequestTimeout, header.getTimeout());
- }
- Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
- totalRequestSize, traceInfo, this.addr, timeout, this.callCleanup);
-
- if (!scheduler.dispatch(new CallRunner(SimpleRpcServer.this, call))) {
- callQueueSizeInBytes.add(-1 * call.getSize());
-
- ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
- setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + server.getServerName() +
- ", too many items queued ?");
- responder.doRespond(call);
- }
- }
-
- private boolean authorizeConnection() throws IOException {
- try {
- // If auth method is DIGEST, the token was obtained by the
- // real user for the effective user, therefore not required to
- // authorize real user. doAs is allowed only for simple or kerberos
- // authentication
- if (ugi != null && ugi.getRealUser() != null
- && (authMethod != AuthMethod.DIGEST)) {
- ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
- }
- authorize(ugi, connectionHeader, getHostInetAddress());
- metrics.authorizationSuccess();
- } catch (AuthorizationException ae) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
- }
- metrics.authorizationFailure();
- setupResponse(authFailedResponse, authFailedCall,
- new AccessDeniedException(ae), ae.getMessage());
- responder.doRespond(authFailedCall);
- return false;
- }
- return true;
- }
-
@Override
- protected synchronized void close() {
+ public synchronized void close() {
disposeSasl();
data = null;
callCleanup = null;