Posted to dev@livy.apache.org by mgaido91 <gi...@git.apache.org> on 2018/09/28 12:12:32 UTC

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

GitHub user mgaido91 opened a pull request:

    https://github.com/apache/incubator-livy/pull/117

    [WIP][LIVY-502] Remove dependency on hive-exec

    ## What changes were proposed in this pull request?
    
    This PR removes the dependency on `hive-exec`. The only Hive modules still used after this PR are `hive-service-rpc` and `hive-service`. This drastically reduces the number of JARs needed by the thriftserver module.
    
    ## How was this patch tested?
    
    Existing unit tests plus manual tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mgaido91/incubator-livy LIVY-502

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-livy/pull/117.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #117
    
----
commit e7aa5b1c98e322fb60963bcca18965c5d874ce10
Author: Marco Gaido <mg...@...>
Date:   2018-09-28T12:07:38Z

    [WIP][LIVY-502] Remove dependency on hive-exec

----


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221754972
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala ---
    @@ -194,9 +193,9 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage
       @throws[HiveSQLException]
       def cancelOperation(opHandle: OperationHandle, errMsg: String): Unit = {
         val operation = getOperation(opHandle)
    -    val opState = operation.getStatus.getState
    +    val opState = operation.getStatus.state
         if (opState.isTerminal) {
    -      // Cancel should be a no-op
    +      // Cancel should be a no-op in either cases
    --- End diff --
    
    either case


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221883081
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/LivyDelegationTokenSecretManager.scala ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.auth
    +
    +import java.io.{ByteArrayInputStream, DataInputStream, IOException}
    +
    +import org.apache.hadoop.io.Text
    +import org.apache.hadoop.security.token.Token
    +import org.apache.hadoop.security.token.delegation.{AbstractDelegationTokenIdentifier, AbstractDelegationTokenSecretManager}
    +
    +import org.apache.livy.LivyConf
    +
    +/**
    + * A secret manager. It is taken from analogous implementation in the MapReduce client.
    + */
    +class LivyDelegationTokenSecretManager(val livyConf: LivyConf)
    --- End diff --
    
    I don't think we can remove it. In particular, `getTokenIdentifier` and `verifyDelegationToken` are used for client authentication.
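
To make the role of token verification concrete, here is a minimal, standard-library-only sketch of the idea behind a delegation token check: the server recomputes a password from the token identifier and a secret key, then compares it with the one the client presented. This is not the code in `LivyDelegationTokenSecretManager` or in Hadoop; the object name `TokenCheckSketch`, its method names, and the choice of HMAC-SHA1 are assumptions made for illustration.

```scala
import java.security.MessageDigest
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

// Hypothetical illustration only; not Livy's or Hadoop's implementation.
object TokenCheckSketch {
  // Assumed MAC algorithm for this sketch.
  private val Algorithm = "HmacSHA1"

  // Derive the token password from the identifier bytes and a server-side secret key.
  def password(masterKey: Array[Byte], identifier: Array[Byte]): Array[Byte] = {
    val mac = Mac.getInstance(Algorithm)
    mac.init(new SecretKeySpec(masterKey, Algorithm))
    mac.doFinal(identifier)
  }

  // Accept the token only if the presented password matches the recomputed one.
  def verify(masterKey: Array[Byte], identifier: Array[Byte], presented: Array[Byte]): Boolean =
    MessageDigest.isEqual(password(masterKey, identifier), presented)
}
```

A real secret manager also tracks key rotation, token expiry, and renewal, which this sketch leaves out.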


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221887180
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTypeInfoOperation.scala ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.operation
    +
    +import java.sql.{DatabaseMetaData, Types}
    +
    +import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle}
    +
    +import org.apache.livy.Logging
    +import org.apache.livy.thriftserver.serde.ResultSet
    +import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
    +
    +sealed case class TypeInfo(name: String, sqlType: Int, precision: Option[Int],
    +  caseSensitive: Boolean, searchable: Short, unsignedAttribute: Boolean, numPrecRadix: Option[Int])
    +
    +/**
    + * GetTypeInfoOperation.
    + *
    + */
    +class GetTypeInfoOperation(sessionHandle: SessionHandle)
    --- End diff --
    
    I'm not sure I understand your question:
     - If the question is "what do we need them for?": they are used when answering metadata calls from the JDBC driver.
     - If the question is "can't we reuse Hive's classes?": no, because Hive's classes use `HiveSession` and `HiveConf`, and we need to get rid of them.
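
As a rough illustration of what answering a JDBC metadata call involves, the sketch below builds a couple of type entries from `java.sql` constants and flattens one into a row. The `TypeInfo` fields are copied from the case class in the diff above; the object name `TypeInfoSketch`, the two sample entries, and the `toRow` helper are hypothetical and are not the PR's actual table. The real JDBC `getTypeInfo` result has more columns than shown here.

```scala
import java.sql.{DatabaseMetaData, Types}

// Same fields as the `TypeInfo` case class in the diff above.
case class TypeInfo(name: String, sqlType: Int, precision: Option[Int],
  caseSensitive: Boolean, searchable: Short, unsignedAttribute: Boolean, numPrecRadix: Option[Int])

// Hypothetical helper object; entries are illustrative only.
object TypeInfoSketch {
  val types: Seq[TypeInfo] = Seq(
    TypeInfo("int", Types.INTEGER, Some(10), caseSensitive = false,
      DatabaseMetaData.typeSearchable.toShort, unsignedAttribute = false, Some(10)),
    TypeInfo("string", Types.VARCHAR, None, caseSensitive = true,
      DatabaseMetaData.typeSearchable.toShort, unsignedAttribute = true, None))

  // Flatten one entry into a row, using null where the result set would hold SQL NULL.
  def toRow(t: TypeInfo): Seq[Any] = Seq[Any](
    t.name,
    t.sqlType,
    t.precision.map(Int.box).orNull,
    t.caseSensitive,
    t.searchable,
    t.unsignedAttribute,
    t.numPrecRadix.map(Int.box).orNull)
}
```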


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 closed the pull request at:

    https://github.com/apache/incubator-livy/pull/117


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221765558
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthBridgeServer.scala ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.auth
    +
    +import java.io.IOException
    +import java.net.InetAddress
    +import java.security.{PrivilegedAction, PrivilegedExceptionAction}
    +import java.util
    +import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback, UnsupportedCallbackException}
    +import javax.security.sasl.{AuthorizeCallback, RealmCallback, SaslServer}
    +
    +import org.apache.commons.codec.binary.Base64
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.security.{SaslRpcServer, UserGroupInformation}
    +import org.apache.hadoop.security.SaslRpcServer.AuthMethod
    +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
    +import org.apache.hadoop.security.token.SecretManager.InvalidToken
    +import org.apache.thrift.{TException, TProcessor}
    +import org.apache.thrift.protocol.TProtocol
    +import org.apache.thrift.transport.{TSaslServerTransport, TSocket, TTransport, TTransportException, TTransportFactory}
    +
    +import org.apache.livy.Logging
    +
    +/**
    + * The class is taken from Hive's `HadoopThriftAuthBridge.Server`. It bridges Thrift's SASL
    + * transports to Hadoop's SASL callback handlers and authentication classes.
    + *
    + * This class is based on Hive's one.
    + */
    +class AuthBridgeServer(private val secretManager: LivyDelegationTokenSecretManager) {
    +  private val ugi = try {
    +      UserGroupInformation.getCurrentUser
    +    } catch {
    +      case ioe: IOException => throw new TTransportException(ioe)
    +    }
    +
    +  /**
    +   * Create a TTransportFactory that, upon connection of a client socket,
    +   * negotiates a Kerberized SASL transport. The resulting TTransportFactory
    +   * can be passed as both the input and output transport factory when
    +   * instantiating a TThreadPoolServer, for example.
    +   *
    +   * @param saslProps Map of SASL properties
    +   */
    +  @throws[TTransportException]
    +  def createTransportFactory(saslProps: util.Map[String, String]): TTransportFactory = {
    +    val transFactory: TSaslServerTransport.Factory = createSaslServerTransportFactory(saslProps)
    +    new TUGIAssumingTransportFactory(transFactory, ugi)
    +  }
    +
    +  /**
    +   * Create a TSaslServerTransport.Factory that, upon connection of a client
    +   * socket, negotiates a Kerberized SASL transport.
    +   *
    +   * @param saslProps Map of SASL properties
    +   */
    +  @throws[TTransportException]
    +  def createSaslServerTransportFactory(
    +      saslProps: util.Map[String, String]): TSaslServerTransport.Factory = {
    +    // Parse out the kerberos principal, host, realm.
    +    val kerberosName: String = ugi.getUserName
    +    val names: Array[String] = SaslRpcServer.splitKerberosName(kerberosName)
    +    if (names.length != 3) {
    +      throw new TTransportException(s"Kerberos principal should have 3 parts: $kerberosName")
    +    }
    +    val transFactory: TSaslServerTransport.Factory = new TSaslServerTransport.Factory
    +    transFactory.addServerDefinition(AuthMethod.KERBEROS.getMechanismName,
    +      names(0), names(1), // two parts of kerberos principal
    +      saslProps,
    +      new SaslRpcServer.SaslGssCallbackHandler)
    +    transFactory.addServerDefinition(AuthMethod.TOKEN.getMechanismName,
    +      null,
    +      SaslRpcServer.SASL_DEFAULT_REALM,
    +      saslProps,
    +      new SaslDigestCallbackHandler(secretManager))
    +    transFactory
    +  }
    +
    +  /**
    +   * Wrap a TTransportFactory in such a way that, before processing any RPC, it
    +   * assumes the UserGroupInformation of the user authenticated by
    +   * the SASL transport.
    +   */
    +  def wrapTransportFactory(transFactory: TTransportFactory): TTransportFactory = {
    +    new TUGIAssumingTransportFactory(transFactory, ugi)
    +  }
    +
    +  /**
    +   * Wrap a TProcessor in such a way that, before processing any RPC, it
    +   * assumes the UserGroupInformation of the user authenticated by
    +   * the SASL transport.
    +   */
    +  def wrapProcessor(processor: TProcessor): TProcessor = {
    +    new TUGIAssumingProcessor(processor, secretManager, true)
    +  }
    +
    +  /**
    +   * Wrap a TProcessor to capture the client information like connecting userid, ip etc
    +   */
    +  def wrapNonAssumingProcessor(processor: TProcessor): TProcessor = {
    +    new TUGIAssumingProcessor(processor, secretManager, false)
    +  }
    +
    +  def getRemoteAddress: InetAddress = AuthBridgeServer.remoteAddress.get
    +
    +  def getRemoteUser: String = AuthBridgeServer.remoteUser.get
    +
    +  def getUserAuthMechanism: String = AuthBridgeServer.userAuthMechanism.get
    +
    +}
    +
    +/**
    + * A TransportFactory that wraps another one, but assumes a specified UGI
    + * before calling through.
    + *
    + * This is used on the server side to assume the server's Principal when accepting
    + * clients.
    + *
    + * This class is derived from Hive's one.
    + */
    +private[auth] class TUGIAssumingTransportFactory(
    +    val wrapped: TTransportFactory, val ugi: UserGroupInformation) extends TTransportFactory {
    +  assert(wrapped != null)
    +  assert(ugi != null)
    +
    +  override def getTransport(trans: TTransport): TTransport = {
    +    ugi.doAs(new PrivilegedAction[TTransport]() {
    +      override def run: TTransport = wrapped.getTransport(trans)
    +    })
    +  }
    +}
    +
    +/**
    + * CallbackHandler for SASL DIGEST-MD5 mechanism.
    + */
    +// This code is pretty much completely based on Hadoop's SaslRpcServer.SaslDigestCallbackHandler -
    +// the only reason we could not use that Hadoop class as-is was because it needs a
    +// Server.Connection.
    +sealed class SaslDigestCallbackHandler(
    +    val secretManager: LivyDelegationTokenSecretManager) extends CallbackHandler with Logging {
    +  @throws[InvalidToken]
    +  private def getPassword(tokenId: LivyDelegationTokenIdentifier): Array[Char] = {
    +    encodePassword(secretManager.retrievePassword(tokenId))
    +  }
    +
    +  private def encodePassword(password: Array[Byte]): Array[Char] = {
    +    new String(Base64.encodeBase64(password)).toCharArray
    +  }
    +
    +  @throws[InvalidToken]
    +  @throws[UnsupportedCallbackException]
    +  override def handle(callbacks: Array[Callback]): Unit = {
    +    var nc: NameCallback = null
    +    var pc: PasswordCallback = null
    +    callbacks.foreach {
    +      case ac: AuthorizeCallback =>
    +        val authid: String = ac.getAuthenticationID
    +        val authzid: String = ac.getAuthorizationID
    +        if (authid == authzid) ac.setAuthorized(true)
    +        else ac.setAuthorized(false)
    +        if (ac.isAuthorized) {
    +          if (logger.isDebugEnabled) {
    +            val username = SaslRpcServer.getIdentifier(authzid, secretManager).getUser.getUserName
    +            debug(s"SASL server DIGEST-MD5 callback: setting canonicalized client ID: $username")
    +          }
    +          ac.setAuthorizedID(authzid)
    +        }
    +      case c: NameCallback => nc = c
    +      case c: PasswordCallback => pc = c
    +      case _: RealmCallback => // Do nothing.
    +      case other =>
    +        throw new UnsupportedCallbackException(other, "Unrecognized SASL DIGEST-MD5 Callback")
    +    }
    +    if (pc != null) {
    +      val tokenIdentifier = SaslRpcServer.getIdentifier(nc.getDefaultName, secretManager)
    +      val password: Array[Char] = getPassword(tokenIdentifier)
    +      if (logger.isDebugEnabled) {
    +        debug("SASL server DIGEST-MD5 callback: setting password for client: " +
    +          tokenIdentifier.getUser)
    +      }
    +      pc.setPassword(password)
    +    }
    +  }
    +}
    +
    +/**
    + * Processor that pulls the SaslServer object out of the transport, and assumes the remote user's
    + * UGI before calling through to the original processor.
    + *
    + * This is used on the server side to set the UGI for each specific call.
    --- End diff --
    
    How is this actually used in Livy?
    
    AFAICT the server is just making calls to the remote session, so it shouldn't need to impersonate the calling user on the server, right? Just check that the user can access the remote session?
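
The alternative suggested here, checking access instead of impersonating the caller, could look roughly like the sketch below. Everything in it is hypothetical: `RemoteSession`, `canAccess`, `withAccess`, and the notion of a superuser set are illustration-only names and assumptions, not Livy APIs.

```scala
// Hypothetical sketch; none of these names exist in Livy.
object SessionAccessSketch {
  // Minimal model of a remote session: only its id and owner are tracked.
  final case class RemoteSession(id: Int, owner: String)

  // The caller may use the session if they own it or are a configured superuser.
  def canAccess(session: RemoteSession, caller: String, superUsers: Set[String]): Boolean =
    caller == session.owner || superUsers.contains(caller)

  // Run `call` only after the access check passes; no doAs or UGI switch on the server.
  def withAccess[T](session: RemoteSession, caller: String, superUsers: Set[String])(
      call: => T): Either[String, T] = {
    if (canAccess(session, caller, superUsers)) Right(call)
    else Left(s"User $caller cannot access session ${session.id}")
  }
}
```

With this shape, the server thread never changes identity; it only decides whether to forward the call to the remote session.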


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221772645
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftHttpServlet.scala ---
    @@ -0,0 +1,500 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.cli
    +
    +import java.io.IOException
    +import java.security.{PrivilegedExceptionAction, SecureRandom}
    +import javax.servlet.ServletException
    +import javax.servlet.http.{Cookie, HttpServletRequest, HttpServletResponse}
    +import javax.ws.rs.core.NewCookie
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.codec.binary.{Base64, StringUtils}
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.authentication.util.KerberosName
    +import org.apache.hive.service.CookieSigner
    +import org.apache.hive.service.auth.{HiveAuthConstants, HttpAuthenticationException, HttpAuthUtils}
    +import org.apache.hive.service.auth.HiveAuthConstants.AuthTypes
    +import org.apache.hive.service.cli.HiveSQLException
    +import org.apache.thrift.TProcessor
    +import org.apache.thrift.protocol.TProtocolFactory
    +import org.apache.thrift.server.TServlet
    +import org.ietf.jgss.{GSSContext, GSSCredential, GSSException, GSSManager, Oid}
    +
    +import org.apache.livy.{LivyConf, Logging}
    +import org.apache.livy.thriftserver.SessionInfo
    +import org.apache.livy.thriftserver.auth.{AuthenticationProvider, AuthFactory}
    +
    +class ThriftHttpServlet(
    +    processor: TProcessor,
    +    protocolFactory: TProtocolFactory,
    +    val authType: String,
    +    val serviceUGI: UserGroupInformation,
    +    val httpUGI: UserGroupInformation,
    +    val authFactory: AuthFactory,
    +    val livyConf: LivyConf) extends TServlet(processor, protocolFactory) with Logging {
    +
    +  private val isCookieAuthEnabled = livyConf.getBoolean(LivyConf.THRIFT_HTTP_COOKIE_AUTH_ENABLED)
    +
    +  // Class members for cookie based authentication.
    +  private val signer: CookieSigner = if (isCookieAuthEnabled) {
    +      // Generate the signer with secret.
    +      val secret = ThriftHttpServlet.RAN.nextLong.toString
    +      debug("Using the random number as the secret for cookie generation " + secret)
    +      new CookieSigner(secret.getBytes())
    +    } else {
    +      null
    +    }
    +
    +  private val cookieDomain = livyConf.get(LivyConf.THRIFT_HTTP_COOKIE_DOMAIN)
    +  private val cookiePath = livyConf.get(LivyConf.THRIFT_HTTP_COOKIE_PATH)
    +  private val cookieMaxAge =
    +    (livyConf.getTimeAsMs(LivyConf.THRIFT_HTTP_COOKIE_MAX_AGE) / 1000).toInt
    +  private val isCookieSecure = livyConf.getBoolean(LivyConf.THRIFT_USE_SSL)
    +  private val isHttpOnlyCookie = livyConf.getBoolean(LivyConf.THRIFT_HTTP_COOKIE_IS_HTTPONLY)
    +  private val xsrfFilterEnabled = livyConf.getBoolean(LivyConf.THRIFT_XSRF_FILTER_ENABLED)
    +
    +  @throws[IOException]
    +  @throws[ServletException]
    +  override protected def doPost(
    +      request: HttpServletRequest, response: HttpServletResponse): Unit = {
    +    var clientUserName: String = null
    +    var requireNewCookie: Boolean = false
    +
    +    try {
    +      if (xsrfFilterEnabled) {
    +        val continueProcessing = ThriftHttpServlet.doXsrfFilter(request, response)
    +        if (!continueProcessing) {
    +          warn("Request did not have valid XSRF header, rejecting.")
    +          return
    +        }
    +      }
    +      // If the cookie based authentication is already enabled, parse the
    +      // request and validate the request cookies.
    +      if (isCookieAuthEnabled) {
    +        clientUserName = validateCookie(request)
    +        requireNewCookie = clientUserName == null
    +        if (requireNewCookie) {
    +          info("Could not validate cookie sent, will try to generate a new cookie")
    +        }
    +      }
    +      // If the cookie based authentication is not enabled or the request does
    +      // not have a valid cookie, use the kerberos or password based authentication
    +      // depending on the server setup.
    +      if (clientUserName == null) {
    +        // For a kerberos setup
    +        if (ThriftHttpServlet.isKerberosAuthMode(authType)) {
    +          val delegationToken = request.getHeader(ThriftHttpServlet.HIVE_DELEGATION_TOKEN_HEADER)
    +          // Each http request must have an Authorization header
    +          if ((delegationToken != null) && (!delegationToken.isEmpty)) {
    +            clientUserName = doTokenAuth(request, response)
    +          } else {
    +            clientUserName = doKerberosAuth(request)
    +          }
    +        } else {
    +          // For password based authentication
    +          clientUserName = doPasswdAuth(request, authType)
    +        }
    +      }
    +      debug(s"Client username: $clientUserName")
    +
    +      // Set the thread local username to be used for doAs if true
    +      SessionInfo.setUserName(clientUserName)
    +
    +      // find proxy user if any from query param
    +      val doAsQueryParam = ThriftHttpServlet.getDoAsQueryParam(request.getQueryString)
    +      if (doAsQueryParam != null) {
    +        SessionInfo.setProxyUserName(doAsQueryParam)
    +      }
    +
    +      val clientIpAddress = request.getRemoteAddr
    +      debug("Client IP Address: " + clientIpAddress)
    +      // Set the thread local ip address
    +      SessionInfo.setIpAddress(clientIpAddress)
    +
    +      // get forwarded hosts address
    +      val forwardedFor = request.getHeader(ThriftHttpServlet.X_FORWARDED_FOR)
    +      if (forwardedFor != null) {
    +        debug(s"${ThriftHttpServlet.X_FORWARDED_FOR}:$forwardedFor")
    +        SessionInfo.setForwardedAddresses(forwardedFor.split(",").toList.asJava)
    +      } else {
    +        SessionInfo.setForwardedAddresses(List.empty.asJava)
    +      }
    +
    +      // Generate new cookie and add it to the response
    +      if (requireNewCookie && !authType.equalsIgnoreCase(AuthTypes.NOSASL.toString)) {
    +        val cookieToken = HttpAuthUtils.createCookieToken(clientUserName)
    +        val hs2Cookie = createCookie(signer.signCookie(cookieToken))
    +
    +        if (isHttpOnlyCookie) {
    +          response.setHeader("SET-COOKIE", ThriftHttpServlet.getHttpOnlyCookieHeader(hs2Cookie))
    +        } else {
    +          response.addCookie(hs2Cookie)
    +        }
    +        info("Cookie added for clientUserName " + clientUserName)
    +      }
    +      super.doPost(request, response);
    +    } catch {
    +      case e: HttpAuthenticationException =>
    +        error("Error: ", e)
    +        // Send a 401 to the client
    +        response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
    +        if(ThriftHttpServlet.isKerberosAuthMode(authType)) {
    +          response.addHeader(HttpAuthUtils.WWW_AUTHENTICATE, HttpAuthUtils.NEGOTIATE)
    +        }
    +        // scalastyle:off println
    +        response.getWriter.println("Authentication Error: " + e.getMessage)
    +        // scalastyle:on println
    +    } finally {
    +      // Clear the thread locals
    +      SessionInfo.clearUserName()
    +      SessionInfo.clearIpAddress()
    +      SessionInfo.clearProxyUserName()
    +      SessionInfo.clearForwardedAddresses()
    +    }
    +  }
    +
    +  /**
    +   * Retrieves the client name from cookieString. If the cookie does not correspond to a valid
    +   * client, the function returns null.
    +   * @param cookies HTTP Request cookies.
    +   * @return Client Username if cookieString has a HS2 Generated cookie that is currently valid.
    +   *         Else, returns null.
    +   */
    +  private def getClientNameFromCookie(cookies: Array[Cookie]): String = {
    +    // Following is the main loop which iterates through all the cookies send by the client.
    +    // The HS2 generated cookies are of the format hive.server2.auth=<value>
    +    // A cookie which is identified as a hiveserver2 generated cookie is validated by calling
    +    // signer.verifyAndExtract(). If the validation passes, send the username for which the cookie
    +    // is validated to the caller. If no client side cookie passes the validation, return null to
    +    // the caller.
    +    cookies.filter(_.equals(ThriftHttpServlet.AUTH_COOKIE)).foreach { cookie =>
    +      val value = signer.verifyAndExtract(cookie.getValue)
    +      if (value != null) {
    +        val userName = HttpAuthUtils.getUserNameFromCookieToken(value)
    +        if (userName == null) {
    +          warn("Invalid cookie token " + value)
    +        } else {
    +          // We have found a valid cookie in the client request.
    +          if (logger.isDebugEnabled()) {
    +            debug("Validated the cookie for user " + userName)
    +          }
    +          return userName
    +        }
    +      }
    +    }
    +    // No valid generated cookies found, return null
    +    null
    +  }
    +
    +  /**
    +   * Convert cookie array to human readable cookie string
    +   * @param cookies Cookie Array
    +   * @return String containing all the cookies separated by a newline character.
    +   * Each cookie is of the format [key]=[value]
    +   */
    +  private def toCookieStr(cookies: Array[Cookie]): String = {
    +    cookies.map(c => s"${c.getName} = ${c.getValue} ;\n").mkString
    +  }
    +
    +  /**
    +   * Validate the request cookie. This function iterates over the request cookie headers
    +   * and finds a cookie that represents a valid client/server session. If it finds one, it
    +   * returns the client name associated with the session. Else, it returns null.
    +   * @param request The HTTP Servlet Request send by the client
    +   * @return Client Username if the request has valid HS2 cookie, else returns null
    +   */
    +  private def validateCookie(request: HttpServletRequest): String = {
    +    // Find all the valid cookies associated with the request.
    +    val cookies = request.getCookies
    +
    +    if (cookies == null) {
    +      if (logger.isDebugEnabled()) {
    +        debug("No valid cookies associated with the request " + request)
    +      }
    +      null
    +    } else {
    +      if (logger.isDebugEnabled()) {
    +        debug("Received cookies: " + toCookieStr(cookies))
    +      }
    +      getClientNameFromCookie(cookies)
    +    }
    +  }
    +
    +  /**
    +   * Generate a server side cookie given the cookie value as the input.
    +   * @param str Input string token.
    +   * @return The generated cookie.
    +   */
    +  private def createCookie(str: String): Cookie = {
    +    if (logger.isDebugEnabled()) {
    +      debug(s"Cookie name = ${ThriftHttpServlet.AUTH_COOKIE} value = $str")
    +    }
    +    val cookie = new Cookie(ThriftHttpServlet.AUTH_COOKIE, str)
    +
    +    cookie.setMaxAge(cookieMaxAge)
    +    if (cookieDomain != null) {
    +      cookie.setDomain(cookieDomain)
    +    }
    +    if (cookiePath != null) {
    +      cookie.setPath(cookiePath)
    +    }
    +    cookie.setSecure(isCookieSecure)
    +    cookie
    +  }
    +
    +
    +  /**
    +   * Do the authentication (LDAP/PAM not yet supported)
    +   */
    +  private def doPasswdAuth(request: HttpServletRequest, authType: String): String = {
    +    val userName = getUsername(request, authType)
    +    // No-op when authType is NOSASL
    +    if (!authType.equalsIgnoreCase(HiveAuthConstants.AuthTypes.NOSASL.toString)) {
    +      try {
    +        val provider = AuthenticationProvider.getAuthenticationProvider(authType, livyConf)
    +        provider.Authenticate(userName, getPassword(request, authType))
    +      } catch {
    +        case e: Exception => throw new HttpAuthenticationException(e)
    +      }
    +    }
    +    userName
    +  }
    +
    +  private def doTokenAuth(request: HttpServletRequest, response: HttpServletResponse): String = {
    +    val tokenStr = request.getHeader(ThriftHttpServlet.HIVE_DELEGATION_TOKEN_HEADER)
    +    try {
    +      authFactory.verifyDelegationToken(tokenStr)
    +    } catch {
    +      case e: HiveSQLException => throw new HttpAuthenticationException(e);
    +    }
    +  }
    +
    +  /**
    +   * Do the GSS-API kerberos authentication. We already have a logged in subject in the form of
    +   * serviceUGI, which GSS-API will extract information from.
    +   * In case of a SPNego request we use the httpUGI, for the authenticating service tickets.
    +   */
    +  private def doKerberosAuth(request: HttpServletRequest): String = {
    --- End diff --
    
    It would be nice at some point to check whether the `hadoop-auth` module (which provides a server filter, and is used by the main server) can replace this code.
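
For context, the server filter in `hadoop-auth` (`AuthenticationFilter`) is configured through servlet init parameters rather than hand-written GSS-API code. The sketch below only assembles such a parameter map; the object and method names are hypothetical, and the parameter keys (`type`, `kerberos.principal`, `kerberos.keytab`, `cookie.path`) are recalled from the Hadoop Auth documentation, so they should be checked against the Hadoop version in use. Whether the filter can actually replace `doKerberosAuth` here is the open question raised above, not something this sketch settles.

```scala
// Hypothetical helper; it only builds a parameter map and does not touch hadoop-auth classes.
object HadoopAuthFilterParamsSketch {
  // Init parameters one might pass to hadoop-auth's AuthenticationFilter for Kerberos/SPNEGO.
  def kerberosParams(
      principal: String,
      keytab: String,
      cookiePath: Option[String]): Map[String, String] = {
    val base = Map(
      "type" -> "kerberos",
      "kerberos.principal" -> principal,
      "kerberos.keytab" -> keytab)
    // Add the optional cookie path only when it is configured.
    cookiePath.fold(base)(p => base + ("cookie.path" -> p))
  }
}
```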


---

[GitHub] incubator-livy issue #117: [LIVY-502] Remove dependency on hive-exec

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/incubator-livy/pull/117
  
    Merging to master.


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221771643
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala ---
    @@ -0,0 +1,745 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.cli
    +
    +import java.io.IOException
    +import java.net.{InetAddress, UnknownHostException}
    +import java.util
    +import java.util.Collections
    +import javax.security.auth.login.LoginException
    +
    +import scala.collection.JavaConverters._
    +
    +import com.google.common.base.Preconditions.checkArgument
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.authentication.util.KerberosName
    +import org.apache.hadoop.security.authorize.ProxyUsers
    +import org.apache.hadoop.util.StringUtils
    +import org.apache.hive.service.{ServiceException, ServiceUtils}
    +import org.apache.hive.service.auth.{HiveAuthConstants, TSetIpAddressProcessor}
    +import org.apache.hive.service.auth.HiveAuthConstants.AuthTypes
    +import org.apache.hive.service.cli._
    +import org.apache.hive.service.rpc.thrift._
    +import org.apache.thrift.TException
    +import org.apache.thrift.server.ServerContext
    +
    +import org.apache.livy.LivyConf
    +import org.apache.livy.thriftserver.{LivyCLIService, LivyThriftServer, SessionInfo, ThriftService}
    +import org.apache.livy.thriftserver.auth.AuthFactory
    +
    +/**
    + * This class is ported from Hive. We cannot reuse Hive's one because we need to use the
    + * `LivyCLIService`, `LivyConf` and `AuthFactory` instead of Hive's one.
    + */
    +abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName: String)
    +    extends ThriftService(serviceName) with TCLIService.Iface with Runnable {
    +
    +  def hiveAuthFactory: AuthFactory
    +
    +  protected val currentServerContext = new ThreadLocal[ServerContext]
    +  protected var portNum: Int = 0
    +  protected var serverIPAddress: InetAddress = _
    +  protected var hiveHost: String = _
    +  private var isStarted: Boolean = false
    +  protected var isEmbedded: Boolean = false
    +  protected var livyConf: LivyConf = _
    +  protected var minWorkerThreads: Int = 0
    +  protected var maxWorkerThreads: Int = 0
    +  protected var workerKeepAliveTime: Long = 0L
    +  private var serverThread: Thread = _
    +
    +  override def init(conf: LivyConf): Unit = {
    +    livyConf = conf
    +    hiveHost = livyConf.get(LivyConf.THRIFT_BIND_HOST)
    +    try {
    +      if (hiveHost == null || hiveHost.isEmpty) {
    +        serverIPAddress = InetAddress.getLocalHost
    +      } else {
    +        serverIPAddress = InetAddress.getByName(hiveHost)
    +      }
    +    } catch {
    +      case e: UnknownHostException =>
    +        throw new ServiceException(e)
    +    }
    +    portNum = livyConf.getInt(LivyConf.THRIFT_SERVER_PORT)
    +    workerKeepAliveTime = livyConf.getTimeAsMs(LivyConf.THRIFT_WORKER_KEEPALIVE_TIME) / 1000
    +    minWorkerThreads = livyConf.getInt(LivyConf.THRIFT_MIN_WORKER_THREADS)
    +    maxWorkerThreads = livyConf.getInt(LivyConf.THRIFT_MAX_WORKER_THREADS)
    +    super.init(livyConf)
    +  }
    +
    +  protected def initServer(): Unit
    +
    +  override def start(): Unit = {
    +    super.start()
    +    if (!isStarted && !isEmbedded) {
    +      initServer()
    +      serverThread = new Thread(this)
    +      serverThread.setName("Thrift Server")
    +      serverThread.start()
    +      isStarted = true
    +    }
    +  }
    +
    +  protected def stopServer(): Unit
    +
    +  override def stop(): Unit = {
    +    if (isStarted && !isEmbedded) {
    +      if (serverThread != null) {
    +        serverThread.interrupt()
    +        serverThread = null
    +      }
    +      stopServer()
    +      isStarted = false
    +    }
    +    super.stop()
    +  }
    +
    +  def getPortNumber: Int = portNum
    +
    +  def getServerIPAddress: InetAddress = serverIPAddress
    +
    +  @throws[TException]
    +  override def GetDelegationToken(req: TGetDelegationTokenReq): TGetDelegationTokenResp = {
    +    val resp: TGetDelegationTokenResp = new TGetDelegationTokenResp
    +    if (!hiveAuthFactory.isSASLKerberosUser) {
    +      resp.setStatus(unsecureTokenErrorStatus)
    +    } else {
    +      try {
    +        val token = cliService.getDelegationToken(
    +          new SessionHandle(req.getSessionHandle), hiveAuthFactory, req.getOwner, req.getRenewer)
    +        resp.setDelegationToken(token)
    +        resp.setStatus(ThriftCLIService.OK_STATUS)
    +      } catch {
    +        case e: HiveSQLException =>
    +          error("Error obtaining delegation token", e)
    +          val tokenErrorStatus = HiveSQLException.toTStatus(e)
    +          tokenErrorStatus.setSqlState("42000")
    +          resp.setStatus(tokenErrorStatus)
    +      }
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def CancelDelegationToken(req: TCancelDelegationTokenReq): TCancelDelegationTokenResp = {
    +    val resp: TCancelDelegationTokenResp = new TCancelDelegationTokenResp
    +    if (!hiveAuthFactory.isSASLKerberosUser) {
    +      resp.setStatus(unsecureTokenErrorStatus)
    +    } else {
    +      try {
    +        cliService.cancelDelegationToken(
    +          new SessionHandle(req.getSessionHandle), hiveAuthFactory, req.getDelegationToken)
    +        resp.setStatus(ThriftCLIService.OK_STATUS)
    +      } catch {
    +        case e: HiveSQLException =>
    +          error("Error canceling delegation token", e)
    +          resp.setStatus(HiveSQLException.toTStatus(e))
    +      }
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def RenewDelegationToken(req: TRenewDelegationTokenReq): TRenewDelegationTokenResp = {
    +    val resp: TRenewDelegationTokenResp = new TRenewDelegationTokenResp
    +    if (!hiveAuthFactory.isSASLKerberosUser) {
    +      resp.setStatus(unsecureTokenErrorStatus)
    +    } else {
    +      try {
    +        cliService.renewDelegationToken(
    +          new SessionHandle(req.getSessionHandle), hiveAuthFactory, req.getDelegationToken)
    +        resp.setStatus(ThriftCLIService.OK_STATUS)
    +      } catch {
    +        case e: HiveSQLException =>
    +          error("Error renewing delegation token", e)
    +          resp.setStatus(HiveSQLException.toTStatus(e))
    +      }
    +    }
    +    resp
    +  }
    +
    +  private def unsecureTokenErrorStatus: TStatus = {
    +    val errorStatus: TStatus = new TStatus(TStatusCode.ERROR_STATUS)
    +    errorStatus.setErrorMessage(
    +      "Delegation token only supported over remote client with kerberos authentication")
    +    errorStatus
    +  }
    +
    +  @throws[TException]
    +  override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = {
    +    info("Client protocol version: " + req.getClient_protocol)
    +    val resp: TOpenSessionResp = new TOpenSessionResp
    +    try {
    +      val sessionHandle = getSessionHandle(req, resp)
    +      resp.setSessionHandle(sessionHandle.toTSessionHandle)
    +      val configurationMap: util.Map[String, String] = new util.HashMap[String, String]
    +      // Set the updated fetch size from the server into the configuration map for the client
    +      val defaultFetchSize =
    +        Integer.toString(livyConf.getInt(LivyConf.THRIFT_RESULTSET_DEFAULT_FETCH_SIZE))
    +      configurationMap.put(LivyConf.THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.key, defaultFetchSize)
    +      resp.setConfiguration(configurationMap)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +      Option(currentServerContext.get).foreach { context =>
    +        context.asInstanceOf[ThriftCLIServerContext].setSessionHandle(sessionHandle)
    +      }
    +    } catch {
    +      case e: Exception =>
    +        warn("Error opening session: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def SetClientInfo(req: TSetClientInfoReq): TSetClientInfoResp = {
    +    // TODO: We don't do anything for now, just log this for debugging.
    +    //       We may be able to make use of this later, e.g. for workload management.
    +    if (req.isSetConfiguration) {
    +      val sh = new SessionHandle(req.getSessionHandle)
    +      val sb = new StringBuilder("Client information for ").append(sh).append(": ")
    +
    +      def processEntry(e: util.Map.Entry[String, String]): Unit = {
    +        sb.append(e.getKey).append(" = ").append(e.getValue)
    +        if ("ApplicationName" == e.getKey) {
    +          cliService.setApplicationName(sh, e.getValue)
    +        }
    +      }
    +
    +      val entries = req.getConfiguration.entrySet.asScala.toSeq
    +      try {
    +        entries.headOption.foreach(processEntry)
    +        entries.tail.foreach { e =>
    +          sb.append(", ")
    +          processEntry(e)
    +        }
    +      } catch {
    +        case ex: Exception =>
    +          warn("Error setting application name", ex)
    +          return new TSetClientInfoResp(HiveSQLException.toTStatus(ex))
    +      }
    +      info(sb.toString())
    +    }
    +    new TSetClientInfoResp(ThriftCLIService.OK_STATUS)
    +  }
    +
    +  private def getIpAddress: String = {
    +    // Http transport mode.
    +    // We set the thread local ip address, in ThriftHttpServlet.
    +    val clientIpAddress = if (LivyThriftServer.isHTTPTransportMode(livyConf)) {
    +      SessionInfo.getIpAddress
    +    } else if (hiveAuthFactory.isSASLWithKerberizedHadoop) {
    +      hiveAuthFactory.getIpAddress
    +    } else {
    +      // NOSASL
    +      TSetIpAddressProcessor.getUserIpAddress
    +    }
    +    debug(s"Client's IP Address: $clientIpAddress")
    +    clientIpAddress
    +  }
    +
    +  /**
    +   * Returns the effective username.
    +   * 1. If livy.server.thrift.allow.user.substitution = false: the username of the connecting user
    +   * 2. If livy.server.thrift.allow.user.substitution = true: the username of the end user,
    +   * that the connecting user is trying to proxy for.
    +   * This includes a check whether the connecting user is allowed to proxy for the end user.
    +   */
    +  @throws[HiveSQLException]
    +  @throws[IOException]
    +  private def getUserName(req: TOpenSessionReq): String = {
    +    val username = if (LivyThriftServer.isHTTPTransportMode(livyConf)) {
    +      Option(SessionInfo.getUserName).getOrElse(req.getUsername)
    +    } else if (hiveAuthFactory.isSASLWithKerberizedHadoop) {
    +      Option(hiveAuthFactory.getRemoteUser).orElse(Option(TSetIpAddressProcessor.getUserName))
    +        .getOrElse(req.getUsername)
    +    } else {
    +      Option(TSetIpAddressProcessor.getUserName).getOrElse(req.getUsername)
    +    }
    +    val effectiveClientUser =
    +      getProxyUser(getShortName(username), req.getConfiguration, getIpAddress)
    +    debug(s"Client's username: $effectiveClientUser")
    +    effectiveClientUser
    +  }
    +
    +  @throws[IOException]
    +  private def getShortName(userName: String): String = {
    +    Option(userName).map { un =>
    +      if (hiveAuthFactory.isSASLKerberosUser) {
    +        // KerberosName.getShortName can only be used for Kerberos users
    +        new KerberosName(un).getShortName
    +      } else {
    +        val indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(un)
    +        if (indexOfDomainMatch <= 0) {
    +          un
    +        } else {
    +          un.substring(0, indexOfDomainMatch)
    +        }
    +      }
    +    }.orNull
    +  }
    +
    +  /**
    +   * Create a session handle
    +   */
    +  @throws[HiveSQLException]
    +  @throws[LoginException]
    +  @throws[IOException]
    +  private[thriftserver] def getSessionHandle(
    +      req: TOpenSessionReq, res: TOpenSessionResp): SessionHandle = {
    +    val userName = getUserName(req)
    +    val ipAddress = getIpAddress
    +    val protocol = getMinVersion(LivyCLIService.SERVER_VERSION, req.getClient_protocol)
    +    val sessionHandle =
    +      if (livyConf.getBoolean(LivyConf.THRIFT_ENABLE_DOAS) && (userName != null)) {
    +        cliService.openSessionWithImpersonation(
    +          protocol, userName, req.getPassword, ipAddress, req.getConfiguration, null)
    +      } else {
    +        cliService.openSession(protocol, userName, req.getPassword, ipAddress, req.getConfiguration)
    +      }
    +    res.setServerProtocolVersion(protocol)
    +    sessionHandle
    +  }
    +
    +  @throws[HiveSQLException]
    +  private def getProgressedPercentage(opHandle: OperationHandle): Double = {
    +    checkArgument(OperationType.EXECUTE_STATEMENT == opHandle.getOperationType)
    +    0.0
    +  }
    +
    +  private def getMinVersion(versions: TProtocolVersion*): TProtocolVersion = {
    +    val values = TProtocolVersion.values
    +    var current = values(values.length - 1).getValue
    +    versions.foreach { version =>
    +      if (current > version.getValue) {
    +        current = version.getValue
    +      }
    +    }
    +    val res = values.find(_.getValue == current)
    +    assert(res.isDefined)
    +    res.get
    +  }
    +
    +  @throws[TException]
    +  override def CloseSession(req: TCloseSessionReq): TCloseSessionResp = {
    +    val resp = new TCloseSessionResp
    +    try {
    +      val sessionHandle = new SessionHandle(req.getSessionHandle)
    +      cliService.closeSession(sessionHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +      Option(currentServerContext.get).foreach { ctx =>
    +        ctx.asInstanceOf[ThriftCLIServerContext].setSessionHandle(null)
    +      }
    +    } catch {
    +      case e: Exception =>
    +        warn("Error closing session: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetInfo(req: TGetInfoReq): TGetInfoResp = {
    +    val resp = new TGetInfoResp
    +    try {
    +      val getInfoValue = cliService.getInfo(
    +        new SessionHandle(req.getSessionHandle), GetInfoType.getGetInfoType(req.getInfoType))
    +      resp.setInfoValue(getInfoValue.toTGetInfoValue)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting info: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def ExecuteStatement(req: TExecuteStatementReq): TExecuteStatementResp = {
    +    val resp = new TExecuteStatementResp
    +    try {
    +      val sessionHandle = new SessionHandle(req.getSessionHandle)
    +      val statement = req.getStatement
    +      val confOverlay = req.getConfOverlay
    +      val runAsync = req.isRunAsync
    +      val queryTimeout = req.getQueryTimeout
    +      val operationHandle = if (runAsync) {
    +          cliService.executeStatementAsync(sessionHandle, statement, confOverlay, queryTimeout)
    +        } else {
    +          cliService.executeStatement(sessionHandle, statement, confOverlay, queryTimeout)
    +        }
    +      resp.setOperationHandle(operationHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error executing statement: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetTypeInfo(req: TGetTypeInfoReq): TGetTypeInfoResp = {
    +    val resp = new TGetTypeInfoResp
    +    try {
    +      val operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle))
    +      resp.setOperationHandle(operationHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting type info: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetCatalogs(req: TGetCatalogsReq): TGetCatalogsResp = {
    +    val resp = new TGetCatalogsResp
    +    try {
    +      val opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle))
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting catalogs: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetSchemas(req: TGetSchemasReq): TGetSchemasResp = {
    +    val resp = new TGetSchemasResp
    +    try {
    +      val opHandle = cliService.getSchemas(
    +        new SessionHandle(req.getSessionHandle), req.getCatalogName, req.getSchemaName)
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting schemas: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetTables(req: TGetTablesReq): TGetTablesResp = {
    +    val resp = new TGetTablesResp
    +    try {
    +      val opHandle = cliService.getTables(
    +        new SessionHandle(req.getSessionHandle),
    +        req.getCatalogName,
    +        req.getSchemaName,
    +        req.getTableName,
    +        req.getTableTypes)
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting tables: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetTableTypes(req: TGetTableTypesReq): TGetTableTypesResp = {
    +    val resp = new TGetTableTypesResp
    +    try {
    +      val opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle))
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting table types: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetColumns(req: TGetColumnsReq): TGetColumnsResp = {
    +    val resp = new TGetColumnsResp
    +    try {
    +      val opHandle = cliService.getColumns(
    +        new SessionHandle(req.getSessionHandle),
    +        req.getCatalogName,
    +        req.getSchemaName,
    +        req.getTableName,
    +        req.getColumnName)
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting columns: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetFunctions(req: TGetFunctionsReq): TGetFunctionsResp = {
    +    val resp = new TGetFunctionsResp
    +    try {
    +      val opHandle = cliService.getFunctions(
    +        new SessionHandle(req.getSessionHandle),
    +        req.getCatalogName,
    +        req.getSchemaName,
    +        req.getFunctionName)
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting functions: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetOperationStatus(req: TGetOperationStatusReq): TGetOperationStatusResp = {
    +    val resp = new TGetOperationStatusResp
    +    val operationHandle = new OperationHandle(req.getOperationHandle)
    +    try {
    +      val operationStatus = cliService.getOperationStatus(operationHandle, req.isGetProgressUpdate)
    +      resp.setOperationState(operationStatus.state.toTOperationState)
    +      resp.setErrorMessage(operationStatus.state.getErrorMessage)
    +      val opException = operationStatus.operationException
    +      resp.setOperationStarted(operationStatus.operationStarted)
    +      resp.setOperationCompleted(operationStatus.operationCompleted)
    +      resp.setHasResultSet(operationStatus.hasResultSet)
    +      val executionStatus = TJobExecutionStatus.NOT_AVAILABLE
    +      resp.setProgressUpdateResponse(new TProgressUpdateResp(
    +        Collections.emptyList[String],
    +        Collections.emptyList[util.List[String]],
    +        0.0D,
    +        executionStatus,
    +        "",
    +        0L))
    +      if (opException != null) {
    +        resp.setSqlState(opException.getSQLState)
    +        resp.setErrorCode(opException.getErrorCode)
    +        if (opException.getErrorCode == 29999) {
    +          resp.setErrorMessage(StringUtils.stringifyException(opException))
    +        } else {
    +          resp.setErrorMessage(opException.getMessage)
    +        }
    +      } else if (OperationType.EXECUTE_STATEMENT == operationHandle.getOperationType) {
    +        resp.getProgressUpdateResponse.setProgressedPercentage(
    +          getProgressedPercentage(operationHandle))
    +      }
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting operation status: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def CancelOperation(req: TCancelOperationReq): TCancelOperationResp = {
    +    val resp = new TCancelOperationResp
    +    try {
    +      cliService.cancelOperation(new OperationHandle(req.getOperationHandle))
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error cancelling operation: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def CloseOperation(req: TCloseOperationReq): TCloseOperationResp = {
    +    val resp = new TCloseOperationResp
    +    try {
    +      cliService.closeOperation(new OperationHandle(req.getOperationHandle))
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error closing operation: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetResultSetMetadata(req: TGetResultSetMetadataReq): TGetResultSetMetadataResp = {
    +    val resp = new TGetResultSetMetadataResp
    +    try {
    +      val schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle))
    +      resp.setSchema(schema.toTTableSchema)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting result set metadata: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = {
    +    val resp = new TFetchResultsResp
    +    try {
    +      // Set fetch size
    +      val maxFetchSize = livyConf.getInt(LivyConf.THRIFT_RESULTSET_MAX_FETCH_SIZE)
    +      if (req.getMaxRows > maxFetchSize) {
    +        req.setMaxRows(maxFetchSize)
    +      }
    +      val rowSet = cliService.fetchResults(
    +        new OperationHandle(req.getOperationHandle),
    +        FetchOrientation.getFetchOrientation(req.getOrientation),
    +        req.getMaxRows,
    +        FetchType.getFetchType(req.getFetchType))
    +      resp.setResults(rowSet.toTRowSet)
    +      resp.setHasMoreRows(false)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error fetching results: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetPrimaryKeys(req: TGetPrimaryKeysReq): TGetPrimaryKeysResp = {
    +    val resp = new TGetPrimaryKeysResp
    +    try {
    +      val opHandle = cliService.getPrimaryKeys(
    +        new SessionHandle(req.getSessionHandle),
    +        req.getCatalogName,
    +        req.getSchemaName,
    +        req.getTableName)
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting primary keys: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetCrossReference(req: TGetCrossReferenceReq): TGetCrossReferenceResp = {
    +    val resp = new TGetCrossReferenceResp
    +    try {
    +      val opHandle = cliService.getCrossReference(
    +        new SessionHandle(req.getSessionHandle),
    +        req.getParentCatalogName,
    +        req.getParentSchemaName,
    +        req.getParentTableName,
    +        req.getForeignCatalogName,
    +        req.getForeignSchemaName,
    +        req.getForeignTableName)
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting cross reference: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetQueryId(req: TGetQueryIdReq): TGetQueryIdResp = {
    +    try {
    +      new TGetQueryIdResp(cliService.getQueryId(req.getOperationHandle))
    +    } catch {
    +      case e: HiveSQLException => throw new TException(e)
    +    }
    +  }
    +
    +  override def run(): Unit
    +
    +  /**
    +   * If the proxy user name is provided then check privileges to substitute the user.
    +   */
    +  @throws[HiveSQLException]
    +  private def getProxyUser(
    --- End diff --
    
    This is another area where I think things should eventually be merged with the main server code, e.g. by checking `AccessManager.checkImpersonation` instead of the custom configs + `verifyProxyAccess` being used here.
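
A minimal sketch of what a single, shared impersonation check could look like, so that the Thrift server and the main server apply the same rule. Everything here is hypothetical: `ImpersonationPolicy`, its constructor parameters and the rule "only configured superusers may proxy" are assumptions for illustration and are not Livy's actual `AccessManager` API.

```scala
// Hypothetical stand-in for a shared access check; NOT Livy's real AccessManager.
final class ImpersonationPolicy(superUsers: Set[String], impersonationEnabled: Boolean) {

  /**
   * Returns the user the session should run as.
   * Throws IllegalArgumentException when the connecting user asks to proxy for
   * someone else without being allowed to (assumed rule: superusers only).
   */
  def effectiveUser(connectingUser: String, requestedProxy: Option[String]): String =
    requestedProxy match {
      // No proxy requested: run as the connecting user.
      case None => connectingUser
      // Proxying for oneself is always allowed.
      case Some(target) if target == connectingUser => connectingUser
      // Proxying for another user requires the feature to be on and a superuser caller.
      case Some(target) if impersonationEnabled && superUsers.contains(connectingUser) => target
      case Some(target) =>
        throw new IllegalArgumentException(
          s"User '$connectingUser' is not allowed to impersonate '$target'")
    }
}
```

With one such check in place, both entry points would call `effectiveUser` rather than each keeping its own configuration keys and verification logic.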


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221755078
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala ---
    @@ -233,12 +232,5 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage
     }
     
     object LivyOperationManager {
    - val LOG_SCHEMA: Schema = {
    -    val schema = new Schema
    -    val fieldSchema = new FieldSchema
    -    fieldSchema.setName("operation_log")
    -    fieldSchema.setType("string")
    -    schema.addToFieldSchemas(fieldSchema)
    -    schema
    -  }
    + val LOG_SCHEMA: Array[DataType] = Array(BasicDataType("string"))
    --- End diff --
    
    Since you're touching this, indentation looks wrong.
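
A minimal sketch of the corrected indentation, assuming the two-space convention used in the rest of the quoted Scala. `DataType` and `BasicDataType` below are simplified stand-ins so that the snippet compiles on its own; they are not the project's real definitions.

```scala
// Stand-in types for illustration only (assumed shape, not the project's classes).
sealed trait DataType
final case class BasicDataType(name: String) extends DataType

object LivyOperationManager {
  // Members of the object are indented by two spaces, not one.
  val LOG_SCHEMA: Array[DataType] = Array(BasicDataType("string"))
}
```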


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237313919
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyCLIService.scala ---
    @@ -22,60 +22,57 @@ import java.util
     import java.util.concurrent.{CancellationException, ExecutionException, TimeoutException, TimeUnit}
     import javax.security.auth.login.LoginException
     
    -import scala.collection.JavaConverters._
    -
    -import org.apache.hadoop.hive.common.log.ProgressMonitor
    -import org.apache.hadoop.hive.conf.HiveConf
    -import org.apache.hadoop.hive.conf.HiveConf.ConfVars
    -import org.apache.hadoop.hive.ql.parse.ParseUtils
    -import org.apache.hadoop.hive.shims.Utils
    -import org.apache.hadoop.security.UserGroupInformation
    -import org.apache.hive.service.{CompositeService, ServiceException}
    -import org.apache.hive.service.auth.HiveAuthFactory
    -import org.apache.hive.service.cli._
    -import org.apache.hive.service.cli.operation.Operation
    +import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
    +import org.apache.hive.service.ServiceException
    +import org.apache.hive.service.cli.{FetchOrientation, FetchType, GetInfoType, GetInfoValue, HiveSQLException, OperationHandle, SessionHandle}
    --- End diff --
    
    Just use a wildcard for long import lists...
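
A minimal sketch of the suggestion, shown on the `java.util.concurrent` import from the same diff because it needs nothing beyond the JDK; the same pattern would apply to the `org.apache.hive.service.cli` line under review. The object and method names are hypothetical.

```scala
// Before: an explicit selector list that grows with every new class used.
// import java.util.concurrent.{CancellationException, ExecutionException, TimeoutException, TimeUnit}

// After: a single wildcard import covering the whole package.
import java.util.concurrent._

object WildcardImportExample {
  // TimeUnit and TimeoutException both resolve through the wildcard import.
  def secondsToMillis(seconds: Long): Long = TimeUnit.SECONDS.toMillis(seconds)

  def isTimeout(e: Throwable): Boolean = e.isInstanceOf[TimeoutException]
}
```

The trade-off is that a wildcard hides which names a file actually depends on, so it tends to pay off only once the selector list no longer fits on one line.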


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237317373
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthFactory.scala ---
    @@ -0,0 +1,197 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.auth
    +
    +import java.io.IOException
    +import java.util
    +import javax.security.auth.callback._
    +import javax.security.auth.login.LoginException
    +import javax.security.sasl.{AuthorizeCallback, Sasl}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION
    +import org.apache.hadoop.security.SaslRpcServer.AuthMethod
    +import org.apache.hive.service.auth.{SaslQOP, TSetIpAddressProcessor}
    +import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods
    +import org.apache.hive.service.auth.HiveAuthConstants.AuthTypes
    +import org.apache.hive.service.cli.HiveSQLException
    +import org.apache.hive.service.rpc.thrift.TCLIService
    +import org.apache.hive.service.rpc.thrift.TCLIService.Iface
    +import org.apache.thrift.{TProcessor, TProcessorFactory}
    +import org.apache.thrift.transport.{TTransport, TTransportException, TTransportFactory}
    +
    +import org.apache.livy.{LivyConf, Logging}
    +import org.apache.livy.thriftserver.cli.ThriftCLIService
    +
    +/**
    + * This class is a porting of the parts we use from `HiveAuthFactory` by Hive.
    + */
    +class AuthFactory(val conf: LivyConf) extends Logging {
    +
    +  private val authTypeStr = conf.get(LivyConf.THRIFT_AUTHENTICATION)
    +  // ShimLoader.getHadoopShims().isSecurityEnabled() will only check that
    +  // hadoopAuth is not simple, it does not guarantee it is kerberos
    +  private val hadoopAuth = new Configuration().get(HADOOP_SECURITY_AUTHENTICATION)
    +
    +  private val secretManager = if (isSASLWithKerberizedHadoop) {
    +      val sm = new LivyDelegationTokenSecretManager(conf)
    +      try {
    +        sm.startThreads()
    +      } catch {
    +        case e: IOException =>
    +          throw new TTransportException("Failed to start token manager", e)
    +      }
    +      Some(sm)
    +    } else {
    +      None
    +    }
    +
    +  private val saslServer: Option[AuthBridgeServer] = secretManager.map { sm =>
    +      new AuthBridgeServer(sm)
    +    }
    +
    +  def getSaslProperties: util.Map[String, String] = {
    +    val saslProps = new util.HashMap[String, String]
    +    val saslQOP = SaslQOP.fromString(conf.get(LivyConf.THRIFT_SASL_QOP))
    +    saslProps.put(Sasl.QOP, saslQOP.toString)
    +    saslProps.put(Sasl.SERVER_AUTH, "true")
    +    saslProps
    +  }
    +
    +  @throws[LoginException]
    +  def getAuthTransFactory: TTransportFactory = {
    +    val isAuthKerberos = authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName)
    +    val isAuthNoSASL = authTypeStr.equalsIgnoreCase(AuthTypes.NOSASL.getAuthName)
    +    // TODO: add LDAP and PAM when supported
    +    val isAuthOther = authTypeStr.equalsIgnoreCase(AuthTypes.NONE.getAuthName) ||
    +      authTypeStr.equalsIgnoreCase(AuthTypes.CUSTOM.getAuthName)
    +
    --- End diff --
    
    too many empty lines


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221885113
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftHttpCLIService.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.cli
    +
    +import java.util.concurrent.SynchronousQueue
    +import java.util.concurrent.TimeUnit
    +import javax.ws.rs.HttpMethod
    +
    +import org.apache.hive.service.rpc.thrift.TCLIService
    +import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup
    +import org.apache.thrift.protocol.TBinaryProtocol
    +import org.eclipse.jetty.server.HttpConfiguration
    +import org.eclipse.jetty.server.HttpConnectionFactory
    +import org.eclipse.jetty.server.Server
    +import org.eclipse.jetty.server.ServerConnector
    +import org.eclipse.jetty.server.handler.gzip.GzipHandler
    +import org.eclipse.jetty.servlet.ServletContextHandler
    +import org.eclipse.jetty.servlet.ServletHolder
    +import org.eclipse.jetty.util.ssl.SslContextFactory
    +import org.eclipse.jetty.util.thread.ExecutorThreadPool
    +
    +import org.apache.livy.LivyConf
    +import org.apache.livy.thriftserver.LivyCLIService
    +import org.apache.livy.thriftserver.auth.AuthFactory
    +
    +/**
    + * This class is ported from Hive. We cannot reuse Hive's one because we need to use the
    + * `LivyCLIService`, `LivyConf` and `AuthFacotry` instead of Hive's one.
    + */
    +class ThriftHttpCLIService(
    --- End diff --
    
    I am not sure about this. This service has its own dedicated thread pool, which is separate from Livy's HTTP server, so I am not sure it is feasible. We'd need to try.


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237319368
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTypeInfoOperation.scala ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.operation
    +
    +import java.sql.{DatabaseMetaData, Types}
    +
    +import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle}
    +
    +import org.apache.livy.Logging
    +import org.apache.livy.thriftserver.serde.ResultSet
    +import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
    +
    +sealed case class TypeInfo(name: String, sqlType: Int, precision: Option[Int],
    +  caseSensitive: Boolean, searchable: Short, unsignedAttribute: Boolean, numPrecRadix: Option[Int])
    +
    +/**
    + * GetTypeInfoOperation.
    --- End diff --
    
    Same.


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237423913
  
    --- Diff: server/src/main/scala/org/apache/livy/LivyConf.scala ---
    @@ -98,10 +98,78 @@ object LivyConf {
       val LAUNCH_KERBEROS_REFRESH_INTERVAL = Entry("livy.server.launch.kerberos.refresh-interval", "1h")
       val KINIT_FAIL_THRESHOLD = Entry("livy.server.launch.kerberos.kinit-fail-threshold", 5)
     
    +  // Thrift configurations
       val THRIFT_SERVER_ENABLED = Entry("livy.server.thrift.enabled", false)
       val THRIFT_INCR_COLLECT_ENABLED = Entry("livy.server.thrift.incrementalCollect", false)
       val THRIFT_SESSION_CREATION_TIMEOUT = Entry("livy.server.thrift.session.creationTimeout", "10m")
       val THRIFT_SERVER_JAR_LOCATION = Entry("livy.server.thrift.jarLocation", null)
    +  // The following configs are the same present in Hive
    +  val THRIFT_RESULTSET_DEFAULT_FETCH_SIZE =
    +    Entry("livy.server.thrift.resultset.default.fetch.size", 1000)
    +  val THRIFT_SPNEGO_PRINCIPAL = Entry("livy.server.thrift.authentication.spnego.principal", "")
    --- End diff --
    
    Yes, I thought about that too, but I was not sure, so I kept this. Let me remove it and replace it with `AUTH_KERBEROS_PRINCIPAL` then. Thanks.


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221768243
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/LivyDelegationTokenSecretManager.scala ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.auth
    +
    +import java.io.{ByteArrayInputStream, DataInputStream, IOException}
    +
    +import org.apache.hadoop.io.Text
    +import org.apache.hadoop.security.token.Token
    +import org.apache.hadoop.security.token.delegation.{AbstractDelegationTokenIdentifier, AbstractDelegationTokenSecretManager}
    +
    +import org.apache.livy.LivyConf
    +
    +/**
    + * A secret manager. It is taken from analogous implementation in the MapReduce client.
    + */
    +class LivyDelegationTokenSecretManager(val livyConf: LivyConf)
    --- End diff --
    
    Do you need this stuff? Following my previous comment, I think this would make sense if the main Livy server also supported delegation tokens.
    
    Otherwise it seems this could just be removed from Livy, i.e. you can authenticate using plain Kerberos (= you have a TGT) but not with a delegation token.


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221771929
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftHttpCLIService.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.cli
    +
    +import java.util.concurrent.SynchronousQueue
    +import java.util.concurrent.TimeUnit
    +import javax.ws.rs.HttpMethod
    +
    +import org.apache.hive.service.rpc.thrift.TCLIService
    +import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup
    +import org.apache.thrift.protocol.TBinaryProtocol
    +import org.eclipse.jetty.server.HttpConfiguration
    +import org.eclipse.jetty.server.HttpConnectionFactory
    +import org.eclipse.jetty.server.Server
    +import org.eclipse.jetty.server.ServerConnector
    +import org.eclipse.jetty.server.handler.gzip.GzipHandler
    +import org.eclipse.jetty.servlet.ServletContextHandler
    +import org.eclipse.jetty.servlet.ServletHolder
    +import org.eclipse.jetty.util.ssl.SslContextFactory
    +import org.eclipse.jetty.util.thread.ExecutorThreadPool
    +
    +import org.apache.livy.LivyConf
    +import org.apache.livy.thriftserver.LivyCLIService
    +import org.apache.livy.thriftserver.auth.AuthFactory
    +
    +/**
    + * This class is ported from Hive. We cannot reuse Hive's one because we need to use the
    + * `LivyCLIService`, `LivyConf` and `AuthFacotry` instead of Hive's one.
    + */
    +class ThriftHttpCLIService(
    --- End diff --
    
    One thing that might be nice is to just mount this under a namespace in the main Livy HTTP server. But I'm not sure whether Hive clients support that (or require the server to be on "/").


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237314382
  
    --- Diff: server/src/main/scala/org/apache/livy/LivyConf.scala ---
    @@ -98,10 +98,78 @@ object LivyConf {
       val LAUNCH_KERBEROS_REFRESH_INTERVAL = Entry("livy.server.launch.kerberos.refresh-interval", "1h")
       val KINIT_FAIL_THRESHOLD = Entry("livy.server.launch.kerberos.kinit-fail-threshold", 5)
     
    +  // Thrift configurations
       val THRIFT_SERVER_ENABLED = Entry("livy.server.thrift.enabled", false)
       val THRIFT_INCR_COLLECT_ENABLED = Entry("livy.server.thrift.incrementalCollect", false)
       val THRIFT_SESSION_CREATION_TIMEOUT = Entry("livy.server.thrift.session.creationTimeout", "10m")
       val THRIFT_SERVER_JAR_LOCATION = Entry("livy.server.thrift.jarLocation", null)
    +  // The following configs are the same present in Hive
    +  val THRIFT_RESULTSET_DEFAULT_FETCH_SIZE =
    +    Entry("livy.server.thrift.resultset.default.fetch.size", 1000)
    +  val THRIFT_SPNEGO_PRINCIPAL = Entry("livy.server.thrift.authentication.spnego.principal", "")
    --- End diff --
    
    Won't this be exactly the same as `AUTH_KERBEROS_PRINCIPAL`?
    
    The service name of the SPNEGO principal must be `HTTP`, so it's not like they can be different.
    
    Which also means the keytab config will have to match, too.
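
A minimal sketch of the constraint described above, assuming principals follow the usual `service/host@REALM` layout. `SpnegoPrincipal` and `isHttpPrincipal` are hypothetical names used only for illustration; they are not part of Livy or Hive.

```scala
object SpnegoPrincipal {
  // Hypothetical helper: returns true when the service part of a Kerberos
  // principal (the text before the first '/' or '@') is exactly "HTTP".
  def isHttpPrincipal(principal: String): Boolean = {
    val service = principal.takeWhile(c => c != '/' && c != '@')
    service == "HTTP"
  }
}
```

If every SPNEGO principal has to pass a check like this, a separate Thrift-specific setting has little room to differ from the principal the main server already uses.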


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237316485
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthBridgeServer.scala ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.auth
    +
    +import java.io.IOException
    +import java.net.InetAddress
    +import java.security.{PrivilegedAction, PrivilegedExceptionAction}
    +import java.util
    +import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback, UnsupportedCallbackException}
    +import javax.security.sasl.{AuthorizeCallback, RealmCallback, SaslServer}
    +
    +import org.apache.commons.codec.binary.Base64
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.security.{SaslRpcServer, UserGroupInformation}
    +import org.apache.hadoop.security.SaslRpcServer.AuthMethod
    +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
    +import org.apache.hadoop.security.token.SecretManager.InvalidToken
    +import org.apache.thrift.{TException, TProcessor}
    +import org.apache.thrift.protocol.TProtocol
    +import org.apache.thrift.transport.{TSaslServerTransport, TSocket, TTransport, TTransportException, TTransportFactory}
    +
    +import org.apache.livy.Logging
    +
    +/**
    + * The class is taken from Hive's `HadoopThriftAuthBridge.Server`. It bridges Thrift's SASL
    + * transports to Hadoop's SASL callback handlers and authentication classes.
    + *
    + * This class is based on Hive's one.
    --- End diff --
    
    Redundant comment.


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237315238
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala ---
    @@ -114,24 +98,56 @@ object LivyThriftServer extends Logging {
         thriftServer.stop()
         thriftServer = null
       }
    +
    +  def isHTTPTransportMode(livyConf: LivyConf): Boolean = {
    +    val transportMode = livyConf.get(LivyConf.THRIFT_TRANSPORT_MODE)
    +    transportMode != null && transportMode.equalsIgnoreCase("http")
    +  }
     }
     
     
     class LivyThriftServer(
         private[thriftserver] val livyConf: LivyConf,
         private[thriftserver] val livySessionManager: InteractiveSessionManager,
         private[thriftserver] val sessionStore: SessionStore,
    -    private[thriftserver] val accessManager: AccessManager) extends HiveServer2 {
    -  override def init(hiveConf: HiveConf): Unit = {
    -    this.cliService = new LivyCLIService(this)
    -    super.init(hiveConf)
    +    private[thriftserver] val accessManager: AccessManager)
    +  extends ThriftService(classOf[LivyThriftServer].getName) with Logging {
    +
    +  val cliService = new LivyCLIService(this)
    +
    +  override def init(livyConf: LivyConf): Unit = {
    +    addService(cliService)
    +    val server = this
    +    val oomHook = new Runnable() {
    +      override def run(): Unit = {
    +        server.stop()
    +      }
    +    }
    +    val thriftCLIService = if (LivyThriftServer.isHTTPTransportMode(livyConf)) {
    +      new ThriftHttpCLIService(cliService, oomHook)
    +    } else {
    +      new ThriftBinaryCLIService(cliService, oomHook)
    +    }
    +    addService(thriftCLIService)
    +    super.init(livyConf)
    +    Runtime.getRuntime.addShutdownHook(new Thread("LivyThriftServer Shutdown") {
    --- End diff --
    
    Is this necessary? Seems like if needed, it's something that the main server class should handle (and then shut down all loaded plugins).
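
A rough sketch of the arrangement suggested above, in which the main server owns a single shutdown hook and stops every loaded plugin. `Plugin`, `MainServer` and `installShutdownHook` are hypothetical names for illustration only, not Livy's actual classes.

```scala
// Hypothetical plugin contract: anything the main server loads and must stop.
trait Plugin {
  def stop(): Unit
}

class MainServer(plugins: Seq[Plugin]) {
  // Stop every loaded plugin; a failure in one does not prevent stopping the rest.
  def stop(): Unit = plugins.foreach { p =>
    try p.stop() catch {
      case e: Exception => System.err.println(s"Failed to stop plugin: $e")
    }
  }

  // Register one JVM shutdown hook owned by the main server and return it.
  def installShutdownHook(): Thread = {
    val hook = new Thread("Main Server Shutdown") {
      // Qualify the call so it targets MainServer.stop(), not Thread.stop().
      override def run(): Unit = MainServer.this.stop()
    }
    Runtime.getRuntime.addShutdownHook(hook)
    hook
  }
}
```

With this shape, a plugin such as the Thrift server only needs to implement `stop()` and does not register a hook of its own.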


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237422303
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala ---
    @@ -114,24 +98,56 @@ object LivyThriftServer extends Logging {
         thriftServer.stop()
         thriftServer = null
       }
    +
    +  def isHTTPTransportMode(livyConf: LivyConf): Boolean = {
    +    val transportMode = livyConf.get(LivyConf.THRIFT_TRANSPORT_MODE)
    +    transportMode != null && transportMode.equalsIgnoreCase("http")
    +  }
     }
     
     
     class LivyThriftServer(
         private[thriftserver] val livyConf: LivyConf,
         private[thriftserver] val livySessionManager: InteractiveSessionManager,
         private[thriftserver] val sessionStore: SessionStore,
    -    private[thriftserver] val accessManager: AccessManager) extends HiveServer2 {
    -  override def init(hiveConf: HiveConf): Unit = {
    -    this.cliService = new LivyCLIService(this)
    -    super.init(hiveConf)
    +    private[thriftserver] val accessManager: AccessManager)
    +  extends ThriftService(classOf[LivyThriftServer].getName) with Logging {
    +
    +  val cliService = new LivyCLIService(this)
    +
    +  override def init(livyConf: LivyConf): Unit = {
    +    addService(cliService)
    +    val server = this
    +    val oomHook = new Runnable() {
    +      override def run(): Unit = {
    +        server.stop()
    +      }
    +    }
    +    val thriftCLIService = if (LivyThriftServer.isHTTPTransportMode(livyConf)) {
    +      new ThriftHttpCLIService(cliService, oomHook)
    +    } else {
    +      new ThriftBinaryCLIService(cliService, oomHook)
    +    }
    +    addService(thriftCLIService)
    +    super.init(livyConf)
    +    Runtime.getRuntime.addShutdownHook(new Thread("LivyThriftServer Shutdown") {
    --- End diff --
    
    Yes, I moved it into the main server class, thanks.


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237318955
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetCatalogsOperation.scala ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.operation
    +
    +import org.apache.hive.service.cli._
    +
    +import org.apache.livy.Logging
    +import org.apache.livy.thriftserver.serde.ResultSet
    +import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
    +
    +/**
    + * GetCatalogsOperation.
    --- End diff --
    
    Not a super useful doc.


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221764846
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthBridgeServer.scala ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.auth
    +
    +import java.io.IOException
    +import java.net.InetAddress
    +import java.security.{PrivilegedAction, PrivilegedExceptionAction}
    +import java.util
    +import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback, UnsupportedCallbackException}
    +import javax.security.sasl.{AuthorizeCallback, RealmCallback, SaslServer}
    +
    +import org.apache.commons.codec.binary.Base64
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.security.{SaslRpcServer, UserGroupInformation}
    +import org.apache.hadoop.security.SaslRpcServer.AuthMethod
    +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
    +import org.apache.hadoop.security.token.SecretManager.InvalidToken
    +import org.apache.thrift.{TException, TProcessor}
    +import org.apache.thrift.protocol.TProtocol
    +import org.apache.thrift.transport.{TSaslServerTransport, TSocket, TTransport, TTransportException, TTransportFactory}
    +
    +import org.apache.livy.Logging
    +
    +/**
    + * The class is taken from Hive's `HadoopThriftAuthBridge.Server`. It bridges Thrift's SASL
    + * transports to Hadoop's SASL callback handlers and authentication classes.
    + *
    + * This class is based on Hive's one.
    + */
    +class AuthBridgeServer(private val secretManager: LivyDelegationTokenSecretManager) {
    +  private val ugi = try {
    +      UserGroupInformation.getCurrentUser
    +    } catch {
    +      case ioe: IOException => throw new TTransportException(ioe)
    +    }
    +
    +  /**
    +   * Create a TTransportFactory that, upon connection of a client socket,
    +   * negotiates a Kerberized SASL transport. The resulting TTransportFactory
    +   * can be passed as both the input and output transport factory when
    +   * instantiating a TThreadPoolServer, for example.
    +   *
    +   * @param saslProps Map of SASL properties
    +   */
    +  @throws[TTransportException]
    +  def createTransportFactory(saslProps: util.Map[String, String]): TTransportFactory = {
    +    val transFactory: TSaslServerTransport.Factory = createSaslServerTransportFactory(saslProps)
    +    new TUGIAssumingTransportFactory(transFactory, ugi)
    +  }
    +
    +  /**
    +   * Create a TSaslServerTransport.Factory that, upon connection of a client
    +   * socket, negotiates a Kerberized SASL transport.
    +   *
    +   * @param saslProps Map of SASL properties
    +   */
    +  @throws[TTransportException]
    +  def createSaslServerTransportFactory(
    +      saslProps: util.Map[String, String]): TSaslServerTransport.Factory = {
    +    // Parse out the kerberos principal, host, realm.
    +    val kerberosName: String = ugi.getUserName
    +    val names: Array[String] = SaslRpcServer.splitKerberosName(kerberosName)
    +    if (names.length != 3) {
    +      throw new TTransportException(s"Kerberos principal should have 3 parts: $kerberosName")
    +    }
    +    val transFactory: TSaslServerTransport.Factory = new TSaslServerTransport.Factory
    +    transFactory.addServerDefinition(AuthMethod.KERBEROS.getMechanismName,
    +      names(0), names(1), // two parts of kerberos principal
    +      saslProps,
    +      new SaslRpcServer.SaslGssCallbackHandler)
    +    transFactory.addServerDefinition(AuthMethod.TOKEN.getMechanismName,
    +      null,
    +      SaslRpcServer.SASL_DEFAULT_REALM,
    +      saslProps,
    +      new SaslDigestCallbackHandler(secretManager))
    +    transFactory
    +  }
    +
    +  /**
    +   * Wrap a TTransportFactory in such a way that, before processing any RPC, it
    +   * assumes the UserGroupInformation of the user authenticated by
    +   * the SASL transport.
    +   */
    +  def wrapTransportFactory(transFactory: TTransportFactory): TTransportFactory = {
    +    new TUGIAssumingTransportFactory(transFactory, ugi)
    +  }
    +
    +  /**
    +   * Wrap a TProcessor in such a way that, before processing any RPC, it
    +   * assumes the UserGroupInformation of the user authenticated by
    +   * the SASL transport.
    +   */
    +  def wrapProcessor(processor: TProcessor): TProcessor = {
    +    new TUGIAssumingProcessor(processor, secretManager, true)
    +  }
    +
    +  /**
    +   * Wrap a TProcessor to capture the client information like connecting userid, ip etc
    +   */
    +  def wrapNonAssumingProcessor(processor: TProcessor): TProcessor = {
    +    new TUGIAssumingProcessor(processor, secretManager, false)
    +  }
    +
    +  def getRemoteAddress: InetAddress = AuthBridgeServer.remoteAddress.get
    +
    +  def getRemoteUser: String = AuthBridgeServer.remoteUser.get
    +
    +  def getUserAuthMechanism: String = AuthBridgeServer.userAuthMechanism.get
    +
    +}
    +
    +/**
    + * A TransportFactory that wraps another one, but assumes a specified UGI
    + * before calling through.
    + *
    + * This is used on the server side to assume the server's Principal when accepting
    + * clients.
    + *
    + * This class is derived from Hive's one.
    + */
    +private[auth] class TUGIAssumingTransportFactory(
    +    val wrapped: TTransportFactory, val ugi: UserGroupInformation) extends TTransportFactory {
    +  assert(wrapped != null)
    +  assert(ugi != null)
    +
    +  override def getTransport(trans: TTransport): TTransport = {
    +    ugi.doAs(new PrivilegedAction[TTransport]() {
    +      override def run: TTransport = wrapped.getTransport(trans)
    +    })
    +  }
    +}
    +
    +/**
    + * CallbackHandler for SASL DIGEST-MD5 mechanism.
    + */
    +// This code is pretty much completely based on Hadoop's SaslRpcServer.SaslDigestCallbackHandler -
    +// the only reason we could not use that Hadoop class as-is was because it needs a
    +// Server.Connection.
    +sealed class SaslDigestCallbackHandler(
    +    val secretManager: LivyDelegationTokenSecretManager) extends CallbackHandler with Logging {
    +  @throws[InvalidToken]
    +  private def getPassword(tokenId: LivyDelegationTokenIdentifier): Array[Char] = {
    +    encodePassword(secretManager.retrievePassword(tokenId))
    +  }
    +
    +  private def encodePassword(password: Array[Byte]): Array[Char] = {
    +    new String(Base64.encodeBase64(password)).toCharArray
    +  }
    +
    +  @throws[InvalidToken]
    +  @throws[UnsupportedCallbackException]
    +  override def handle(callbacks: Array[Callback]): Unit = {
    +    var nc: NameCallback = null
    +    var pc: PasswordCallback = null
    +    callbacks.foreach {
    +      case ac: AuthorizeCallback =>
    +        val authid: String = ac.getAuthenticationID
    +        val authzid: String = ac.getAuthorizationID
    +        if (authid == authzid) ac.setAuthorized(true)
    --- End diff --
    
    Braces make this easier to read.
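
For illustration, the braced form of that check might look like the sketch below. `BracedAuthorize` and `authorize` are hypothetical wrappers added so the snippet compiles on its own; only the `AuthorizeCallback` case mirrors the diff, and other callback types are ignored here.

```scala
import javax.security.auth.callback.Callback
import javax.security.sasl.AuthorizeCallback

object BracedAuthorize {
  // Hypothetical wrapper around the AuthorizeCallback case from the diff.
  def authorize(callbacks: Array[Callback]): Unit = {
    callbacks.foreach {
      case ac: AuthorizeCallback =>
        val authid: String = ac.getAuthenticationID
        val authzid: String = ac.getAuthorizationID
        // Same condition as the one-liner, with braces around the body.
        if (authid == authzid) {
          ac.setAuthorized(true)
        }
      case _ => // other callback types are out of scope for this sketch
    }
  }
}
```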


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237416944
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/PlainSaslServer.scala ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.livy.thriftserver.auth
    +
    +
    +import java.io.IOException
    +import java.security.{Provider, Security}
    +import java.util
    +import javax.security.auth.callback.Callback
    +import javax.security.auth.callback.CallbackHandler
    +import javax.security.auth.callback.NameCallback
    +import javax.security.auth.callback.PasswordCallback
    +import javax.security.auth.callback.UnsupportedCallbackException
    +import javax.security.auth.login.LoginException
    +import javax.security.sasl._
    +
    +import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods
    +import org.apache.thrift.transport.TSaslServerTransport
    +
    +import org.apache.livy.LivyConf
    +
    +
    +/**
    + * Sun JDK only provides a PLAIN client and no server. This class implements the Plain SASL server
    + * conforming to RFC #4616 (http://www.ietf.org/rfc/rfc4616.txt).
    + */
    +class PlainSaslServer private[auth] (
    --- End diff --
    
    I think you answered this yourself in https://github.com/apache/incubator-livy/pull/117#discussion_r237319439. In this case, the Hive classes reimplemented here (see PlainSaslHelper) depend on `ThriftCLIService`, which depends on `HiveConf`. Moreover, some of them are final, so it is not easy to override them.
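    
    For context on how small the protocol side is: RFC 4616 defines the PLAIN message as `[authzid] NUL authcid NUL passwd`. The following is a minimal sketch of parsing such a message, not the code in this PR; `PlainMessage`, `Credentials` and `parse` are hypothetical names chosen for illustration.
    
    ```scala
    import java.nio.charset.StandardCharsets
    
    // Hypothetical, self-contained parser for an RFC 4616 PLAIN client message.
    object PlainMessage {
      final case class Credentials(authzid: Option[String], authcid: String, passwd: String)
    
      def parse(response: Array[Byte]): Either[String, Credentials] = {
        // Limit -1 keeps empty fields, so an omitted authzid shows up as "".
        val parts = new String(response, StandardCharsets.UTF_8).split("\u0000", -1)
        parts match {
          case Array(authz, authc, pw) if authc.nonEmpty && pw.nonEmpty =>
            Right(Credentials(Some(authz).filter(_.nonEmpty), authc, pw))
          case _ =>
            Left("Invalid PLAIN message: expected [authzid] NUL authcid NUL passwd")
        }
      }
    }
    ```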


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221890669
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthBridgeServer.scala ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.auth
    +
    +import java.io.IOException
    +import java.net.InetAddress
    +import java.security.{PrivilegedAction, PrivilegedExceptionAction}
    +import java.util
    +import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback, UnsupportedCallbackException}
    +import javax.security.sasl.{AuthorizeCallback, RealmCallback, SaslServer}
    +
    +import org.apache.commons.codec.binary.Base64
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.security.{SaslRpcServer, UserGroupInformation}
    +import org.apache.hadoop.security.SaslRpcServer.AuthMethod
    +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
    +import org.apache.hadoop.security.token.SecretManager.InvalidToken
    +import org.apache.thrift.{TException, TProcessor}
    +import org.apache.thrift.protocol.TProtocol
    +import org.apache.thrift.transport.{TSaslServerTransport, TSocket, TTransport, TTransportException, TTransportFactory}
    +
    +import org.apache.livy.Logging
    +
    +/**
    + * The class is taken from Hive's `HadoopThriftAuthBridge.Server`. It bridges Thrift's SASL
    + * transports to Hadoop's SASL callback handlers and authentication classes.
    + *
    + * This class is based on Hive's one.
    + */
    +class AuthBridgeServer(private val secretManager: LivyDelegationTokenSecretManager) {
    +  private val ugi = try {
    +      UserGroupInformation.getCurrentUser
    +    } catch {
    +      case ioe: IOException => throw new TTransportException(ioe)
    +    }
    +
    +  /**
    +   * Create a TTransportFactory that, upon connection of a client socket,
    +   * negotiates a Kerberized SASL transport. The resulting TTransportFactory
    +   * can be passed as both the input and output transport factory when
    +   * instantiating a TThreadPoolServer, for example.
    +   *
    +   * @param saslProps Map of SASL properties
    +   */
    +  @throws[TTransportException]
    +  def createTransportFactory(saslProps: util.Map[String, String]): TTransportFactory = {
    +    val transFactory: TSaslServerTransport.Factory = createSaslServerTransportFactory(saslProps)
    +    new TUGIAssumingTransportFactory(transFactory, ugi)
    +  }
    +
    +  /**
    +   * Create a TSaslServerTransport.Factory that, upon connection of a client
    +   * socket, negotiates a Kerberized SASL transport.
    +   *
    +   * @param saslProps Map of SASL properties
    +   */
    +  @throws[TTransportException]
    +  def createSaslServerTransportFactory(
    +      saslProps: util.Map[String, String]): TSaslServerTransport.Factory = {
    +    // Parse out the kerberos principal, host, realm.
    +    val kerberosName: String = ugi.getUserName
    +    val names: Array[String] = SaslRpcServer.splitKerberosName(kerberosName)
    +    if (names.length != 3) {
    +      throw new TTransportException(s"Kerberos principal should have 3 parts: $kerberosName")
    +    }
    +    val transFactory: TSaslServerTransport.Factory = new TSaslServerTransport.Factory
    +    transFactory.addServerDefinition(AuthMethod.KERBEROS.getMechanismName,
    +      names(0), names(1), // two parts of kerberos principal
    +      saslProps,
    +      new SaslRpcServer.SaslGssCallbackHandler)
    +    transFactory.addServerDefinition(AuthMethod.TOKEN.getMechanismName,
    +      null,
    +      SaslRpcServer.SASL_DEFAULT_REALM,
    +      saslProps,
    +      new SaslDigestCallbackHandler(secretManager))
    +    transFactory
    +  }
    +
    +  /**
    +   * Wrap a TTransportFactory in such a way that, before processing any RPC, it
    +   * assumes the UserGroupInformation of the user authenticated by
    +   * the SASL transport.
    +   */
    +  def wrapTransportFactory(transFactory: TTransportFactory): TTransportFactory = {
    +    new TUGIAssumingTransportFactory(transFactory, ugi)
    +  }
    +
    +  /**
    +   * Wrap a TProcessor in such a way that, before processing any RPC, it
    +   * assumes the UserGroupInformation of the user authenticated by
    +   * the SASL transport.
    +   */
    +  def wrapProcessor(processor: TProcessor): TProcessor = {
    +    new TUGIAssumingProcessor(processor, secretManager, true)
    +  }
    +
    +  /**
    +   * Wrap a TProcessor to capture the client information like connecting userid, ip etc
    +   */
    +  def wrapNonAssumingProcessor(processor: TProcessor): TProcessor = {
    +    new TUGIAssumingProcessor(processor, secretManager, false)
    +  }
    +
    +  def getRemoteAddress: InetAddress = AuthBridgeServer.remoteAddress.get
    +
    +  def getRemoteUser: String = AuthBridgeServer.remoteUser.get
    +
    +  def getUserAuthMechanism: String = AuthBridgeServer.userAuthMechanism.get
    +
    +}
    +
    +/**
    + * A TransportFactory that wraps another one, but assumes a specified UGI
    + * before calling through.
    + *
    + * This is used on the server side to assume the server's Principal when accepting
    + * clients.
    + *
    + * This class is derived from Hive's one.
    + */
    +private[auth] class TUGIAssumingTransportFactory(
    +    val wrapped: TTransportFactory, val ugi: UserGroupInformation) extends TTransportFactory {
    +  assert(wrapped != null)
    +  assert(ugi != null)
    +
    +  override def getTransport(trans: TTransport): TTransport = {
    +    ugi.doAs(new PrivilegedAction[TTransport]() {
    +      override def run: TTransport = wrapped.getTransport(trans)
    +    })
    +  }
    +}
    +
    +/**
    + * CallbackHandler for SASL DIGEST-MD5 mechanism.
    + */
    +// This code is pretty much completely based on Hadoop's SaslRpcServer.SaslDigestCallbackHandler -
    +// the only reason we could not use that Hadoop class as-is was because it needs a
    +// Server.Connection.
    +sealed class SaslDigestCallbackHandler(
    +    val secretManager: LivyDelegationTokenSecretManager) extends CallbackHandler with Logging {
    +  @throws[InvalidToken]
    +  private def getPassword(tokenId: LivyDelegationTokenIdentifier): Array[Char] = {
    +    encodePassword(secretManager.retrievePassword(tokenId))
    +  }
    +
    +  private def encodePassword(password: Array[Byte]): Array[Char] = {
    +    new String(Base64.encodeBase64(password)).toCharArray
    +  }
    +
    +  @throws[InvalidToken]
    +  @throws[UnsupportedCallbackException]
    +  override def handle(callbacks: Array[Callback]): Unit = {
    +    var nc: NameCallback = null
    +    var pc: PasswordCallback = null
    +    callbacks.foreach {
    +      case ac: AuthorizeCallback =>
    +        val authid: String = ac.getAuthenticationID
    +        val authzid: String = ac.getAuthorizationID
    +        if (authid == authzid) ac.setAuthorized(true)
    +        else ac.setAuthorized(false)
    +        if (ac.isAuthorized) {
    +          if (logger.isDebugEnabled) {
    +            val username = SaslRpcServer.getIdentifier(authzid, secretManager).getUser.getUserName
    +            debug(s"SASL server DIGEST-MD5 callback: setting canonicalized client ID: $username")
    +          }
    +          ac.setAuthorizedID(authzid)
    +        }
    +      case c: NameCallback => nc = c
    +      case c: PasswordCallback => pc = c
    +      case _: RealmCallback => // Do nothing.
    +      case other =>
    +        throw new UnsupportedCallbackException(other, "Unrecognized SASL DIGEST-MD5 Callback")
    +    }
    +    if (pc != null) {
    +      val tokenIdentifier = SaslRpcServer.getIdentifier(nc.getDefaultName, secretManager)
    +      val password: Array[Char] = getPassword(tokenIdentifier)
    +      if (logger.isDebugEnabled) {
    +        debug("SASL server DIGEST-MD5 callback: setting password for client: " +
    +          tokenIdentifier.getUser)
    +      }
    +      pc.setPassword(password)
    +    }
    +  }
    +}
    +
    +/**
    + * Processor that pulls the SaslServer object out of the transport, and assumes the remote user's
    + * UGI before calling through to the original processor.
    + *
    + * This is used on the server side to set the UGI for each specific call.
    --- End diff --
    
    Most of the operations here are needed, since this sets the `remoteUser`, which is used in many places later. The UGI impersonation, though, may indeed not be needed.
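    
    To make the distinction concrete, here is a minimal sketch of the part that would stay: recording the authenticated user in a thread-local for the duration of a call, without any `doAs`. `RemoteUserSketch` and `withRemoteUser` are hypothetical names for illustration and do not mirror the actual processor in this PR.
    
    ```scala
    // Hypothetical sketch: keep the remote user per thread, no UGI impersonation.
    object RemoteUserSketch {
      private val remoteUser = new ThreadLocal[String]
    
      // None when no call is in progress on the current thread.
      def getRemoteUser: Option[String] = Option(remoteUser.get)
    
      // Sets the user for the duration of `body` and always clears it afterwards.
      def withRemoteUser[T](user: String)(body: => T): T = {
        remoteUser.set(user)
        try {
          body
        } finally {
          remoteUser.remove()
        }
      }
    }
    ```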


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221886399
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftHttpServlet.scala ---
    @@ -0,0 +1,500 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.cli
    +
    +import java.io.IOException
    +import java.security.{PrivilegedExceptionAction, SecureRandom}
    +import javax.servlet.ServletException
    +import javax.servlet.http.{Cookie, HttpServletRequest, HttpServletResponse}
    +import javax.ws.rs.core.NewCookie
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.codec.binary.{Base64, StringUtils}
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.authentication.util.KerberosName
    +import org.apache.hive.service.CookieSigner
    +import org.apache.hive.service.auth.{HiveAuthConstants, HttpAuthenticationException, HttpAuthUtils}
    +import org.apache.hive.service.auth.HiveAuthConstants.AuthTypes
    +import org.apache.hive.service.cli.HiveSQLException
    +import org.apache.thrift.TProcessor
    +import org.apache.thrift.protocol.TProtocolFactory
    +import org.apache.thrift.server.TServlet
    +import org.ietf.jgss.{GSSContext, GSSCredential, GSSException, GSSManager, Oid}
    +
    +import org.apache.livy.{LivyConf, Logging}
    +import org.apache.livy.thriftserver.SessionInfo
    +import org.apache.livy.thriftserver.auth.{AuthenticationProvider, AuthFactory}
    +
    +class ThriftHttpServlet(
    +    processor: TProcessor,
    +    protocolFactory: TProtocolFactory,
    +    val authType: String,
    +    val serviceUGI: UserGroupInformation,
    +    val httpUGI: UserGroupInformation,
    +    val authFactory: AuthFactory,
    +    val livyConf: LivyConf) extends TServlet(processor, protocolFactory) with Logging {
    +
    +  private val isCookieAuthEnabled = livyConf.getBoolean(LivyConf.THRIFT_HTTP_COOKIE_AUTH_ENABLED)
    +
    +  // Class members for cookie based authentication.
    +  private val signer: CookieSigner = if (isCookieAuthEnabled) {
    +      // Generate the signer with secret.
    +      val secret = ThriftHttpServlet.RAN.nextLong.toString
    +      debug("Using the random number as the secret for cookie generation " + secret)
    +      new CookieSigner(secret.getBytes())
    +    } else {
    +      null
    +    }
    +
    +  private val cookieDomain = livyConf.get(LivyConf.THRIFT_HTTP_COOKIE_DOMAIN)
    +  private val cookiePath = livyConf.get(LivyConf.THRIFT_HTTP_COOKIE_PATH)
    +  private val cookieMaxAge =
    +    (livyConf.getTimeAsMs(LivyConf.THRIFT_HTTP_COOKIE_MAX_AGE) / 1000).toInt
    +  private val isCookieSecure = livyConf.getBoolean(LivyConf.THRIFT_USE_SSL)
    +  private val isHttpOnlyCookie = livyConf.getBoolean(LivyConf.THRIFT_HTTP_COOKIE_IS_HTTPONLY)
    +  private val xsrfFilterEnabled = livyConf.getBoolean(LivyConf.THRIFT_XSRF_FILTER_ENABLED)
    +
    +  @throws[IOException]
    +  @throws[ServletException]
    +  override protected def doPost(
    +      request: HttpServletRequest, response: HttpServletResponse): Unit = {
    +    var clientUserName: String = null
    +    var requireNewCookie: Boolean = false
    +
    +    try {
    +      if (xsrfFilterEnabled) {
    +        val continueProcessing = ThriftHttpServlet.doXsrfFilter(request, response)
    +        if (!continueProcessing) {
    +          warn("Request did not have valid XSRF header, rejecting.")
    +          return
    +        }
    +      }
    +      // If the cookie based authentication is already enabled, parse the
    +      // request and validate the request cookies.
    +      if (isCookieAuthEnabled) {
    +        clientUserName = validateCookie(request)
    +        requireNewCookie = clientUserName == null
    +        if (requireNewCookie) {
    +          info("Could not validate cookie sent, will try to generate a new cookie")
    +        }
    +      }
    +      // If the cookie based authentication is not enabled or the request does
    +      // not have a valid cookie, use the kerberos or password based authentication
    +      // depending on the server setup.
    +      if (clientUserName == null) {
    +        // For a kerberos setup
    +        if (ThriftHttpServlet.isKerberosAuthMode(authType)) {
    +          val delegationToken = request.getHeader(ThriftHttpServlet.HIVE_DELEGATION_TOKEN_HEADER)
    +          // Each http request must have an Authorization header
    +          if ((delegationToken != null) && (!delegationToken.isEmpty)) {
    +            clientUserName = doTokenAuth(request, response)
    +          } else {
    +            clientUserName = doKerberosAuth(request)
    +          }
    +        } else {
    +          // For password based authentication
    +          clientUserName = doPasswdAuth(request, authType)
    +        }
    +      }
    +      debug(s"Client username: $clientUserName")
    +
    +      // Set the thread local username to be used for doAs if true
    +      SessionInfo.setUserName(clientUserName)
    +
    +      // find proxy user if any from query param
    +      val doAsQueryParam = ThriftHttpServlet.getDoAsQueryParam(request.getQueryString)
    +      if (doAsQueryParam != null) {
    +        SessionInfo.setProxyUserName(doAsQueryParam)
    +      }
    +
    +      val clientIpAddress = request.getRemoteAddr
    +      debug("Client IP Address: " + clientIpAddress)
    +      // Set the thread local ip address
    +      SessionInfo.setIpAddress(clientIpAddress)
    +
    +      // get forwarded hosts address
    +      val forwardedFor = request.getHeader(ThriftHttpServlet.X_FORWARDED_FOR)
    +      if (forwardedFor != null) {
    +        debug(s"${ThriftHttpServlet.X_FORWARDED_FOR}:$forwardedFor")
    +        SessionInfo.setForwardedAddresses(forwardedFor.split(",").toList.asJava)
    +      } else {
    +        SessionInfo.setForwardedAddresses(List.empty.asJava)
    +      }
    +
    +      // Generate new cookie and add it to the response
    +      if (requireNewCookie && !authType.equalsIgnoreCase(AuthTypes.NOSASL.toString)) {
    +        val cookieToken = HttpAuthUtils.createCookieToken(clientUserName)
    +        val hs2Cookie = createCookie(signer.signCookie(cookieToken))
    +
    +        if (isHttpOnlyCookie) {
    +          response.setHeader("SET-COOKIE", ThriftHttpServlet.getHttpOnlyCookieHeader(hs2Cookie))
    +        } else {
    +          response.addCookie(hs2Cookie)
    +        }
    +        info("Cookie added for clientUserName " + clientUserName)
    +      }
    +      super.doPost(request, response);
    +    } catch {
    +      case e: HttpAuthenticationException =>
    +        error("Error: ", e)
    +        // Send a 401 to the client
    +        response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
    +        if(ThriftHttpServlet.isKerberosAuthMode(authType)) {
    +          response.addHeader(HttpAuthUtils.WWW_AUTHENTICATE, HttpAuthUtils.NEGOTIATE)
    +        }
    +        // scalastyle:off println
    +        response.getWriter.println("Authentication Error: " + e.getMessage)
    +        // scalastyle:on println
    +    } finally {
    +      // Clear the thread locals
    +      SessionInfo.clearUserName()
    +      SessionInfo.clearIpAddress()
    +      SessionInfo.clearProxyUserName()
    +      SessionInfo.clearForwardedAddresses()
    +    }
    +  }
    +
    +  /**
    +   * Retrieves the client name from cookieString. If the cookie does not correspond to a valid
    +   * client, the function returns null.
    +   * @param cookies HTTP Request cookies.
    +   * @return Client Username if cookieString has a HS2 Generated cookie that is currently valid.
    +   *         Else, returns null.
    +   */
    +  private def getClientNameFromCookie(cookies: Array[Cookie]): String = {
    +    // Following is the main loop which iterates through all the cookies send by the client.
    +    // The HS2 generated cookies are of the format hive.server2.auth=<value>
    +    // A cookie which is identified as a hiveserver2 generated cookie is validated by calling
    +    // signer.verifyAndExtract(). If the validation passes, send the username for which the cookie
    +    // is validated to the caller. If no client side cookie passes the validation, return null to
    +    // the caller.
    +    cookies.filter(_.equals(ThriftHttpServlet.AUTH_COOKIE)).foreach { cookie =>
    +      val value = signer.verifyAndExtract(cookie.getValue)
    +      if (value != null) {
    +        val userName = HttpAuthUtils.getUserNameFromCookieToken(value)
    +        if (userName == null) {
    +          warn("Invalid cookie token " + value)
    +        } else {
    +          // We have found a valid cookie in the client request.
    +          if (logger.isDebugEnabled()) {
    +            debug("Validated the cookie for user " + userName)
    +          }
    +          return userName
    +        }
    +      }
    +    }
    +    // No valid generated cookies found, return null
    +    null
    +  }
    +
    +  /**
    +   * Convert cookie array to human readable cookie string
    +   * @param cookies Cookie Array
    +   * @return String containing all the cookies separated by a newline character.
    +   * Each cookie is of the format [key]=[value]
    +   */
    +  private def toCookieStr(cookies: Array[Cookie]): String = {
    +    cookies.map(c => s"${c.getName} = ${c.getValue} ;\n").mkString
    +  }
    +
    +  /**
    +   * Validate the request cookie. This function iterates over the request cookie headers
    +   * and finds a cookie that represents a valid client/server session. If it finds one, it
    +   * returns the client name associated with the session. Else, it returns null.
    +   * @param request The HTTP Servlet Request send by the client
    +   * @return Client Username if the request has valid HS2 cookie, else returns null
    +   */
    +  private def validateCookie(request: HttpServletRequest): String = {
    +    // Find all the valid cookies associated with the request.
    +    val cookies = request.getCookies
    +
    +    if (cookies == null) {
    +      if (logger.isDebugEnabled()) {
    +        debug("No valid cookies associated with the request " + request)
    +      }
    +      null
    +    } else {
    +      if (logger.isDebugEnabled()) {
    +        debug("Received cookies: " + toCookieStr(cookies))
    +      }
    +      getClientNameFromCookie(cookies)
    +    }
    +  }
    +
    +  /**
    +   * Generate a server side cookie given the cookie value as the input.
    +   * @param str Input string token.
    +   * @return The generated cookie.
    +   */
    +  private def createCookie(str: String): Cookie = {
    +    if (logger.isDebugEnabled()) {
    +      debug(s"Cookie name = ${ThriftHttpServlet.AUTH_COOKIE} value = $str")
    +    }
    +    val cookie = new Cookie(ThriftHttpServlet.AUTH_COOKIE, str)
    +
    +    cookie.setMaxAge(cookieMaxAge)
    +    if (cookieDomain != null) {
    +      cookie.setDomain(cookieDomain)
    +    }
    +    if (cookiePath != null) {
    +      cookie.setPath(cookiePath)
    +    }
    +    cookie.setSecure(isCookieSecure)
    +    cookie
    +  }
    +
    +
    +  /**
    +   * Do the authentication (LDAP/PAM not yet supported)
    +   */
    +  private def doPasswdAuth(request: HttpServletRequest, authType: String): String = {
    +    val userName = getUsername(request, authType)
    +    // No-op when authType is NOSASL
    +    if (!authType.equalsIgnoreCase(HiveAuthConstants.AuthTypes.NOSASL.toString)) {
    +      try {
    +        val provider = AuthenticationProvider.getAuthenticationProvider(authType, livyConf)
    +        provider.Authenticate(userName, getPassword(request, authType))
    +      } catch {
    +        case e: Exception => throw new HttpAuthenticationException(e)
    +      }
    +    }
    +    userName
    +  }
    +
    +  private def doTokenAuth(request: HttpServletRequest, response: HttpServletResponse): String = {
    +    val tokenStr = request.getHeader(ThriftHttpServlet.HIVE_DELEGATION_TOKEN_HEADER)
    +    try {
    +      authFactory.verifyDelegationToken(tokenStr)
    +    } catch {
    +      case e: HiveSQLException => throw new HttpAuthenticationException(e);
    +    }
    +  }
    +
    +  /**
    +   * Do the GSS-API kerberos authentication. We already have a logged in subject in the form of
    +   * serviceUGI, which GSS-API will extract information from.
    +   * In case of a SPNego request we use the httpUGI, for the authenticating service tickets.
    +   */
    +  private def doKerberosAuth(request: HttpServletRequest): String = {
    --- End diff --
    
    Hmm... IIRC there was no existing filter able to do exactly what is done here. We might have to adapt or extend one, but if we cannot simply reuse it, I am not sure switching is worth it.


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221881729
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthenticationProvider.scala ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.auth
    +
    +import java.lang.reflect.InvocationTargetException
    +import javax.security.sasl.AuthenticationException
    +
    +import org.apache.hive.service.auth.PasswdAuthenticationProvider
    +
    +import org.apache.livy.LivyConf
    +
    +object AuthenticationProvider {
    --- End diff --
    
    Yes, I agree. Anyway, I don't think there is much difference as of now, i.e. both APIs support Kerberos or none (this one currently supports custom too, it is true, but I am not even sure how widespread its adoption is in Hive).
    
    Anyway I think we can revisit this later. This is a very sensitive part and I think a PR focused only on this would be better.


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237318814
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftHttpServlet.scala ---
    @@ -0,0 +1,500 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.cli
    +
    +import java.io.IOException
    +import java.security.{PrivilegedExceptionAction, SecureRandom}
    +import javax.servlet.ServletException
    +import javax.servlet.http.{Cookie, HttpServletRequest, HttpServletResponse}
    +import javax.ws.rs.core.NewCookie
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.codec.binary.{Base64, StringUtils}
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.authentication.util.KerberosName
    +import org.apache.hive.service.CookieSigner
    +import org.apache.hive.service.auth.{HiveAuthConstants, HttpAuthenticationException, HttpAuthUtils}
    +import org.apache.hive.service.auth.HiveAuthConstants.AuthTypes
    +import org.apache.hive.service.cli.HiveSQLException
    +import org.apache.thrift.TProcessor
    +import org.apache.thrift.protocol.TProtocolFactory
    +import org.apache.thrift.server.TServlet
    +import org.ietf.jgss.{GSSContext, GSSCredential, GSSException, GSSManager, Oid}
    +
    +import org.apache.livy.{LivyConf, Logging}
    +import org.apache.livy.thriftserver.SessionInfo
    +import org.apache.livy.thriftserver.auth.{AuthenticationProvider, AuthFactory}
    +
    +class ThriftHttpServlet(
    --- End diff --
    
    Comment about where this comes from? (Even if it's similar to the other classes above.)


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221764487
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthBridgeServer.scala ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.auth
    +
    +import java.io.IOException
    +import java.net.InetAddress
    +import java.security.{PrivilegedAction, PrivilegedExceptionAction}
    +import java.util
    +import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback, UnsupportedCallbackException}
    +import javax.security.sasl.{AuthorizeCallback, RealmCallback, SaslServer}
    +
    +import org.apache.commons.codec.binary.Base64
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.security.{SaslRpcServer, UserGroupInformation}
    +import org.apache.hadoop.security.SaslRpcServer.AuthMethod
    +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
    +import org.apache.hadoop.security.token.SecretManager.InvalidToken
    +import org.apache.thrift.{TException, TProcessor}
    +import org.apache.thrift.protocol.TProtocol
    +import org.apache.thrift.transport.{TSaslServerTransport, TSocket, TTransport, TTransportException, TTransportFactory}
    +
    +import org.apache.livy.Logging
    +
    +/**
    + * The class is taken from Hive's `HadoopThriftAuthBridge.Server`. It bridges Thrift's SASL
    + * transports to Hadoop's SASL callback handlers and authentication classes.
    + *
    + * This class is based on Hive's one.
    + */
    +class AuthBridgeServer(private val secretManager: LivyDelegationTokenSecretManager) {
    +  private val ugi = try {
    +      UserGroupInformation.getCurrentUser
    +    } catch {
    +      case ioe: IOException => throw new TTransportException(ioe)
    +    }
    +
    +  /**
    +   * Create a TTransportFactory that, upon connection of a client socket,
    +   * negotiates a Kerberized SASL transport. The resulting TTransportFactory
    +   * can be passed as both the input and output transport factory when
    +   * instantiating a TThreadPoolServer, for example.
    +   *
    +   * @param saslProps Map of SASL properties
    +   */
    +  @throws[TTransportException]
    +  def createTransportFactory(saslProps: util.Map[String, String]): TTransportFactory = {
    +    val transFactory: TSaslServerTransport.Factory = createSaslServerTransportFactory(saslProps)
    +    new TUGIAssumingTransportFactory(transFactory, ugi)
    +  }
    +
    +  /**
    +   * Create a TSaslServerTransport.Factory that, upon connection of a client
    +   * socket, negotiates a Kerberized SASL transport.
    +   *
    +   * @param saslProps Map of SASL properties
    +   */
    +  @throws[TTransportException]
    +  def createSaslServerTransportFactory(
    +      saslProps: util.Map[String, String]): TSaslServerTransport.Factory = {
    +    // Parse out the kerberos principal, host, realm.
    +    val kerberosName: String = ugi.getUserName
    +    val names: Array[String] = SaslRpcServer.splitKerberosName(kerberosName)
    +    if (names.length != 3) {
    +      throw new TTransportException(s"Kerberos principal should have 3 parts: $kerberosName")
    +    }
    +    val transFactory: TSaslServerTransport.Factory = new TSaslServerTransport.Factory
    +    transFactory.addServerDefinition(AuthMethod.KERBEROS.getMechanismName,
    +      names(0), names(1), // two parts of kerberos principal
    +      saslProps,
    +      new SaslRpcServer.SaslGssCallbackHandler)
    +    transFactory.addServerDefinition(AuthMethod.TOKEN.getMechanismName,
    +      null,
    +      SaslRpcServer.SASL_DEFAULT_REALM,
    +      saslProps,
    +      new SaslDigestCallbackHandler(secretManager))
    +    transFactory
    +  }
    +
    +  /**
    +   * Wrap a TTransportFactory in such a way that, before processing any RPC, it
    +   * assumes the UserGroupInformation of the user authenticated by
    +   * the SASL transport.
    +   */
    +  def wrapTransportFactory(transFactory: TTransportFactory): TTransportFactory = {
    +    new TUGIAssumingTransportFactory(transFactory, ugi)
    +  }
    +
    +  /**
    +   * Wrap a TProcessor in such a way that, before processing any RPC, it
    +   * assumes the UserGroupInformation of the user authenticated by
    +   * the SASL transport.
    +   */
    +  def wrapProcessor(processor: TProcessor): TProcessor = {
    +    new TUGIAssumingProcessor(processor, secretManager, true)
    +  }
    +
    +  /**
    +   * Wrap a TProcessor to capture the client information like connecting userid, ip etc
    +   */
    +  def wrapNonAssumingProcessor(processor: TProcessor): TProcessor = {
    +    new TUGIAssumingProcessor(processor, secretManager, false)
    +  }
    +
    +  def getRemoteAddress: InetAddress = AuthBridgeServer.remoteAddress.get
    +
    +  def getRemoteUser: String = AuthBridgeServer.remoteUser.get
    +
    +  def getUserAuthMechanism: String = AuthBridgeServer.userAuthMechanism.get
    +
    +}
    +
    +/**
    + * A TransportFactory that wraps another one, but assumes a specified UGI
    + * before calling through.
    + *
    + * This is used on the server side to assume the server's Principal when accepting
    + * clients.
    + *
    + * This class is derived from Hive's one.
    + */
    +private[auth] class TUGIAssumingTransportFactory(
    +    val wrapped: TTransportFactory, val ugi: UserGroupInformation) extends TTransportFactory {
    +  assert(wrapped != null)
    +  assert(ugi != null)
    +
    +  override def getTransport(trans: TTransport): TTransport = {
    +    ugi.doAs(new PrivilegedAction[TTransport]() {
    +      override def run: TTransport = wrapped.getTransport(trans)
    +    })
    +  }
    +}
    +
    +/**
    + * CallbackHandler for SASL DIGEST-MD5 mechanism.
    + */
    +// This code is pretty much completely based on Hadoop's SaslRpcServer.SaslDigestCallbackHandler -
    --- End diff --
    
    Put this inside the javadoc comment?


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237318294
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/PlainSaslServer.scala ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.livy.thriftserver.auth
    +
    +
    +import java.io.IOException
    +import java.security.{Provider, Security}
    +import java.util
    +import javax.security.auth.callback.Callback
    +import javax.security.auth.callback.CallbackHandler
    +import javax.security.auth.callback.NameCallback
    +import javax.security.auth.callback.PasswordCallback
    +import javax.security.auth.callback.UnsupportedCallbackException
    +import javax.security.auth.login.LoginException
    +import javax.security.sasl._
    +
    +import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods
    +import org.apache.thrift.transport.TSaslServerTransport
    +
    +import org.apache.livy.LivyConf
    +
    +
    +/**
    + * Sun JDK only provides a PLAIN client and no server. This class implements the Plain SASL server
    + * conforming to RFC #4616 (http://www.ietf.org/rfc/rfc4616.txt).
    + */
    +class PlainSaslServer private[auth] (
    --- End diff --
    
    I'd almost suggest keeping a Livy-specific class in the `org.apache.hive.service.auth` package that extends the Hive one and just exposes a public constructor.
    
    (Similar for other classes you're copying for seemingly similar reasons.)
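    
    A minimal sketch of that idea, assuming the upstream constructor is only visible inside its own package, as the comment implies. The names `upstream`, `auth`, `LivyPlainSaslServer`, `Demo` and `noopHandler` are hypothetical, the `PlainSaslServer` here is a local stand-in rather than Hive's class, and nested objects stand in for the package so the example runs as a single file:
    
    ```scala
    import javax.security.auth.callback.{Callback, CallbackHandler}
    
    // Hypothetical stand-in for the upstream package. `private[auth]` restricts
    // the constructor to code inside `auth`, much like a package-private one.
    object upstream {
      object auth {
        class PlainSaslServer private[auth] (
            val handler: CallbackHandler,
            val authMethod: String) {
          def getMechanismName: String = "PLAIN"
        }
    
        // Declared in the same scope, so it may call the restricted constructor,
        // while its own constructor is public.
        class LivyPlainSaslServer(cb: CallbackHandler, method: String)
          extends PlainSaslServer(cb, method)
      }
    }
    
    object Demo {
      // A no-op callback handler, used only for illustration.
      val noopHandler: CallbackHandler = new CallbackHandler {
        override def handle(callbacks: Array[Callback]): Unit = ()
      }
    
      // Compiles from outside `auth`; `new upstream.auth.PlainSaslServer(...)` would not.
      val server = new upstream.auth.LivyPlainSaslServer(noopHandler, "NONE")
    }
    ```
    
    With this shape the copied class body would not be needed; only the thin subclass would have to track upstream changes.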


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221768562
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/PlainSaslServer.scala ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.livy.thriftserver.auth
    +
    +
    +import java.io.IOException
    +import java.security.{Provider, Security}
    +import java.util
    +import javax.security.auth.callback.Callback
    +import javax.security.auth.callback.CallbackHandler
    +import javax.security.auth.callback.NameCallback
    +import javax.security.auth.callback.PasswordCallback
    +import javax.security.auth.callback.UnsupportedCallbackException
    +import javax.security.auth.login.LoginException
    +import javax.security.sasl._
    +
    +import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods
    +import org.apache.thrift.transport.TSaslServerTransport
    +
    +import org.apache.livy.LivyConf
    +
    +
    +/**
    + * Sun JDK only provides a PLAIN client and no server. This class implements the Plain SASL server
    + * conforming to RFC #4616 (http://www.ietf.org/rfc/rfc4616.txt).
    + */
    +class PlainSaslServer private[auth] (
    +    val handler: CallbackHandler,
    +    val authMethodStr: String) extends SaslServer {
    +
    +  AuthMethods.getValidAuthMethod(authMethodStr)
    +
    +  private var user: String = null
    +  override def getMechanismName: String = PlainSaslServer.PLAIN_METHOD
    +
    +  @throws[SaslException]
    +  override def evaluateResponse(response: Array[Byte]): Array[Byte] = {
    +    try {
    +      // parse the response
    +      // message = [authzid] UTF8NUL authcid UTF8NUL passwd'
    +      val tokenList: util.Deque[String] = new util.ArrayDeque[String]
    +      var messageToken = new StringBuilder
    +      for (b <- response) {
    +        if (b == 0) {
    +          tokenList.addLast(messageToken.toString)
    +          messageToken = new StringBuilder
    +        } else {
    +          messageToken.append(b.toChar)
    +        }
    +      }
    +      tokenList.addLast(messageToken.toString)
    +      // validate response
    +      if (tokenList.size < 2 || tokenList.size > 3) {
    +        throw new SaslException ("Invalid message format")
    +      }
    +      val passwd: String = tokenList.removeLast()
    +      user = tokenList.removeLast()
    +      // optional authzid
    +      var authzId: String = null
    +      if (tokenList.isEmpty) {
    +        authzId = user
    +      } else {
    +        authzId = tokenList.removeLast()
    +      }
    +      if (user == null || user.isEmpty) {
    +        throw new SaslException("No user name provided")
    +      }
    +      if (passwd == null || passwd.isEmpty) {
    +        throw new SaslException("No password name provided")
    +      }
    +      val nameCallback = new NameCallback ("User")
    +      nameCallback.setName (user)
    +      val pcCallback = new PasswordCallback ("Password", false)
    +      pcCallback.setPassword (passwd.toCharArray)
    +      val acCallback = new AuthorizeCallback (user, authzId)
    +      val cbList = Array[Callback](nameCallback, pcCallback, acCallback)
    +      handler.handle(cbList)
    +      if (!acCallback.isAuthorized) {
    +        throw new SaslException ("Authentication failed")
    +      }
    +    } catch {
    +      case eL: IllegalStateException =>
    +        throw new SaslException ("Invalid message format", eL)
    --- End diff --
    
    nit: no space before `(` (also in a few other places here and in other classes)


---

[GitHub] incubator-livy issue #117: [WIP][LIVY-502] Remove dependency on hive-exec

Posted by codecov-io <gi...@git.apache.org>.
Github user codecov-io commented on the issue:

    https://github.com/apache/incubator-livy/pull/117
  
    # [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/117?src=pr&el=h1) Report
    > Merging [#117](https://codecov.io/gh/apache/incubator-livy/pull/117?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-livy/commit/56c76bc2d4563593edce062a563603fe63e5a431?src=pr&el=desc) will **increase** coverage by `0.41%`.
    > The diff coverage is `100%`.
    
    [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-livy/pull/117/graphs/tree.svg?width=650&token=0MkVbiUFwE&height=150&src=pr)](https://codecov.io/gh/apache/incubator-livy/pull/117?src=pr&el=tree)
    
    ```diff
    @@             Coverage Diff             @@
    ##             master    #117      +/-   ##
    ===========================================
    + Coverage     70.98%   71.4%   +0.41%     
    - Complexity      923     924       +1     
    ===========================================
      Files           100     100              
      Lines          5498    5564      +66     
      Branches        827     827              
    ===========================================
    + Hits           3903    3973      +70     
    + Misses         1063    1060       -3     
    + Partials        532     531       -1
    ```
    
    
    | [Impacted Files](https://codecov.io/gh/apache/incubator-livy/pull/117?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
    |---|---|---|---|
    | [...rver/src/main/scala/org/apache/livy/LivyConf.scala](https://codecov.io/gh/apache/incubator-livy/pull/117/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS9MaXZ5Q29uZi5zY2FsYQ==) | `95.91% <100%> (+2.07%)` | `21 <0> (ø)` | :arrow_down: |
    | [...in/java/org/apache/livy/rsc/rpc/RpcDispatcher.java](https://codecov.io/gh/apache/incubator-livy/pull/117/diff?src=pr&el=tree#diff-cnNjL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9saXZ5L3JzYy9ycGMvUnBjRGlzcGF0Y2hlci5qYXZh) | `63.26% <0%> (-4.09%)` | `19% <0%> (-1%)` | |
    | [...main/scala/org/apache/livy/server/LivyServer.scala](https://codecov.io/gh/apache/incubator-livy/pull/117/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS9zZXJ2ZXIvTGl2eVNlcnZlci5zY2FsYQ==) | `34.83% <0%> (-0.57%)` | `9% <0%> (ø)` | |
    | [.../scala/org/apache/livy/sessions/SessionState.scala](https://codecov.io/gh/apache/incubator-livy/pull/117/diff?src=pr&el=tree#diff-Y29yZS9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2xpdnkvc2Vzc2lvbnMvU2Vzc2lvblN0YXRlLnNjYWxh) | `58.33% <0%> (ø)` | `2% <0%> (ø)` | :arrow_down: |
    | [...in/java/org/apache/livy/rsc/driver/JobWrapper.java](https://codecov.io/gh/apache/incubator-livy/pull/117/diff?src=pr&el=tree#diff-cnNjL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9saXZ5L3JzYy9kcml2ZXIvSm9iV3JhcHBlci5qYXZh) | `80% <0%> (ø)` | `8% <0%> (+1%)` | :arrow_up: |
    | [...c/src/main/java/org/apache/livy/rsc/RSCClient.java](https://codecov.io/gh/apache/incubator-livy/pull/117/diff?src=pr&el=tree#diff-cnNjL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9saXZ5L3JzYy9SU0NDbGllbnQuamF2YQ==) | `83.85% <0%> (+1.24%)` | `26% <0%> (ø)` | :arrow_down: |
    | [...ain/java/org/apache/livy/rsc/driver/RSCDriver.java](https://codecov.io/gh/apache/incubator-livy/pull/117/diff?src=pr&el=tree#diff-cnNjL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9saXZ5L3JzYy9kcml2ZXIvUlNDRHJpdmVyLmphdmE=) | `79.23% <0%> (+1.27%)` | `42% <0%> (+1%)` | :arrow_up: |
    | [...scala/org/apache/livy/repl/SparkRInterpreter.scala](https://codecov.io/gh/apache/incubator-livy/pull/117/diff?src=pr&el=tree#diff-cmVwbC9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2xpdnkvcmVwbC9TcGFya1JJbnRlcnByZXRlci5zY2FsYQ==) | `60.69% <0%> (+2.31%)` | `14% <0%> (ø)` | :arrow_down: |
    
    ------
    
    [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-livy/pull/117?src=pr&el=continue).
    > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
    > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
    > Powered by [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/117?src=pr&el=footer). Last update [56c76bc...545a5c3](https://codecov.io/gh/apache/incubator-livy/pull/117?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).



---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221762741
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala ---
    @@ -114,24 +98,56 @@ object LivyThriftServer extends Logging {
         thriftServer.stop()
         thriftServer = null
       }
    +
    +  def isHTTPTransportMode(livyConf: LivyConf): Boolean = {
    +    val transportMode = livyConf.get(LivyConf.THRIFT_TRANSPORT_MODE)
    +    transportMode != null && transportMode.equalsIgnoreCase("http")
    +  }
     }
     
     
     class LivyThriftServer(
         private[thriftserver] val livyConf: LivyConf,
         private[thriftserver] val livySessionManager: InteractiveSessionManager,
         private[thriftserver] val sessionStore: SessionStore,
    -    private[thriftserver] val accessManager: AccessManager) extends HiveServer2 {
    -  override def init(hiveConf: HiveConf): Unit = {
    -    this.cliService = new LivyCLIService(this)
    -    super.init(hiveConf)
    +    private[thriftserver] val accessManager: AccessManager)
    +  extends ThriftService(classOf[LivyThriftServer].getName) with Logging {
    +
    +  val cliService = new LivyCLIService(this)
    +
    +  override def init(livyConf: LivyConf): Unit = {
    +    addService(cliService)
    +    val server = this
    +    val oomHook = new Runnable() {
    +      override def run(): Unit = {
    +        server.stop()
    +      }
    +    }
    +    val thriftCLIService = if (LivyThriftServer.isHTTPTransportMode(livyConf)) {
    +      new ThriftHttpCLIService(cliService, oomHook)
    +    } else {
    +      new ThriftBinaryCLIService(cliService, oomHook)
    +    }
    +    addService(thriftCLIService)
    +    super.init(livyConf)
    +    Runtime.getRuntime.addShutdownHook(new Thread("Livy Server Shutdown") {
    +      override def run(): Unit = {
    +        info("Shutting down Livy server.")
    +        LivyThriftServer.this.stop()
    +      }
    +    })
       }
     
    -  private[thriftserver] def getSessionManager(): LivyThriftSessionManager = {
    -    this.cliService.asInstanceOf[LivyCLIService].getSessionManager
    +  private[thriftserver] def getSessionManager = {
    +    cliService.getSessionManager
       }
     
       def isAllowedToUse(user: String, session: InteractiveSession): Boolean = {
         session.owner == user || accessManager.checkModifyPermissions(user)
       }
    +
    +  override def stop(): Unit = {
    +    info("Shutting down HiveServer2")
    --- End diff --
    
    HiveServer2?


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221772907
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTypeInfoOperation.scala ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.operation
    +
    +import java.sql.{DatabaseMetaData, Types}
    +
    +import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle}
    +
    +import org.apache.livy.Logging
    +import org.apache.livy.thriftserver.serde.ResultSet
    +import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
    +
    +sealed case class TypeInfo(name: String, sqlType: Int, precision: Option[Int],
    +  caseSensitive: Boolean, searchable: Short, unsignedAttribute: Boolean, numPrecRadix: Option[Int])
    +
    +/**
    + * GetTypeInfoOperation.
    + *
    + */
    +class GetTypeInfoOperation(sessionHandle: SessionHandle)
    --- End diff --
    
    Any specific reason why you have to fork this class, and others in this package?


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-livy/pull/117


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237319002
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTableTypesOperation.scala ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.operation
    +
    +import org.apache.hive.service.cli._
    +
    +import org.apache.livy.Logging
    +import org.apache.livy.thriftserver.serde.ResultSet
    +import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
    +
    +/**
    + * GetTableTypesOperation.
    --- End diff --
    
    Same.


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221883802
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/cli/ThriftCLIService.scala ---
    @@ -0,0 +1,745 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.cli
    +
    +import java.io.IOException
    +import java.net.{InetAddress, UnknownHostException}
    +import java.util
    +import java.util.Collections
    +import javax.security.auth.login.LoginException
    +
    +import scala.collection.JavaConverters._
    +
    +import com.google.common.base.Preconditions.checkArgument
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.security.authentication.util.KerberosName
    +import org.apache.hadoop.security.authorize.ProxyUsers
    +import org.apache.hadoop.util.StringUtils
    +import org.apache.hive.service.{ServiceException, ServiceUtils}
    +import org.apache.hive.service.auth.{HiveAuthConstants, TSetIpAddressProcessor}
    +import org.apache.hive.service.auth.HiveAuthConstants.AuthTypes
    +import org.apache.hive.service.cli._
    +import org.apache.hive.service.rpc.thrift._
    +import org.apache.thrift.TException
    +import org.apache.thrift.server.ServerContext
    +
    +import org.apache.livy.LivyConf
    +import org.apache.livy.thriftserver.{LivyCLIService, LivyThriftServer, SessionInfo, ThriftService}
    +import org.apache.livy.thriftserver.auth.AuthFactory
    +
    +/**
    + * This class is ported from Hive. We cannot reuse Hive's one because we need to use the
    + * `LivyCLIService`, `LivyConf` and `AuthFactory` instead of Hive's one.
    + */
    +abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName: String)
    +    extends ThriftService(serviceName) with TCLIService.Iface with Runnable {
    +
    +  def hiveAuthFactory: AuthFactory
    +
    +  protected val currentServerContext = new ThreadLocal[ServerContext]
    +  protected var portNum: Int = 0
    +  protected var serverIPAddress: InetAddress = _
    +  protected var hiveHost: String = _
    +  private var isStarted: Boolean = false
    +  protected var isEmbedded: Boolean = false
    +  protected var livyConf: LivyConf = _
    +  protected var minWorkerThreads: Int = 0
    +  protected var maxWorkerThreads: Int = 0
    +  protected var workerKeepAliveTime: Long = 0L
    +  private var serverThread: Thread = _
    +
    +  override def init(conf: LivyConf): Unit = {
    +    livyConf = conf
    +    hiveHost = livyConf.get(LivyConf.THRIFT_BIND_HOST)
    +    try {
    +      if (hiveHost == null || hiveHost.isEmpty) {
    +        serverIPAddress = InetAddress.getLocalHost
    +      } else {
    +        serverIPAddress = InetAddress.getByName(hiveHost)
    +      }
    +    } catch {
    +      case e: UnknownHostException =>
    +        throw new ServiceException(e)
    +    }
    +    portNum = livyConf.getInt(LivyConf.THRIFT_SERVER_PORT)
    +    workerKeepAliveTime = livyConf.getTimeAsMs(LivyConf.THRIFT_WORKER_KEEPALIVE_TIME) / 1000
    +    minWorkerThreads = livyConf.getInt(LivyConf.THRIFT_MIN_WORKER_THREADS)
    +    maxWorkerThreads = livyConf.getInt(LivyConf.THRIFT_MAX_WORKER_THREADS)
    +    super.init(livyConf)
    +  }
    +
    +  protected def initServer(): Unit
    +
    +  override def start(): Unit = {
    +    super.start()
    +    if (!isStarted && !isEmbedded) {
    +      initServer()
    +      serverThread = new Thread(this)
    +      serverThread.setName("Thrift Server")
    +      serverThread.start()
    +      isStarted = true
    +    }
    +  }
    +
    +  protected def stopServer(): Unit
    +
    +  override def stop(): Unit = {
    +    if (isStarted && !isEmbedded) {
    +      if (serverThread != null) {
    +        serverThread.interrupt()
    +        serverThread = null
    +      }
    +      stopServer()
    +      isStarted = false
    +    }
    +    super.stop()
    +  }
    +
    +  def getPortNumber: Int = portNum
    +
    +  def getServerIPAddress: InetAddress = serverIPAddress
    +
    +  @throws[TException]
    +  override def GetDelegationToken(req: TGetDelegationTokenReq): TGetDelegationTokenResp = {
    +    val resp: TGetDelegationTokenResp = new TGetDelegationTokenResp
    +    if (!hiveAuthFactory.isSASLKerberosUser) {
    +      resp.setStatus(unsecureTokenErrorStatus)
    +    } else {
    +      try {
    +        val token = cliService.getDelegationToken(
    +          new SessionHandle(req.getSessionHandle), hiveAuthFactory, req.getOwner, req.getRenewer)
    +        resp.setDelegationToken(token)
    +        resp.setStatus(ThriftCLIService.OK_STATUS)
    +      } catch {
    +        case e: HiveSQLException =>
    +          error("Error obtaining delegation token", e)
    +          val tokenErrorStatus = HiveSQLException.toTStatus(e)
    +          tokenErrorStatus.setSqlState("42000")
    +          resp.setStatus(tokenErrorStatus)
    +      }
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def CancelDelegationToken(req: TCancelDelegationTokenReq): TCancelDelegationTokenResp = {
    +    val resp: TCancelDelegationTokenResp = new TCancelDelegationTokenResp
    +    if (!hiveAuthFactory.isSASLKerberosUser) {
    +      resp.setStatus(unsecureTokenErrorStatus)
    +    } else {
    +      try {
    +        cliService.cancelDelegationToken(
    +          new SessionHandle(req.getSessionHandle), hiveAuthFactory, req.getDelegationToken)
    +        resp.setStatus(ThriftCLIService.OK_STATUS)
    +      } catch {
    +        case e: HiveSQLException =>
    +          error("Error canceling delegation token", e)
    +          resp.setStatus(HiveSQLException.toTStatus(e))
    +      }
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def RenewDelegationToken(req: TRenewDelegationTokenReq): TRenewDelegationTokenResp = {
    +    val resp: TRenewDelegationTokenResp = new TRenewDelegationTokenResp
    +    if (!hiveAuthFactory.isSASLKerberosUser) {
    +      resp.setStatus(unsecureTokenErrorStatus)
    +    } else {
    +      try {
    +        cliService.renewDelegationToken(
    +          new SessionHandle(req.getSessionHandle), hiveAuthFactory, req.getDelegationToken)
    +        resp.setStatus(ThriftCLIService.OK_STATUS)
    +      } catch {
    +        case e: HiveSQLException =>
    +          error("Error obtaining renewing token", e)
    +          resp.setStatus(HiveSQLException.toTStatus(e))
    +      }
    +    }
    +    resp
    +  }
    +
    +  private def unsecureTokenErrorStatus: TStatus = {
    +    val errorStatus: TStatus = new TStatus(TStatusCode.ERROR_STATUS)
    +    errorStatus.setErrorMessage(
    +      "Delegation token only supported over remote client with kerberos authentication")
    +    errorStatus
    +  }
    +
    +  @throws[TException]
    +  override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = {
    +    info("Client protocol version: " + req.getClient_protocol)
    +    val resp: TOpenSessionResp = new TOpenSessionResp
    +    try {
    +      val sessionHandle = getSessionHandle(req, resp)
    +      resp.setSessionHandle(sessionHandle.toTSessionHandle)
    +      val configurationMap: util.Map[String, String] = new util.HashMap[String, String]
    +      // Set the updated fetch size from the server into the configuration map for the client
    +      val defaultFetchSize =
    +        Integer.toString(livyConf.getInt(LivyConf.THRIFT_RESULTSET_DEFAULT_FETCH_SIZE))
    +      configurationMap.put(LivyConf.THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.key, defaultFetchSize)
    +      resp.setConfiguration(configurationMap)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +      Option(currentServerContext.get).foreach { context =>
    +        context.asInstanceOf[ThriftCLIServerContext].setSessionHandle(sessionHandle)
    +      }
    +    } catch {
    +      case e: Exception =>
    +        warn("Error opening session: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def SetClientInfo(req: TSetClientInfoReq): TSetClientInfoResp = {
    +    // TODO: We don't do anything for now, just log this for debugging.
    +    //       We may be able to make use of this later, e.g. for workload management.
    +    if (req.isSetConfiguration) {
    +      val sh = new SessionHandle(req.getSessionHandle)
    +      val sb = new StringBuilder("Client information for ").append(sh).append(": ")
    +
    +      def processEntry(e: util.Map.Entry[String, String]): Unit = {
    +        sb.append(e.getKey).append(" = ").append(e.getValue)
    +        if ("ApplicationName" == e.getKey) {
    +          cliService.setApplicationName(sh, e.getValue)
    +        }
    +      }
    +
    +      val entries = req.getConfiguration.entrySet.asScala.toSeq
    +      try {
    +        entries.headOption.foreach(processEntry)
    +        entries.drop(1).foreach { e =>
    +          sb.append(", ")
    +          processEntry(e)
    +        }
    +      } catch {
    +        case ex: Exception =>
    +          warn("Error setting application name", ex)
    +          return new TSetClientInfoResp(HiveSQLException.toTStatus(ex))
    +      }
    +      info(sb.toString())
    +    }
    +    new TSetClientInfoResp(ThriftCLIService.OK_STATUS)
    +  }
    +
    +  private def getIpAddress: String = {
    +    // Http transport mode.
    +    // We set the thread local ip address, in ThriftHttpServlet.
    +    val clientIpAddress = if (LivyThriftServer.isHTTPTransportMode(livyConf)) {
    +      SessionInfo.getIpAddress
    +    } else if (hiveAuthFactory.isSASLWithKerberizedHadoop) {
    +      hiveAuthFactory.getIpAddress
    +    } else {
    +      // NOSASL
    +      TSetIpAddressProcessor.getUserIpAddress
    +    }
    +    debug(s"Client's IP Address: $clientIpAddress")
    +    clientIpAddress
    +  }
    +
    +  /**
    +   * Returns the effective username.
    +   * 1. If livy.server.thrift.allow.user.substitution = false: the username of the connecting user
    +   * 2. If livy.server.thrift.allow.user.substitution = true: the username of the end user,
    +   * that the connecting user is trying to proxy for.
    +   * This includes a check whether the connecting user is allowed to proxy for the end user.
    +   */
    +  @throws[HiveSQLException]
    +  @throws[IOException]
    +  private def getUserName(req: TOpenSessionReq): String = {
    +    val username = if (LivyThriftServer.isHTTPTransportMode(livyConf)) {
    +      Option(SessionInfo.getUserName).getOrElse(req.getUsername)
    +    } else if (hiveAuthFactory.isSASLWithKerberizedHadoop) {
    +      Option(hiveAuthFactory.getRemoteUser).orElse(Option(TSetIpAddressProcessor.getUserName))
    +        .getOrElse(req.getUsername)
    +    } else {
    +      Option(TSetIpAddressProcessor.getUserName).getOrElse(req.getUsername)
    +    }
    +    val effectiveClientUser =
    +      getProxyUser(getShortName(username), req.getConfiguration, getIpAddress)
    +    debug(s"Client's username: $effectiveClientUser")
    +    effectiveClientUser
    +  }
    +
    +  @throws[IOException]
    +  private def getShortName(userName: String): String = {
    +    Option(userName).map { un =>
    +      if (hiveAuthFactory.isSASLKerberosUser) {
    +        // KerberosName.getShortName can only be used for a Kerberos user
    +        new KerberosName(un).getShortName
    +      } else {
    +        val indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(un)
    +        if (indexOfDomainMatch <= 0) {
    +          un
    +        } else {
    +          un.substring(0, indexOfDomainMatch)
    +        }
    +      }
    +    }.orNull
    +  }
    +
    +  /**
    +   * Create a session handle
    +   */
    +  @throws[HiveSQLException]
    +  @throws[LoginException]
    +  @throws[IOException]
    +  private[thriftserver] def getSessionHandle(
    +      req: TOpenSessionReq, res: TOpenSessionResp): SessionHandle = {
    +    val userName = getUserName(req)
    +    val ipAddress = getIpAddress
    +    val protocol = getMinVersion(LivyCLIService.SERVER_VERSION, req.getClient_protocol)
    +    val sessionHandle =
    +      if (livyConf.getBoolean(LivyConf.THRIFT_ENABLE_DOAS) && (userName != null)) {
    +        cliService.openSessionWithImpersonation(
    +          protocol, userName, req.getPassword, ipAddress, req.getConfiguration, null)
    +      } else {
    +        cliService.openSession(protocol, userName, req.getPassword, ipAddress, req.getConfiguration)
    +      }
    +    res.setServerProtocolVersion(protocol)
    +    sessionHandle
    +  }
    +
    +  @throws[HiveSQLException]
    +  private def getProgressedPercentage(opHandle: OperationHandle): Double = {
    +    checkArgument(OperationType.EXECUTE_STATEMENT == opHandle.getOperationType)
    +    0.0
    +  }
    +
    +  private def getMinVersion(versions: TProtocolVersion*): TProtocolVersion = {
    +    val values = TProtocolVersion.values
    +    var current = values(values.length - 1).getValue
    +    versions.foreach { version =>
    +      if (current > version.getValue) {
    +        current = version.getValue
    +      }
    +    }
    +    val res = values.find(_.getValue == current)
    +    assert(res.isDefined)
    +    res.get
    +  }
    +
    +  @throws[TException]
    +  override def CloseSession(req: TCloseSessionReq): TCloseSessionResp = {
    +    val resp = new TCloseSessionResp
    +    try {
    +      val sessionHandle = new SessionHandle(req.getSessionHandle)
    +      cliService.closeSession(sessionHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +      Option(currentServerContext.get).foreach { ctx =>
    +        ctx.asInstanceOf[ThriftCLIServerContext].setSessionHandle(null)
    +      }
    +    } catch {
    +      case e: Exception =>
    +        warn("Error closing session: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetInfo(req: TGetInfoReq): TGetInfoResp = {
    +    val resp = new TGetInfoResp
    +    try {
    +      val getInfoValue = cliService.getInfo(
    +        new SessionHandle(req.getSessionHandle), GetInfoType.getGetInfoType(req.getInfoType))
    +      resp.setInfoValue(getInfoValue.toTGetInfoValue)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting info: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def ExecuteStatement(req: TExecuteStatementReq): TExecuteStatementResp = {
    +    val resp = new TExecuteStatementResp
    +    try {
    +      val sessionHandle = new SessionHandle(req.getSessionHandle)
    +      val statement = req.getStatement
    +      val confOverlay = req.getConfOverlay
    +      val runAsync = req.isRunAsync
    +      val queryTimeout = req.getQueryTimeout
    +      val operationHandle = if (runAsync) {
    +          cliService.executeStatementAsync(sessionHandle, statement, confOverlay, queryTimeout)
    +        } else {
    +          cliService.executeStatement(sessionHandle, statement, confOverlay, queryTimeout)
    +        }
    +      resp.setOperationHandle(operationHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error executing statement: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetTypeInfo(req: TGetTypeInfoReq): TGetTypeInfoResp = {
    +    val resp = new TGetTypeInfoResp
    +    try {
    +      val operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle))
    +      resp.setOperationHandle(operationHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting type info: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetCatalogs(req: TGetCatalogsReq): TGetCatalogsResp = {
    +    val resp = new TGetCatalogsResp
    +    try {
    +      val opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle))
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting catalogs: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetSchemas(req: TGetSchemasReq): TGetSchemasResp = {
    +    val resp = new TGetSchemasResp
    +    try {
    +      val opHandle = cliService.getSchemas(
    +        new SessionHandle(req.getSessionHandle), req.getCatalogName, req.getSchemaName)
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting schemas: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetTables(req: TGetTablesReq): TGetTablesResp = {
    +    val resp = new TGetTablesResp
    +    try {
    +      val opHandle = cliService.getTables(
    +        new SessionHandle(req.getSessionHandle),
    +        req.getCatalogName,
    +        req.getSchemaName,
    +        req.getTableName,
    +        req.getTableTypes)
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting tables: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetTableTypes(req: TGetTableTypesReq): TGetTableTypesResp = {
    +    val resp = new TGetTableTypesResp
    +    try {
    +      val opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle))
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting table types: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetColumns(req: TGetColumnsReq): TGetColumnsResp = {
    +    val resp = new TGetColumnsResp
    +    try {
    +      val opHandle = cliService.getColumns(
    +        new SessionHandle(req.getSessionHandle),
    +        req.getCatalogName,
    +        req.getSchemaName,
    +        req.getTableName,
    +        req.getColumnName)
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting columns: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetFunctions(req: TGetFunctionsReq): TGetFunctionsResp = {
    +    val resp = new TGetFunctionsResp
    +    try {
    +      val opHandle = cliService.getFunctions(
    +        new SessionHandle(req.getSessionHandle),
    +        req.getCatalogName,
    +        req.getSchemaName,
    +        req.getFunctionName)
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting functions: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetOperationStatus(req: TGetOperationStatusReq): TGetOperationStatusResp = {
    +    val resp = new TGetOperationStatusResp
    +    val operationHandle = new OperationHandle(req.getOperationHandle)
    +    try {
    +      val operationStatus = cliService.getOperationStatus(operationHandle, req.isGetProgressUpdate)
    +      resp.setOperationState(operationStatus.state.toTOperationState)
    +      resp.setErrorMessage(operationStatus.state.getErrorMessage)
    +      val opException = operationStatus.operationException
    +      resp.setOperationStarted(operationStatus.operationStarted)
    +      resp.setOperationCompleted(operationStatus.operationCompleted)
    +      resp.setHasResultSet(operationStatus.hasResultSet)
    +      val executionStatus = TJobExecutionStatus.NOT_AVAILABLE
    +      resp.setProgressUpdateResponse(new TProgressUpdateResp(
    +        Collections.emptyList[String],
    +        Collections.emptyList[util.List[String]],
    +        0.0D,
    +        executionStatus,
    +        "",
    +        0L))
    +      if (opException != null) {
    +        resp.setSqlState(opException.getSQLState)
    +        resp.setErrorCode(opException.getErrorCode)
    +        if (opException.getErrorCode == 29999) {
    +          resp.setErrorMessage(StringUtils.stringifyException(opException))
    +        } else {
    +          resp.setErrorMessage(opException.getMessage)
    +        }
    +      } else if (OperationType.EXECUTE_STATEMENT == operationHandle.getOperationType) {
    +        resp.getProgressUpdateResponse.setProgressedPercentage(
    +          getProgressedPercentage(operationHandle))
    +      }
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting operation status: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def CancelOperation(req: TCancelOperationReq): TCancelOperationResp = {
    +    val resp = new TCancelOperationResp
    +    try {
    +      cliService.cancelOperation(new OperationHandle(req.getOperationHandle))
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error cancelling operation: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def CloseOperation(req: TCloseOperationReq): TCloseOperationResp = {
    +    val resp = new TCloseOperationResp
    +    try {
    +      cliService.closeOperation(new OperationHandle(req.getOperationHandle))
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error closing operation: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetResultSetMetadata(req: TGetResultSetMetadataReq): TGetResultSetMetadataResp = {
    +    val resp = new TGetResultSetMetadataResp
    +    try {
    +      val schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle))
    +      resp.setSchema(schema.toTTableSchema)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting result set metadata: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = {
    +    val resp = new TFetchResultsResp
    +    try {
    +      // Set fetch size
    +      val maxFetchSize = livyConf.getInt(LivyConf.THRIFT_RESULTSET_MAX_FETCH_SIZE)
    +      if (req.getMaxRows > maxFetchSize) {
    +        req.setMaxRows(maxFetchSize)
    +      }
    +      val rowSet = cliService.fetchResults(
    +        new OperationHandle(req.getOperationHandle),
    +        FetchOrientation.getFetchOrientation(req.getOrientation),
    +        req.getMaxRows,
    +        FetchType.getFetchType(req.getFetchType))
    +      resp.setResults(rowSet.toTRowSet)
    +      resp.setHasMoreRows(false)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error fetching results: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetPrimaryKeys(req: TGetPrimaryKeysReq): TGetPrimaryKeysResp = {
    +    val resp = new TGetPrimaryKeysResp
    +    try {
    +      val opHandle = cliService.getPrimaryKeys(
    +        new SessionHandle(req.getSessionHandle),
    +        req.getCatalogName,
    +        req.getSchemaName,
    +        req.getTableName)
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting primary keys: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetCrossReference(req: TGetCrossReferenceReq): TGetCrossReferenceResp = {
    +    val resp = new TGetCrossReferenceResp
    +    try {
    +      val opHandle = cliService.getCrossReference(
    +        new SessionHandle(req.getSessionHandle),
    +        req.getParentCatalogName,
    +        req.getParentSchemaName,
    +        req.getParentTableName,
    +        req.getForeignCatalogName,
    +        req.getForeignSchemaName,
    +        req.getForeignTableName)
    +      resp.setOperationHandle(opHandle.toTOperationHandle)
    +      resp.setStatus(ThriftCLIService.OK_STATUS)
    +    } catch {
    +      case e: Exception =>
    +        warn("Error getting cross reference: ", e)
    +        resp.setStatus(HiveSQLException.toTStatus(e))
    +    }
    +    resp
    +  }
    +
    +  @throws[TException]
    +  override def GetQueryId(req: TGetQueryIdReq): TGetQueryIdResp = {
    +    try {
    +      new TGetQueryIdResp(cliService.getQueryId(req.getOperationHandle))
    +    } catch {
    +      case e: HiveSQLException => throw new TException(e)
    +    }
    +  }
    +
    +  override def run(): Unit
    +
    +  /**
    +   * If the proxy user name is provided then check privileges to substitute the user.
    +   */
    +  @throws[HiveSQLException]
    +  private def getProxyUser(
    --- End diff --
    
    I agree. As before, I'd do this in a separate PR so that we can focus carefully on how to change these parts, since they are sensitive.


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221764252
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthBridgeServer.scala ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.auth
    +
    +import java.io.IOException
    +import java.net.InetAddress
    +import java.security.{PrivilegedAction, PrivilegedExceptionAction}
    +import java.util
    +import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback, UnsupportedCallbackException}
    +import javax.security.sasl.{AuthorizeCallback, RealmCallback, SaslServer}
    +
    +import org.apache.commons.codec.binary.Base64
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.security.{SaslRpcServer, UserGroupInformation}
    +import org.apache.hadoop.security.SaslRpcServer.AuthMethod
    +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
    +import org.apache.hadoop.security.token.SecretManager.InvalidToken
    +import org.apache.thrift.{TException, TProcessor}
    +import org.apache.thrift.protocol.TProtocol
    +import org.apache.thrift.transport.{TSaslServerTransport, TSocket, TTransport, TTransportException, TTransportFactory}
    +
    +import org.apache.livy.Logging
    +
    +/**
    + * The class is taken from Hive's `HadoopThriftAuthBridge.Server`. It bridges Thrift's SASL
    + * transports to Hadoop's SASL callback handlers and authentication classes.
    + *
    + * This class is based on Hive's one.
    + */
    +class AuthBridgeServer(private val secretManager: LivyDelegationTokenSecretManager) {
    +  private val ugi = try {
    +      UserGroupInformation.getCurrentUser
    +    } catch {
    +      case ioe: IOException => throw new TTransportException(ioe)
    +    }
    +
    +  /**
    +   * Create a TTransportFactory that, upon connection of a client socket,
    +   * negotiates a Kerberized SASL transport. The resulting TTransportFactory
    +   * can be passed as both the input and output transport factory when
    +   * instantiating a TThreadPoolServer, for example.
    +   *
    +   * @param saslProps Map of SASL properties
    +   */
    +  @throws[TTransportException]
    +  def createTransportFactory(saslProps: util.Map[String, String]): TTransportFactory = {
    +    val transFactory: TSaslServerTransport.Factory = createSaslServerTransportFactory(saslProps)
    +    new TUGIAssumingTransportFactory(transFactory, ugi)
    +  }
    +
    +  /**
    +   * Create a TSaslServerTransport.Factory that, upon connection of a client
    +   * socket, negotiates a Kerberized SASL transport.
    +   *
    +   * @param saslProps Map of SASL properties
    +   */
    +  @throws[TTransportException]
    +  def createSaslServerTransportFactory(
    +      saslProps: util.Map[String, String]): TSaslServerTransport.Factory = {
    +    // Parse out the kerberos principal, host, realm.
    +    val kerberosName: String = ugi.getUserName
    +    val names: Array[String] = SaslRpcServer.splitKerberosName(kerberosName)
    +    if (names.length != 3) {
    +      throw new TTransportException(s"Kerberos principal should have 3 parts: $kerberosName")
    +    }
    +    val transFactory: TSaslServerTransport.Factory = new TSaslServerTransport.Factory
    +    transFactory.addServerDefinition(AuthMethod.KERBEROS.getMechanismName,
    +      names(0), names(1), // two parts of kerberos principal
    +      saslProps,
    +      new SaslRpcServer.SaslGssCallbackHandler)
    +    transFactory.addServerDefinition(AuthMethod.TOKEN.getMechanismName,
    +      null,
    +      SaslRpcServer.SASL_DEFAULT_REALM,
    +      saslProps,
    +      new SaslDigestCallbackHandler(secretManager))
    +    transFactory
    +  }
    +
    +  /**
    +   * Wrap a TTransportFactory in such a way that, before processing any RPC, it
    +   * assumes the UserGroupInformation of the user authenticated by
    +   * the SASL transport.
    +   */
    +  def wrapTransportFactory(transFactory: TTransportFactory): TTransportFactory = {
    +    new TUGIAssumingTransportFactory(transFactory, ugi)
    +  }
    +
    +  /**
    +   * Wrap a TProcessor in such a way that, before processing any RPC, it
    +   * assumes the UserGroupInformation of the user authenticated by
    +   * the SASL transport.
    +   */
    +  def wrapProcessor(processor: TProcessor): TProcessor = {
    +    new TUGIAssumingProcessor(processor, secretManager, true)
    +  }
    +
    +  /**
    +   * Wrap a TProcessor to capture the client information like connecting userid, ip etc
    +   */
    +  def wrapNonAssumingProcessor(processor: TProcessor): TProcessor = {
    +    new TUGIAssumingProcessor(processor, secretManager, false)
    +  }
    +
    +  def getRemoteAddress: InetAddress = AuthBridgeServer.remoteAddress.get
    +
    +  def getRemoteUser: String = AuthBridgeServer.remoteUser.get
    +
    +  def getUserAuthMechanism: String = AuthBridgeServer.userAuthMechanism.get
    +
    +}
    +
    +/**
    + * A TransportFactory that wraps another one, but assumes a specified UGI
    + * before calling through.
    + *
    + * This is used on the server side to assume the server's Principal when accepting
    + * clients.
    + *
    + * This class is derived from Hive's one.
    + */
    +private[auth] class TUGIAssumingTransportFactory(
    +    val wrapped: TTransportFactory, val ugi: UserGroupInformation) extends TTransportFactory {
    --- End diff --
    
    One arg per line.
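    
    A minimal sketch of the layout being asked for, with one constructor argument per line. The stub types and the `Sketch` class name are hypothetical stand-ins so that the snippet compiles on its own; they are not the real Thrift or Hadoop classes.
    
    ```scala
    // Hypothetical stand-ins for TTransportFactory and UserGroupInformation.
    class TransportFactoryStub
    class UgiStub(val userName: String)
    
    // One constructor argument per line, each indented four spaces.
    class UgiAssumingTransportFactorySketch(
        val wrapped: TransportFactoryStub,
        val ugi: UgiStub)
      extends TransportFactoryStub
    ```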


---

[GitHub] incubator-livy issue #117: [LIVY-502] Remove dependency on hive-exec

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/incubator-livy/pull/117
  
    @jerryshao @vanzin any more comments on this?


---

[GitHub] incubator-livy issue #117: [WIP][LIVY-502] Remove dependency on hive-exec

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/incubator-livy/pull/117
  
    Thanks for the review @vanzin. I have updated the PR description. Let me know if it still needs to be improved.
    
    >  I think as a first step getting the Hive code in, with the minimal amount of changes necessary, is better.
    
    I am not sure: the only difference from that is the move from Java to Scala, which saves many back-and-forth conversions of collections that would otherwise add code only to handle them.
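    
    To illustrate the kind of conversion in question, here is a rough sketch of consuming a Java map (such as the ones carried by the Thrift request objects) from Scala. It assumes the Scala 2.11/2.12 `JavaConverters` API; `ConversionSketch` and `describe` are hypothetical names and are not part of this PR.
    
    ```scala
    import java.util
    import scala.collection.JavaConverters._
    
    object ConversionSketch {
      // Hypothetical helper: renders a Java map as "k1 = v1, k2 = v2", sorted by key.
      // A single asScala call is the only conversion needed on the Scala side.
      def describe(conf: util.Map[String, String]): String = {
        conf.asScala.toSeq.sortBy(_._1).map { case (k, v) => s"$k = $v" }.mkString(", ")
      }
    }
    ```
    
    Using `mkString` also handles the empty map without any special casing.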
    
    > overall take another look at whether the things you're forking really need to be forked
    
    I am pretty sure that nothing in there is unneeded.



---

[GitHub] incubator-livy issue #117: [WIP][LIVY-502] Remove dependency on hive-exec

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/incubator-livy/pull/117
  
    cc @jerryshao  @vanzin 


---

[GitHub] incubator-livy pull request #117: [WIP][LIVY-502] Remove dependency on hive-...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r221767779
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/auth/AuthenticationProvider.scala ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.auth
    +
    +import java.lang.reflect.InvocationTargetException
    +import javax.security.sasl.AuthenticationException
    +
    +import org.apache.hive.service.auth.PasswdAuthenticationProvider
    +
    +import org.apache.livy.LivyConf
    +
    +object AuthenticationProvider {
    --- End diff --
    
    I understand this is trying to keep the Hive functionality at the moment. But in the long run it would be better if this were basically the same backend used by the Livy server for authentication.
    
    e.g. if the main server uses basic auth with some user database, this code would do the same; similarly for LDAP, Kerberos, etc.
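    
    A rough sketch of what sharing one backend could look like. Every name here (`CredentialsValidator`, `InMemoryValidator`, the two front-end classes) is hypothetical and exists neither in Livy nor in Hive; the in-memory map only stands in for whatever user database, LDAP, or Kerberos backend the main server would use.
    
    ```scala
    // Hypothetical abstraction shared by both front ends.
    trait CredentialsValidator {
      def validate(user: String, password: String): Boolean
    }
    
    // Toy in-memory user database standing in for the real backend.
    class InMemoryValidator(users: Map[String, String]) extends CredentialsValidator {
      override def validate(user: String, password: String): Boolean =
        users.get(user).contains(password)
    }
    
    // Both front ends are handed the same validator instance.
    class RestFrontEndSketch(validator: CredentialsValidator) {
      def login(user: String, password: String): Boolean = validator.validate(user, password)
    }
    
    class ThriftFrontEndSketch(validator: CredentialsValidator) {
      def openSession(user: String, password: String): Boolean =
        validator.validate(user, password)
    }
    ```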


---

[GitHub] incubator-livy issue #117: [LIVY-502] Remove dependency on hive-exec

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/incubator-livy/pull/117
  
    Thanks for the review @vanzin! I addressed your comments.
    
    > I still think there's stuff to clean up here
    
    Not sure what you mean here exactly, but I think that after this patch we may start working with the Hive community to make some things easier to reuse. E.g., we cannot reuse all the auth classes because they depend on `HiveConf`; we could propose to the Hive community that their constructors accept the parameters they need instead of a `HiveConf`, so that we can reuse them. This may help significantly reduce the code size here.
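    
    A minimal sketch of the proposal, under loud assumptions: `BigConfStub` is a hypothetical stand-in for a framework-specific configuration object such as `HiveConf`, the `auth.url` key is made up, and neither provider class corresponds to a real Hive class.
    
    ```scala
    // Hypothetical stand-in for a large, framework-specific configuration object.
    class BigConfStub(values: Map[String, String]) {
      def get(key: String): String = values(key)
    }
    
    // Before: only callers that own the big configuration object can build the provider.
    class ConfBoundProvider(conf: BigConfStub) {
      private val url = conf.get("auth.url")
      def describe: String = s"authenticating against $url"
    }
    
    // After: the provider asks only for what it needs, so other projects can construct it.
    class ParamBoundProvider(url: String) {
      def describe: String = s"authenticating against $url"
    }
    ```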


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by mgaido91 <gi...@git.apache.org>.
GitHub user mgaido91 reopened a pull request:

    https://github.com/apache/incubator-livy/pull/117

    [LIVY-502] Remove dependency on hive-exec

    ## What changes were proposed in this pull request?
    
    This PR removes the dependency on `hive-exec`. The only Hive modules used after this PR are `hive-service-rpc` and `hive-service`. This drastically reduces the number of JARs needed by the thriftserver module.
    
    The PR takes the Hive classes which we were using and adapts them when necessary (or simplifies them when we don't need something) so that they work in the Livy thriftserver.
    
    Most of the classes are simply migrated, replacing all occurrences of `HiveConf`, `HiveSession` and other Hive-specific classes. Only one class, `AuthFactory`, has logic that differs substantially from Hive's, because our UGI handling is different from Hive's (we are not running the TS in a standalone JVM).
    
    The functionality we are taking from Hive is:
     - the thrift protocol endpoints (the classes in the `cli` package), i.e. the classes handling lower-level details of the communication with the client;
     - the authentication layer (the classes in the `auth` package, which are used by the ones in `cli`), although this PR leaves LDAP and PAM out of scope, as they are not trivial to port and are not needed for a working solution. We can add them later;
     - the classes in the `operation` package, which are used to answer metadata queries from the JDBC driver.
    
    ## How was this patch tested?
    
    existing UTs + manual tests


----
commit e7aa5b1c98e322fb60963bcca18965c5d874ce10
Author: Marco Gaido <mg...@...>
Date:   2018-09-28T12:07:38Z

    [WIP][LIVY-502] Remove dependency on hive-exec

commit 545a5c3017e6daca022a61e8c51dbaefc98f8433
Author: Marco Gaido <mg...@...>
Date:   2018-09-28T15:55:50Z

    fix Http thriftserver

commit 75aa6421bbf70eed4dfd311deebed703a8fe0c5a
Author: Marco Gaido <mg...@...>
Date:   2018-10-02T10:18:20Z

    address comments

commit c5006a9aa35e717a172b67943269975ab830a3b9
Author: Marco Gaido <mg...@...>
Date:   2018-11-29T10:13:04Z

    address comments

commit 0110fded4a988457eec7b48be991191026265da2
Author: Marco Gaido <mg...@...>
Date:   2018-11-29T10:22:25Z

    Merge branch 'master' of github.com:apache/incubator-livy into LIVY-502

commit f3040f682aa360aade2d4dfb2cc8bfc635c4a749
Author: Marco Gaido <mg...@...>
Date:   2018-11-29T10:53:00Z

    fix merge

----


---

[GitHub] incubator-livy pull request #117: [LIVY-502] Remove dependency on hive-exec

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/incubator-livy/pull/117#discussion_r237319439
  
    --- Diff: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/GetTypeInfoOperation.scala ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.livy.thriftserver.operation
    +
    +import java.sql.{DatabaseMetaData, Types}
    +
    +import org.apache.hive.service.cli.{HiveSQLException, OperationState, OperationType, SessionHandle}
    +
    +import org.apache.livy.Logging
    +import org.apache.livy.thriftserver.serde.ResultSet
    +import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema}
    +
    +sealed case class TypeInfo(name: String, sqlType: Int, precision: Option[Int],
    +  caseSensitive: Boolean, searchable: Short, unsignedAttribute: Boolean, numPrecRadix: Option[Int])
    +
    +/**
    + * GetTypeInfoOperation.
    + *
    + */
    +class GetTypeInfoOperation(sessionHandle: SessionHandle)
    --- End diff --
    
    I missed the `HiveConf` dependency when looking at the Hive code. Well, sigh.


---