You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/06 04:58:44 UTC

hbase git commit: Rebase with master fixup

Repository: hbase
Updated Branches:
  refs/heads/HBASE-14614 b4c5e2df4 -> 62d4eecac


Rebase with master fixup


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/62d4eeca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/62d4eeca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/62d4eeca

Branch: refs/heads/HBASE-14614
Commit: 62d4eecac0c2abb3a4dc1c6829a33ea11d2f6371
Parents: b4c5e2d
Author: Michael Stack <st...@apache.org>
Authored: Fri May 5 21:58:22 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Fri May 5 21:58:22 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/SimpleRpcServer.java       | 292 +------------------
 1 file changed, 1 insertion(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/62d4eeca/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index a22a85f..e5ba5f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -1051,298 +1051,8 @@ public class SimpleRpcServer extends RpcServer {
       return -1;
     }
 
-    // Reads the connection header following version
-    private void processConnectionHeader(ByteBuff buf) throws IOException {
-      if (buf.hasArray()) {
-        this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
-      } else {
-        CodedInputStream cis = UnsafeByteOperations
-            .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
-        cis.enableAliasing(true);
-        this.connectionHeader = ConnectionHeader.parseFrom(cis);
-      }
-      String serviceName = connectionHeader.getServiceName();
-      if (serviceName == null) throw new EmptyServiceNameException();
-      this.service = getService(services, serviceName);
-      if (this.service == null) {
-        throw new UnknownServiceException(serviceName);
-      }
-      setupCellBlockCodecs(this.connectionHeader);
-      RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
-          RPCProtos.ConnectionHeaderResponse.newBuilder();
-      setupCryptoCipher(this.connectionHeader, chrBuilder);
-      responseConnectionHeader(chrBuilder);
-      UserGroupInformation protocolUser = createUser(connectionHeader);
-      if (!useSasl) {
-        ugi = protocolUser;
-        if (ugi != null) {
-          ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
-        }
-        // audit logging for SASL authenticated users happens in saslReadAndProcess()
-        if (authenticatedWithFallback) {
-          LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
-              + " connecting from " + getHostAddress());
-        }
-        AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
-      } else {
-        // user is authenticated
-        ugi.setAuthenticationMethod(authMethod.authenticationMethod);
-        //Now we check if this is a proxy user case. If the protocol user is
-        //different from the 'user', it is a proxy user scenario. However,
-        //this is not allowed if user authenticated with DIGEST.
-        if ((protocolUser != null)
-            && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
-          if (authMethod == AuthMethod.DIGEST) {
-            // Not allowed to doAs if token authentication is used
-            throw new AccessDeniedException("Authenticated user (" + ugi
-                + ") doesn't match what the client claims to be ("
-                + protocolUser + ")");
-          } else {
-            // Effective user can be different from authenticated user
-            // for simple auth or kerberos auth
-            // The user is the real user. Now we create a proxy user
-            UserGroupInformation realUser = ugi;
-            ugi = UserGroupInformation.createProxyUser(protocolUser
-                .getUserName(), realUser);
-            // Now the user is a proxy user, set Authentication method Proxy.
-            ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
-          }
-        }
-      }
-      if (connectionHeader.hasVersionInfo()) {
-        // see if this connection will support RetryImmediatelyException
-        retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
-
-        AUDITLOG.info("Connection from " + this.hostAddress + ", port: " + this.remotePort
-            + ", "
-            + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
-      } else {
-        AUDITLOG.info("Connection from " + this.hostAddress + ", port: " + this.remotePort
-            + ", UNKNOWN version info");
-      }
-    }
-
-    private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
-        throws FatalConnectionException {
-      // Response the connection header if Crypto AES is enabled
-      if (!chrBuilder.hasCryptoCipherMeta()) return;
-      try {
-        byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
-        // encrypt the Crypto AES cipher meta data with sasl server, and send to client
-        byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
-        Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
-        Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
-
-        doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length));
-      } catch (IOException ex) {
-        throw new UnsupportedCryptoException(ex.getMessage(), ex);
-      }
-    }
-
-    private void processUnwrappedData(byte[] inBuf) throws IOException,
-    InterruptedException {
-      ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
-      // Read all RPCs contained in the inBuf, even partial ones
-      while (true) {
-        int count;
-        if (unwrappedDataLengthBuffer.remaining() > 0) {
-          count = channelRead(ch, unwrappedDataLengthBuffer);
-          if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
-            return;
-        }
-
-        if (unwrappedData == null) {
-          unwrappedDataLengthBuffer.flip();
-          int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
-          if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
-            if (LOG.isDebugEnabled())
-              LOG.debug("Received ping message");
-            unwrappedDataLengthBuffer.clear();
-            continue; // ping message
-          }
-          unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
-        }
-
-        count = channelRead(ch, unwrappedData);
-        if (count <= 0 || unwrappedData.remaining() > 0)
-          return;
-
-        if (unwrappedData.remaining() == 0) {
-          unwrappedDataLengthBuffer.clear();
-          unwrappedData.flip();
-          processOneRpc(new SingleByteBuff(unwrappedData));
-          unwrappedData = null;
-        }
-      }
-    }
-
-    private void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
-      if (connectionHeaderRead) {
-        processRequest(buf);
-      } else {
-        processConnectionHeader(buf);
-        this.connectionHeaderRead = true;
-        if (!authorizeConnection()) {
-          // Throw FatalConnectionException wrapping ACE so client does right thing and closes
-          // down the connection instead of trying to read non-existent retun.
-          throw new AccessDeniedException("Connection from " + this + " for service " +
-            connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
-        }
-        this.user = userProvider.create(this.ugi);
-      }
-    }
-
-    /**
-     * @param buf Has the request header and the request param and optionally encoded data buffer
-     * all in this one array.
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
-      long totalRequestSize = buf.limit();
-      int offset = 0;
-      // Here we read in the header.  We avoid having pb
-      // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
-      CodedInputStream cis;
-      if (buf.hasArray()) {
-        cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()).newCodedInput();
-      } else {
-        cis = UnsafeByteOperations
-            .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
-      }
-      cis.enableAliasing(true);
-      int headerSize = cis.readRawVarint32();
-      offset = cis.getTotalBytesRead();
-      Message.Builder builder = RequestHeader.newBuilder();
-      ProtobufUtil.mergeFrom(builder, cis, headerSize);
-      RequestHeader header = (RequestHeader) builder.build();
-      offset += headerSize;
-      int id = header.getCallId();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
-          " totalRequestSize: " + totalRequestSize + " bytes");
-      }
-      // Enforcing the call queue size, this triggers a retry in the client
-      // This is a bit late to be doing this check - we have already read in the total request.
-      if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
-        final Call callTooBig =
-          new Call(id, this.service, null, null, null, null, this,
-            responder, totalRequestSize, null, null, 0, this.callCleanup);
-        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
-        metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
-        setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
-            "Call queue is full on " + server.getServerName() +
-                ", is hbase.ipc.server.max.callqueue.size too small?");
-        responder.doRespond(callTooBig);
-        return;
-      }
-      MethodDescriptor md = null;
-      Message param = null;
-      CellScanner cellScanner = null;
-      try {
-        if (header.hasRequestParam() && header.getRequestParam()) {
-          md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
-          if (md == null) throw new UnsupportedOperationException(header.getMethodName());
-          builder = this.service.getRequestPrototype(md).newBuilderForType();
-          cis.resetSizeCounter();
-          int paramSize = cis.readRawVarint32();
-          offset += cis.getTotalBytesRead();
-          if (builder != null) {
-            ProtobufUtil.mergeFrom(builder, cis, paramSize);
-            param = builder.build();
-          }
-          offset += paramSize;
-        } else {
-          // currently header must have request param, so we directly throw exception here
-          String msg = "Invalid request header: " + TextFormat.shortDebugString(header)
-              + ", should have param set in it";
-          LOG.warn(msg);
-          throw new DoNotRetryIOException(msg);
-        }
-        if (header.hasCellBlockMeta()) {
-          buf.position(offset);
-          ByteBuff dup = buf.duplicate();
-          dup.limit(offset + header.getCellBlockMeta().getLength());
-          cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
-              this.compressionCodec, dup);
-        }
-      } catch (Throwable t) {
-        InetSocketAddress address = getListenerAddress();
-        String msg = (address != null ? address : "(channel closed)") +
-            " is unable to read call parameter from client " + getHostAddress();
-        LOG.warn(msg, t);
-
-        metrics.exception(t);
-
-        // probably the hbase hadoop version does not match the running hadoop version
-        if (t instanceof LinkageError) {
-          t = new DoNotRetryIOException(t);
-        }
-        // If the method is not present on the server, do not retry.
-        if (t instanceof UnsupportedOperationException) {
-          t = new DoNotRetryIOException(t);
-        }
-
-        final Call readParamsFailedCall =
-          new Call(id, this.service, null, null, null, null, this,
-            responder, totalRequestSize, null, null, 0, this.callCleanup);
-        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
-        setupResponse(responseBuffer, readParamsFailedCall, t,
-          msg + "; " + t.getMessage());
-        responder.doRespond(readParamsFailedCall);
-        return;
-      }
-
-      TraceInfo traceInfo = header.hasTraceInfo()
-          ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
-          : null;
-      int timeout = 0;
-      if (header.hasTimeout() && header.getTimeout() > 0){
-        timeout = Math.max(minClientRequestTimeout, header.getTimeout());
-      }
-      Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
-          totalRequestSize, traceInfo, this.addr, timeout, this.callCleanup);
-
-      if (!scheduler.dispatch(new CallRunner(SimpleRpcServer.this, call))) {
-        callQueueSizeInBytes.add(-1 * call.getSize());
-
-        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
-        metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
-        setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
-            "Call queue is full on " + server.getServerName() +
-                ", too many items queued ?");
-        responder.doRespond(call);
-      }
-    }
-
-    private boolean authorizeConnection() throws IOException {
-      try {
-        // If auth method is DIGEST, the token was obtained by the
-        // real user for the effective user, therefore not required to
-        // authorize real user. doAs is allowed only for simple or kerberos
-        // authentication
-        if (ugi != null && ugi.getRealUser() != null
-            && (authMethod != AuthMethod.DIGEST)) {
-          ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
-        }
-        authorize(ugi, connectionHeader, getHostInetAddress());
-        metrics.authorizationSuccess();
-      } catch (AuthorizationException ae) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
-        }
-        metrics.authorizationFailure();
-        setupResponse(authFailedResponse, authFailedCall,
-          new AccessDeniedException(ae), ae.getMessage());
-        responder.doRespond(authFailedCall);
-        return false;
-      }
-      return true;
-    }
-
     @Override
-    protected synchronized void close() {
+    public synchronized void close() {
       disposeSasl();
       data = null;
       callCleanup = null;