Posted to commits@hive.apache.org by th...@apache.org on 2014/03/17 14:04:29 UTC

svn commit: r1578350 [2/2] - in /hive/branches/branch-0.13: ./ itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/ jdbc/src/java/org/apache/hive/jdbc/ ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/ service/sr...

Modified: hive/branches/branch-0.13/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java?rev=1578350&r1=1578349&r2=1578350&view=diff
==============================================================================
--- hive/branches/branch-0.13/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java (original)
+++ hive/branches/branch-0.13/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java Mon Mar 17 13:04:29 2014
@@ -19,6 +19,7 @@
 package org.apache.hive.service.cli.thrift;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -28,61 +29,252 @@ import org.apache.commons.codec.binary.B
 import org.apache.commons.codec.binary.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.auth.AuthenticationProviderFactory;
+import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.HttpAuthUtils;
+import org.apache.hive.service.auth.HttpAuthenticationException;
+import org.apache.hive.service.auth.PasswdAuthenticationProvider;
+import org.apache.hive.service.cli.session.SessionManager;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServlet;
-
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+
+/**
+ *
+ * ThriftHttpServlet is the Thrift-over-HTTP endpoint for HiveServer2. It authenticates
+ * each request (Kerberos via GSS-API, or username/password via LDAP/PAM) before
+ * delegating to the wrapped Thrift processor.
+ *
+ */
 public class ThriftHttpServlet extends TServlet {
 
   private static final long serialVersionUID = 1L;
   public static final Log LOG = LogFactory.getLog(ThriftHttpServlet.class.getName());
+  private final String authType;
+  private final UserGroupInformation serviceUGI;
 
-  public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory) {
+  public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory,
+      String authType, UserGroupInformation serviceUGI) {
     super(processor, protocolFactory);
+    this.authType = authType;
+    this.serviceUGI = serviceUGI;
   }
 
   @Override
   protected void doPost(HttpServletRequest request, HttpServletResponse response)
       throws ServletException, IOException {
-    logRequestHeader(request);
-    super.doPost(request, response);
-  }
+    String clientUserName;
+    try {
+      // For a kerberos setup
+      if(isKerberosAuthMode(authType)) {
+        clientUserName = doKerberosAuth(request, serviceUGI);
+      }
+      else {
+        clientUserName = doPasswdAuth(request, authType);
+      }
 
-  protected void logRequestHeader(HttpServletRequest request) {
-    String authHeaderBase64 = request.getHeader("Authorization");
-    if(authHeaderBase64 == null) {
-      LOG.warn("ThriftHttpServlet: no HTTP Authorization header");
+      LOG.info("Client username: " + clientUserName);
+      
+      // Set the thread local username to be used for doAs if true
+      SessionManager.setUserName(clientUserName);
+      super.doPost(request, response);
     }
-    else {
-      if(!authHeaderBase64.startsWith("Basic")) {
-        LOG.warn("ThriftHttpServlet: HTTP Authorization header exists but is not Basic.");
+    catch (HttpAuthenticationException e) {
+      // Send a 403 to the client
+      LOG.error("Error: ", e);
+      response.setContentType("application/x-thrift");
+      response.setStatus(HttpServletResponse.SC_FORBIDDEN);
+      // Send the response back to the client
+      response.getWriter().println("Authentication Error: " + e.getMessage());
+    }
+    finally {
+      // Clear the thread local username since we set it in each http request
+      SessionManager.clearUserName();
+    }
+  }
+
+  /**
+   * Do the LDAP/PAM authentication
+   * @param request the http request from the client
+   * @param authType the configured password based authentication type (LDAP, PAM, NOSASL, ...)
+   * @return the authenticated client user name
+   * @throws HttpAuthenticationException if the username/password could not be validated
+   */
+  private String doPasswdAuth(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String userName = getUsername(request, authType);
+    // No-op when authType is NOSASL
+    if (!authType.equalsIgnoreCase(HiveAuthFactory.AuthTypes.NOSASL.toString())) {
+      try {
+        AuthMethods authMethod = AuthMethods.getValidAuthMethod(authType);
+        PasswdAuthenticationProvider provider =
+            AuthenticationProviderFactory.getAuthenticationProvider(authMethod);
+        provider.Authenticate(userName, getPassword(request, authType));
+
+      } catch (Exception e) {
+        throw new HttpAuthenticationException(e);
       }
-      else if(LOG.isDebugEnabled()) {
-        String authHeaderBase64_Payload = authHeaderBase64.substring("Basic ".length());
-        String authHeaderString = StringUtils.newStringUtf8(
-            Base64.decodeBase64(authHeaderBase64_Payload.getBytes()));
-        String[] creds = authHeaderString.split(":");
-        String username = null;
-        String password = null;
+    }
+    return userName;
+  }
 
-        if(creds.length >= 1) {
-          username = creds[0];
-        }
-        if(creds.length >= 2) {
-          password = creds[1];
-        }
-        if(password == null || password.equals("null") || password.equals("")) {
-          password = "<no password>";
+  /**
+   * Do the GSS-API kerberos authentication.
+   * We already have a logged in subject in the form of serviceUGI,
+   * which GSS-API will extract information from.
+   * @param request the http request carrying the client's service ticket
+   * @return the client principal name, without the realm part
+   * @throws HttpAuthenticationException if the GSS context could not be established
+   */
+  private String doKerberosAuth(HttpServletRequest request, 
+      UserGroupInformation serviceUGI) throws HttpAuthenticationException {
+    try {
+      return serviceUGI.doAs(new HttpKerberosServerAction(request, serviceUGI));
+    } catch (Exception e) {
+      throw new HttpAuthenticationException(e);
+    }
+  }
+
+  class HttpKerberosServerAction implements PrivilegedExceptionAction<String> {
+    HttpServletRequest request;
+    UserGroupInformation serviceUGI;
+    
+    HttpKerberosServerAction(HttpServletRequest request, 
+        UserGroupInformation serviceUGI) {
+      this.request = request;
+      this.serviceUGI = serviceUGI;
+    }
+
+    @Override
+    public String run() throws HttpAuthenticationException {
+      // Get our own Kerberos credentials for accepting the connection
+      GSSManager manager = GSSManager.getInstance();
+      GSSContext gssContext = null;
+      String serverPrincipal = getPrincipalWithoutRealm(
+          serviceUGI.getUserName());
+      try {
+        // This is the Oid for the Kerberos GSS-API mechanism.
+        Oid mechOid = new Oid("1.2.840.113554.1.2.2");
+        // Oid for the Kerberos principal name
+        Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
+
+        // GSS name for server
+        GSSName serverName = manager.createName(serverPrincipal, krb5PrincipalOid);
+
+        // GSS credentials for server
+        GSSCredential serverCreds = manager.createCredential(serverName,
+            GSSCredential.DEFAULT_LIFETIME,  mechOid, GSSCredential.ACCEPT_ONLY);
+
+        // Create a GSS context
+        gssContext = manager.createContext(serverCreds);
+
+        // Get service ticket from the authorization header
+        String serviceTicketBase64 = getAuthHeader(request, authType);
+        byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes());
+
+        gssContext.acceptSecContext(inToken, 0, inToken.length);
+        // Authenticate or deny based on its context completion
+        if (!gssContext.isEstablished()) {
+          throw new HttpAuthenticationException("Kerberos authentication failed: " +
+              "unable to establish context with the service ticket " +
+              "provided by the client.");
         }
         else {
-          // don't log the actual password.
-          password = "******";
+          return getPrincipalWithoutRealm(gssContext.getSrcName().toString());
+        }
+      }
+      catch (GSSException e) {
+        throw new HttpAuthenticationException("Kerberos authentication failed: ", e);
+      }
+      finally {
+        if (gssContext != null) {
+          try {
+            gssContext.dispose();
+          } catch (GSSException e) {
+            // No-op
+          }
         }
-        LOG.debug("HttpServlet:  HTTP Authorization header:: username=" + username +
-            " password=" + password);
       }
     }
+
+    private String getPrincipalWithoutRealm(String fullPrincipal) {
+      String names[] = fullPrincipal.split("[@]");
+      return names[0];
+    }
+  }
+
+  private String getUsername(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String creds[] = getAuthHeaderTokens(request, authType);
+    // Username must be present
+    if (creds[0] == null || creds[0].isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client does not contain username.");
+    }
+    return creds[0];
+  }
+
+  private String getPassword(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String creds[] = getAuthHeaderTokens(request, authType);
+    // Password must be present
+    if (creds[1] == null || creds[1].isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client does not contain username.");
+    }
+    return creds[1];
+  }
+
+  private String[] getAuthHeaderTokens(HttpServletRequest request,
+      String authType) throws HttpAuthenticationException {
+    String authHeaderBase64 = getAuthHeader(request, authType);
+    String authHeaderString = StringUtils.newStringUtf8(
+        Base64.decodeBase64(authHeaderBase64.getBytes()));
+    String[] creds = authHeaderString.split(":");
+    return creds;
   }
 
+  /**
+   * Returns the base64 encoded auth header payload
+   * @param request the http request from the client
+   * @param authType the configured authentication type
+   * @return the base64 payload that follows the "Negotiate " or "Basic " prefix
+   * @throws HttpAuthenticationException if the Authorization header is missing or empty
+   */
+  private String getAuthHeader(HttpServletRequest request, String authType)
+      throws HttpAuthenticationException {
+    String authHeader = request.getHeader(HttpAuthUtils.AUTHORIZATION);
+    // Each http request must have an Authorization header
+    if (authHeader == null || authHeader.isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client is empty.");
+    }
+
+    String authHeaderBase64String;
+    int beginIndex;
+    if (isKerberosAuthMode(authType)) {
+      beginIndex = (HttpAuthUtils.NEGOTIATE + " ").length();
+    }
+    else {
+      beginIndex = (HttpAuthUtils.BASIC + " ").length();
+    }
+    authHeaderBase64String = authHeader.substring(beginIndex);
+    // Authorization header must have a payload
+    if (authHeaderBase64String == null || authHeaderBase64String.isEmpty()) {
+      throw new HttpAuthenticationException("Authorization header received " +
+          "from the client does not contain any data.");
+    }
+    return authHeaderBase64String;
+  }
+
+  private boolean isKerberosAuthMode(String authType) {
+    return authType.equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString());
+  }
 }
 
+
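For context (not part of this commit): the servlet above reads the standard HTTP Authorization header, either "Negotiate <base64 GSS-API service ticket>" for Kerberos or "Basic <base64 of user:password>" for the LDAP/PAM/NOSASL path. Below is a minimal client-side sketch of building the Basic form, using the same commons-codec Base64 helper the servlet uses to decode it; the class name and sample credentials are illustrative only.

    import org.apache.commons.codec.binary.Base64;

    public class BasicAuthHeaderExample {
      // Builds the value getAuthHeaderTokens() expects: "Basic " + base64("user:password").
      // The servlet strips the "Basic " prefix, base64-decodes the rest, and splits on ':'
      // to recover the username and password.
      public static String basicAuthHeader(String user, String password) {
        byte[] creds = (user + ":" + password).getBytes();
        return "Basic " + new String(Base64.encodeBase64(creds));
      }

      public static void main(String[] args) {
        System.out.println(basicAuthHeader("hive", "secret"));
      }
    }

For the Kerberos path, the payload after "Negotiate " is the base64-encoded service ticket that acceptSecContext() consumes in HttpKerberosServerAction above.
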

Modified: hive/branches/branch-0.13/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1578350&r1=1578349&r2=1578350&view=diff
==============================================================================
--- hive/branches/branch-0.13/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/branches/branch-0.13/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Mon Mar 17 13:04:29 2014
@@ -66,99 +66,121 @@ import org.apache.thrift.transport.TTran
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
- /**
-  * Functions that bridge Thrift's SASL transports to Hadoop's
-  * SASL callback handlers and authentication classes.
-  */
- public class HadoopThriftAuthBridge20S extends HadoopThriftAuthBridge {
-   static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class);
-
-   @Override
-   public Client createClient() {
-     return new Client();
-   }
-
-   @Override
-   public Client createClientWithConf(String authType) {
-     Configuration conf = new Configuration();
-     conf.set(HADOOP_SECURITY_AUTHENTICATION, authType);
-     UserGroupInformation.setConfiguration(conf);
-     return new Client();
-   }
-
-   @Override
-   public Server createServer(String keytabFile, String principalConf) throws TTransportException {
-     return new Server(keytabFile, principalConf);
-   }
-
-   /**
-    * Read and return Hadoop SASL configuration which can be configured using
-    * "hadoop.rpc.protection"
-    * @param conf
-    * @return Hadoop SASL configuration
-    */
-   @Override
-   public Map<String, String> getHadoopSaslProperties(Configuration conf) {
-     // Initialize the SaslRpcServer to ensure QOP parameters are read from conf
-     SaslRpcServer.init(conf);
-     return SaslRpcServer.SASL_PROPS;
-   }
-
-   public static class Client extends HadoopThriftAuthBridge.Client {
-     /**
-      * Create a client-side SASL transport that wraps an underlying transport.
-      *
-      * @param method The authentication method to use. Currently only KERBEROS is
-      *               supported.
-      * @param serverPrincipal The Kerberos principal of the target server.
-      * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
-      * @param saslProps the sasl properties to create the client with
-      */
-
-     @Override
-     public TTransport createClientTransport(
-       String principalConfig, String host,
-       String methodStr, String tokenStrForm, TTransport underlyingTransport,
-       Map<String, String> saslProps) throws IOException {
-       AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
-
-       TTransport saslTransport = null;
-       switch (method) {
-         case DIGEST:
-           Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
-           t.decodeFromUrlString(tokenStrForm);
-           saslTransport = new TSaslClientTransport(
+/**
+ * Functions that bridge Thrift's SASL transports to Hadoop's
+ * SASL callback handlers and authentication classes.
+ */
+public class HadoopThriftAuthBridge20S extends HadoopThriftAuthBridge {
+  static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class);
+
+  @Override
+  public Client createClient() {
+    return new Client();
+  }
+
+  @Override
+  public Client createClientWithConf(String authType) {
+    Configuration conf = new Configuration();
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, authType);
+    UserGroupInformation.setConfiguration(conf);
+    return new Client();
+  }
+
+  @Override
+  public Server createServer(String keytabFile, String principalConf) throws TTransportException {
+    return new Server(keytabFile, principalConf);
+  }
+
+  @Override
+  public String getServerPrincipal(String principalConfig, String host)
+      throws IOException {
+    String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+    String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+    if (names.length != 3) {
+      throw new IOException(
+          "Kerberos principal name does NOT have the expected hostname part: "
+              + serverPrincipal);
+    }
+    return serverPrincipal;
+  }
+
+  @Override
+  public UserGroupInformation getCurrentUGIWithConf(String authType)
+      throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, authType);
+    UserGroupInformation.setConfiguration(conf);
+    return UserGroupInformation.getCurrentUser();
+  }
+
+  /**
+   * Read and return Hadoop SASL configuration which can be configured using
+   * "hadoop.rpc.protection"
+   * @param conf
+   * @return Hadoop SASL configuration
+   */
+  @Override
+  public Map<String, String> getHadoopSaslProperties(Configuration conf) {
+    // Initialize the SaslRpcServer to ensure QOP parameters are read from conf
+    SaslRpcServer.init(conf);
+    return SaslRpcServer.SASL_PROPS;
+  }
+
+  public static class Client extends HadoopThriftAuthBridge.Client {
+    /**
+     * Create a client-side SASL transport that wraps an underlying transport.
+     *
+     * @param method The authentication method to use. Currently only KERBEROS is
+     *               supported.
+     * @param serverPrincipal The Kerberos principal of the target server.
+     * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
+     * @param saslProps the sasl properties to create the client with
+     */
+
+    @Override
+    public TTransport createClientTransport(
+        String principalConfig, String host,
+        String methodStr, String tokenStrForm, TTransport underlyingTransport,
+        Map<String, String> saslProps) throws IOException {
+      AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
+
+      TTransport saslTransport = null;
+      switch (method) {
+      case DIGEST:
+        Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+        t.decodeFromUrlString(tokenStrForm);
+        saslTransport = new TSaslClientTransport(
             method.getMechanismName(),
             null,
             null, SaslRpcServer.SASL_DEFAULT_REALM,
             saslProps, new SaslClientCallbackHandler(t),
             underlyingTransport);
-           return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+        return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+
+      case KERBEROS:
+        String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+        String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+        if (names.length != 3) {
+          throw new IOException(
+              "Kerberos principal name does NOT have the expected hostname part: "
+                  + serverPrincipal);
+        }
+        try {
+          saslTransport = new TSaslClientTransport(
+              method.getMechanismName(),
+              null,
+              names[0], names[1],
+              saslProps, null,
+              underlyingTransport);
+          return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+        } catch (SaslException se) {
+          throw new IOException("Could not instantiate SASL transport", se);
+        }
 
-         case KERBEROS:
-           String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
-           String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
-           if (names.length != 3) {
-             throw new IOException(
-               "Kerberos principal name does NOT have the expected hostname part: "
-                 + serverPrincipal);
-           }
-           try {
-             saslTransport = new TSaslClientTransport(
-               method.getMechanismName(),
-               null,
-               names[0], names[1],
-               saslProps, null,
-               underlyingTransport);
-             return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
-           } catch (SaslException se) {
-             throw new IOException("Could not instantiate SASL transport", se);
-           }
-
-         default:
-           throw new IOException("Unsupported authentication method: " + method);
-       }
-     }
+      default:
+        throw new IOException("Unsupported authentication method: " + method);
+      }
+    }
     private static class SaslClientCallbackHandler implements CallbackHandler {
       private final String userName;
       private final char[] userPassword;
@@ -168,8 +190,9 @@ import org.apache.thrift.transport.TTran
         this.userPassword = encodePassword(token.getPassword());
       }
 
+      @Override
       public void handle(Callback[] callbacks)
-      throws UnsupportedCallbackException {
+          throws UnsupportedCallbackException {
         NameCallback nc = null;
         PasswordCallback pc = null;
         RealmCallback rc = null;
@@ -214,253 +237,254 @@ import org.apache.thrift.transport.TTran
 
       static char[] encodePassword(byte[] password) {
         return new String(Base64.encodeBase64(password)).toCharArray();
-       }
-     }
-       }
-
-   public static class Server extends HadoopThriftAuthBridge.Server {
-     final UserGroupInformation realUgi;
-     DelegationTokenSecretManager secretManager;
-     private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
-     //Delegation token related keys
-     public static final String  DELEGATION_KEY_UPDATE_INTERVAL_KEY =
-       "hive.cluster.delegation.key.update-interval";
-     public static final long    DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
-       24*60*60*1000; // 1 day
-     public static final String  DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
-       "hive.cluster.delegation.token.renew-interval";
-     public static final long    DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
-       24*60*60*1000;  // 1 day
-     public static final String  DELEGATION_TOKEN_MAX_LIFETIME_KEY =
-       "hive.cluster.delegation.token.max-lifetime";
-     public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
-       7*24*60*60*1000; // 7 days
-     public static final String DELEGATION_TOKEN_STORE_CLS =
-       "hive.cluster.delegation.token.store.class";
-     public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
-         "hive.cluster.delegation.token.store.zookeeper.connectString";
-     public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS =
-         "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis";
-     public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE =
-         "hive.cluster.delegation.token.store.zookeeper.znode";
-     public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
-             "hive.cluster.delegation.token.store.zookeeper.acl";
-     public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT =
-         "/hive/cluster/delegation";
-
-     public Server() throws TTransportException {
-       try {
-         realUgi = UserGroupInformation.getCurrentUser();
-       } catch (IOException ioe) {
-         throw new TTransportException(ioe);
-       }
-     }
-     /**
-      * Create a server with a kerberos keytab/principal.
-      */
-     protected Server(String keytabFile, String principalConf)
-       throws TTransportException {
-       if (keytabFile == null || keytabFile.isEmpty()) {
-         throw new TTransportException("No keytab specified");
-       }
-       if (principalConf == null || principalConf.isEmpty()) {
-         throw new TTransportException("No principal specified");
-       }
-
-       // Login from the keytab
-       String kerberosName;
-       try {
-         kerberosName =
-           SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0");
-         UserGroupInformation.loginUserFromKeytab(
-             kerberosName, keytabFile);
-         realUgi = UserGroupInformation.getLoginUser();
-         assert realUgi.isFromKeytab();
-       } catch (IOException ioe) {
-         throw new TTransportException(ioe);
-       }
-     }
-
-     /**
-      * Create a TTransportFactory that, upon connection of a client socket,
-      * negotiates a Kerberized SASL transport. The resulting TTransportFactory
-      * can be passed as both the input and output transport factory when
-      * instantiating a TThreadPoolServer, for example.
-      *
-      * @param saslProps Map of SASL properties
-      */
-     @Override
-     public TTransportFactory createTransportFactory(Map<String, String> saslProps)
-             throws TTransportException {
-       // Parse out the kerberos principal, host, realm.
-       String kerberosName = realUgi.getUserName();
-       final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
-       if (names.length != 3) {
-         throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
-       }
-
-       TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
-       transFactory.addServerDefinition(
-         AuthMethod.KERBEROS.getMechanismName(),
-         names[0], names[1],  // two parts of kerberos principal
-         saslProps,
-         new SaslRpcServer.SaslGssCallbackHandler());
-       transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
+      }
+    }
+  }
+
+  public static class Server extends HadoopThriftAuthBridge.Server {
+    final UserGroupInformation realUgi;
+    DelegationTokenSecretManager secretManager;
+    private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
+    //Delegation token related keys
+    public static final String  DELEGATION_KEY_UPDATE_INTERVAL_KEY =
+        "hive.cluster.delegation.key.update-interval";
+    public static final long    DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
+        24*60*60*1000; // 1 day
+    public static final String  DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
+        "hive.cluster.delegation.token.renew-interval";
+    public static final long    DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
+        24*60*60*1000;  // 1 day
+    public static final String  DELEGATION_TOKEN_MAX_LIFETIME_KEY =
+        "hive.cluster.delegation.token.max-lifetime";
+    public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
+        7*24*60*60*1000; // 7 days
+    public static final String DELEGATION_TOKEN_STORE_CLS =
+        "hive.cluster.delegation.token.store.class";
+    public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
+        "hive.cluster.delegation.token.store.zookeeper.connectString";
+    public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS =
+        "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis";
+    public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE =
+        "hive.cluster.delegation.token.store.zookeeper.znode";
+    public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
+        "hive.cluster.delegation.token.store.zookeeper.acl";
+    public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT =
+        "/hive/cluster/delegation";
+
+    public Server() throws TTransportException {
+      try {
+        realUgi = UserGroupInformation.getCurrentUser();
+      } catch (IOException ioe) {
+        throw new TTransportException(ioe);
+      }
+    }
+    /**
+     * Create a server with a kerberos keytab/principal.
+     */
+    protected Server(String keytabFile, String principalConf)
+        throws TTransportException {
+      if (keytabFile == null || keytabFile.isEmpty()) {
+        throw new TTransportException("No keytab specified");
+      }
+      if (principalConf == null || principalConf.isEmpty()) {
+        throw new TTransportException("No principal specified");
+      }
+
+      // Login from the keytab
+      String kerberosName;
+      try {
+        kerberosName =
+            SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0");
+        UserGroupInformation.loginUserFromKeytab(
+            kerberosName, keytabFile);
+        realUgi = UserGroupInformation.getLoginUser();
+        assert realUgi.isFromKeytab();
+      } catch (IOException ioe) {
+        throw new TTransportException(ioe);
+      }
+    }
+
+    /**
+     * Create a TTransportFactory that, upon connection of a client socket,
+     * negotiates a Kerberized SASL transport. The resulting TTransportFactory
+     * can be passed as both the input and output transport factory when
+     * instantiating a TThreadPoolServer, for example.
+     *
+     * @param saslProps Map of SASL properties
+     */
+    @Override
+    public TTransportFactory createTransportFactory(Map<String, String> saslProps)
+        throws TTransportException {
+      // Parse out the kerberos principal, host, realm.
+      String kerberosName = realUgi.getUserName();
+      final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
+      if (names.length != 3) {
+        throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
+      }
+
+      TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
+      transFactory.addServerDefinition(
+          AuthMethod.KERBEROS.getMechanismName(),
+          names[0], names[1],  // two parts of kerberos principal
+          saslProps,
+          new SaslRpcServer.SaslGssCallbackHandler());
+      transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
           null, SaslRpcServer.SASL_DEFAULT_REALM,
           saslProps, new SaslDigestCallbackHandler(secretManager));
 
-       return new TUGIAssumingTransportFactory(transFactory, realUgi);
-     }
+      return new TUGIAssumingTransportFactory(transFactory, realUgi);
+    }
+
+    /**
+     * Wrap a TProcessor in such a way that, before processing any RPC, it
+     * assumes the UserGroupInformation of the user authenticated by
+     * the SASL transport.
+     */
+    @Override
+    public TProcessor wrapProcessor(TProcessor processor) {
+      return new TUGIAssumingProcessor(processor, secretManager, true);
+    }
 
-     /**
-      * Wrap a TProcessor in such a way that, before processing any RPC, it
-      * assumes the UserGroupInformation of the user authenticated by
-      * the SASL transport.
-      */
-     @Override
-     public TProcessor wrapProcessor(TProcessor processor) {
-       return new TUGIAssumingProcessor(processor, secretManager, true);
-     }
-
-     /**
-      * Wrap a TProcessor to capture the client information like connecting userid, ip etc
-      */
-     @Override
-     public TProcessor wrapNonAssumingProcessor(TProcessor processor) {
+    /**
+     * Wrap a TProcessor to capture client information such as the connecting user id, IP address, etc.
+     */
+    @Override
+    public TProcessor wrapNonAssumingProcessor(TProcessor processor) {
       return new TUGIAssumingProcessor(processor, secretManager, false);
-     }
+    }
 
     protected DelegationTokenStore getTokenStore(Configuration conf)
         throws IOException {
-       String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
-       if (StringUtils.isBlank(tokenStoreClassName)) {
-         return new MemoryTokenStore();
-       }
-       try {
+      String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
+      if (StringUtils.isBlank(tokenStoreClassName)) {
+        return new MemoryTokenStore();
+      }
+      try {
         Class<? extends DelegationTokenStore> storeClass = Class
             .forName(tokenStoreClassName).asSubclass(
                 DelegationTokenStore.class);
         return ReflectionUtils.newInstance(storeClass, conf);
-       } catch (ClassNotFoundException e) {
+      } catch (ClassNotFoundException e) {
         throw new IOException("Error initializing delegation token store: " + tokenStoreClassName,
             e);
-       }
-     }
+      }
+    }
+
+    @Override
+    public void startDelegationTokenSecretManager(Configuration conf, Object hms)
+        throws IOException{
+      long secretKeyInterval =
+          conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+              DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+      long tokenMaxLifetime =
+          conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+              DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+      long tokenRenewInterval =
+          conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+              DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+
+      DelegationTokenStore dts = getTokenStore(conf);
+      dts.setStore(hms);
+      secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval,
+          tokenMaxLifetime,
+          tokenRenewInterval,
+          DELEGATION_TOKEN_GC_INTERVAL, dts);
+      secretManager.startThreads();
+    }
+
+    @Override
+    public String getDelegationToken(final String owner, final String renewer)
+        throws IOException, InterruptedException {
+      if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
+        throw new AuthorizationException(
+            "Delegation Token can be issued only with kerberos authentication. " +
+                "Current AuthenticationMethod: " + authenticationMethod.get()
+            );
+      }
+      //if the user asking the token is same as the 'owner' then don't do
+      //any proxy authorization checks. For cases like oozie, where it gets
+      //a delegation token for another user, we need to make sure oozie is
+      //authorized to get a delegation token.
+      //Do all checks on short names
+      UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+      UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner);
+      if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) {
+        //in the case of proxy users, the getCurrentUser will return the
+        //real user (for e.g. oozie) due to the doAs that happened just before the
+        //server started executing the method getDelegationToken in the MetaStore
+        ownerUgi = UserGroupInformation.createProxyUser(owner,
+            UserGroupInformation.getCurrentUser());
+        InetAddress remoteAddr = getRemoteAddress();
+        ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null);
+      }
+      return ownerUgi.doAs(new PrivilegedExceptionAction<String>() {
+        @Override
+        public String run() throws IOException {
+          return secretManager.getDelegationToken(renewer);
+        }
+      });
+    }
+
+    @Override
+    public String getDelegationTokenWithService(String owner, String renewer, String service)
+        throws IOException, InterruptedException {
+      String token = getDelegationToken(owner, renewer);
+      return ShimLoader.getHadoopShims().addServiceToToken(token, service);
+    }
+
+    @Override
+    public long renewDelegationToken(String tokenStrForm) throws IOException {
+      if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
+        throw new AuthorizationException(
+            "Delegation Token can be issued only with kerberos authentication. " +
+                "Current AuthenticationMethod: " + authenticationMethod.get()
+            );
+      }
+      return secretManager.renewDelegationToken(tokenStrForm);
+    }
+
+    @Override
+    public String getUserFromToken(String tokenStr) throws IOException {
+      return secretManager.getUserFromToken(tokenStr);
+    }
+
+    @Override
+    public void cancelDelegationToken(String tokenStrForm) throws IOException {
+      secretManager.cancelDelegationToken(tokenStrForm);
+    }
+
+    final static ThreadLocal<InetAddress> remoteAddress =
+        new ThreadLocal<InetAddress>() {
+      @Override
+      protected synchronized InetAddress initialValue() {
+        return null;
+      }
+    };
+
+    @Override
+    public InetAddress getRemoteAddress() {
+      return remoteAddress.get();
+    }
+
+    final static ThreadLocal<AuthenticationMethod> authenticationMethod =
+        new ThreadLocal<AuthenticationMethod>() {
+      @Override
+      protected synchronized AuthenticationMethod initialValue() {
+        return AuthenticationMethod.TOKEN;
+      }
+    };
+
+    private static ThreadLocal<String> remoteUser = new ThreadLocal<String> () {
+      @Override
+      protected synchronized String initialValue() {
+        return null;
+      }
+    };
 
-     @Override
-     public void startDelegationTokenSecretManager(Configuration conf, Object hms)
-     throws IOException{
-       long secretKeyInterval =
-         conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
-                        DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
-       long tokenMaxLifetime =
-           conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY,
-                        DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
-       long tokenRenewInterval =
-           conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
-                        DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
-
-       DelegationTokenStore dts = getTokenStore(conf);
-       dts.setStore(hms);
-       secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval,
-             tokenMaxLifetime,
-             tokenRenewInterval,
-             DELEGATION_TOKEN_GC_INTERVAL, dts);
-       secretManager.startThreads();
-     }
-
-     @Override
-     public String getDelegationToken(final String owner, final String renewer)
-     throws IOException, InterruptedException {
-       if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
-         throw new AuthorizationException(
-         "Delegation Token can be issued only with kerberos authentication. " +
-         "Current AuthenticationMethod: " + authenticationMethod.get()
-             );
-       }
-       //if the user asking the token is same as the 'owner' then don't do
-       //any proxy authorization checks. For cases like oozie, where it gets
-       //a delegation token for another user, we need to make sure oozie is
-       //authorized to get a delegation token.
-       //Do all checks on short names
-       UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
-       UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner);
-       if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) {
-         //in the case of proxy users, the getCurrentUser will return the
-         //real user (for e.g. oozie) due to the doAs that happened just before the
-         //server started executing the method getDelegationToken in the MetaStore
-         ownerUgi = UserGroupInformation.createProxyUser(owner,
-           UserGroupInformation.getCurrentUser());
-         InetAddress remoteAddr = getRemoteAddress();
-         ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null);
-       }
-       return ownerUgi.doAs(new PrivilegedExceptionAction<String>() {
-         public String run() throws IOException {
-           return secretManager.getDelegationToken(renewer);
-         }
-       });
-     }
-
-     @Override
-     public String getDelegationTokenWithService(String owner, String renewer, String service)
-         throws IOException, InterruptedException {
-       String token = getDelegationToken(owner, renewer);
-       return ShimLoader.getHadoopShims().addServiceToToken(token, service);
-     }
-
-     @Override
-     public long renewDelegationToken(String tokenStrForm) throws IOException {
-       if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
-         throw new AuthorizationException(
-         "Delegation Token can be issued only with kerberos authentication. " +
-         "Current AuthenticationMethod: " + authenticationMethod.get()
-             );
-       }
-       return secretManager.renewDelegationToken(tokenStrForm);
-     }
-
-     @Override
-     public String getUserFromToken(String tokenStr) throws IOException {
-       return secretManager.getUserFromToken(tokenStr);
-     }
-
-     @Override
-     public void cancelDelegationToken(String tokenStrForm) throws IOException {
-       secretManager.cancelDelegationToken(tokenStrForm);
-     }
-
-     final static ThreadLocal<InetAddress> remoteAddress =
-       new ThreadLocal<InetAddress>() {
-       @Override
-       protected synchronized InetAddress initialValue() {
-         return null;
-       }
-     };
-
-     @Override
-     public InetAddress getRemoteAddress() {
-       return remoteAddress.get();
-     }
-
-     final static ThreadLocal<AuthenticationMethod> authenticationMethod =
-       new ThreadLocal<AuthenticationMethod>() {
-       @Override
-       protected synchronized AuthenticationMethod initialValue() {
-         return AuthenticationMethod.TOKEN;
-       }
-     };
-
-     private static ThreadLocal<String> remoteUser = new ThreadLocal<String> () {
-       @Override
-       protected synchronized String initialValue() {
-         return null;
-       }
-     };
-
-     @Override
-     public String getRemoteUser() {
-       return remoteUser.get();
-     }
+    @Override
+    public String getRemoteUser() {
+      return remoteUser.get();
+    }
 
     /** CallbackHandler for SASL DIGEST-MD5 mechanism */
     // This code is pretty much completely based on Hadoop's
@@ -501,12 +525,12 @@ import org.apache.thrift.transport.TTran
             continue; // realm is ignored
           } else {
             throw new UnsupportedCallbackException(callback,
-            "Unrecognized SASL DIGEST-MD5 Callback");
+                "Unrecognized SASL DIGEST-MD5 Callback");
           }
         }
         if (pc != null) {
           DelegationTokenIdentifier tokenIdentifier = SaslRpcServer.
-          getIdentifier(nc.getDefaultName(), secretManager);
+              getIdentifier(nc.getDefaultName(), secretManager);
           char[] password = getPassword(tokenIdentifier);
 
           if (LOG.isDebugEnabled()) {
@@ -526,7 +550,7 @@ import org.apache.thrift.transport.TTran
           if (ac.isAuthorized()) {
             if (LOG.isDebugEnabled()) {
               String username =
-                SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
+                  SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
               LOG.debug("SASL server DIGEST-MD5 callback: setting "
                   + "canonicalized client ID: " + username);
             }
@@ -534,117 +558,120 @@ import org.apache.thrift.transport.TTran
           }
         }
       }
-     }
+    }
 
-     /**
-      * Processor that pulls the SaslServer object out of the transport, and
-      * assumes the remote user's UGI before calling through to the original
-      * processor.
-      *
-      * This is used on the server side to set the UGI for each specific call.
-      */
-     protected class TUGIAssumingProcessor implements TProcessor {
-       final TProcessor wrapped;
-       DelegationTokenSecretManager secretManager;
-       boolean useProxy;
-       TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager,
-           boolean useProxy) {
-         this.wrapped = wrapped;
-         this.secretManager = secretManager;
-         this.useProxy = useProxy;
-       }
-
-       public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
-         TTransport trans = inProt.getTransport();
-         if (!(trans instanceof TSaslServerTransport)) {
-           throw new TException("Unexpected non-SASL transport " + trans.getClass());
-         }
-         TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
-         SaslServer saslServer = saslTrans.getSaslServer();
-         String authId = saslServer.getAuthorizationID();
-         authenticationMethod.set(AuthenticationMethod.KERBEROS);
-         LOG.debug("AUTH ID ======>" + authId);
-         String endUser = authId;
-
-         if(saslServer.getMechanismName().equals("DIGEST-MD5")) {
-           try {
-             TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
-                 secretManager);
-             endUser = tokenId.getUser().getUserName();
-             authenticationMethod.set(AuthenticationMethod.TOKEN);
-           } catch (InvalidToken e) {
-             throw new TException(e.getMessage());
-           }
-         }
-         Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
-         remoteAddress.set(socket.getInetAddress());
-         UserGroupInformation clientUgi = null;
-         try {
-           if (useProxy) {
-             clientUgi = UserGroupInformation.createProxyUser(
-               endUser, UserGroupInformation.getLoginUser());
-             remoteUser.set(clientUgi.getShortUserName());
-             return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
-                 public Boolean run() {
-                   try {
-                     return wrapped.process(inProt, outProt);
-                   } catch (TException te) {
-                     throw new RuntimeException(te);
-                   }
-                 }
-               });
-           } else {
-             remoteUser.set(endUser);
-             return wrapped.process(inProt, outProt);
-           }
-         } catch (RuntimeException rte) {
-           if (rte.getCause() instanceof TException) {
-             throw (TException)rte.getCause();
-           }
-           throw rte;
-         } catch (InterruptedException ie) {
-           throw new RuntimeException(ie); // unexpected!
-         } catch (IOException ioe) {
-           throw new RuntimeException(ioe); // unexpected!
-         }
-         finally {
-           if (clientUgi != null) {
-            try { FileSystem.closeAllForUGI(clientUgi); }
-              catch(IOException exception) {
-                LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
+    /**
+     * Processor that pulls the SaslServer object out of the transport, and
+     * assumes the remote user's UGI before calling through to the original
+     * processor.
+     *
+     * This is used on the server side to set the UGI for each specific call.
+     */
+    protected class TUGIAssumingProcessor implements TProcessor {
+      final TProcessor wrapped;
+      DelegationTokenSecretManager secretManager;
+      boolean useProxy;
+      TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager,
+          boolean useProxy) {
+        this.wrapped = wrapped;
+        this.secretManager = secretManager;
+        this.useProxy = useProxy;
+      }
+
+      @Override
+      public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+        TTransport trans = inProt.getTransport();
+        if (!(trans instanceof TSaslServerTransport)) {
+          throw new TException("Unexpected non-SASL transport " + trans.getClass());
+        }
+        TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+        SaslServer saslServer = saslTrans.getSaslServer();
+        String authId = saslServer.getAuthorizationID();
+        authenticationMethod.set(AuthenticationMethod.KERBEROS);
+        LOG.debug("AUTH ID ======>" + authId);
+        String endUser = authId;
+
+        if(saslServer.getMechanismName().equals("DIGEST-MD5")) {
+          try {
+            TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+                secretManager);
+            endUser = tokenId.getUser().getUserName();
+            authenticationMethod.set(AuthenticationMethod.TOKEN);
+          } catch (InvalidToken e) {
+            throw new TException(e.getMessage());
+          }
+        }
+        Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
+        remoteAddress.set(socket.getInetAddress());
+        UserGroupInformation clientUgi = null;
+        try {
+          if (useProxy) {
+            clientUgi = UserGroupInformation.createProxyUser(
+                endUser, UserGroupInformation.getLoginUser());
+            remoteUser.set(clientUgi.getShortUserName());
+            return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
+              @Override
+              public Boolean run() {
+                try {
+                  return wrapped.process(inProt, outProt);
+                } catch (TException te) {
+                  throw new RuntimeException(te);
+                }
               }
+            });
+          } else {
+            remoteUser.set(endUser);
+            return wrapped.process(inProt, outProt);
+          }
+        } catch (RuntimeException rte) {
+          if (rte.getCause() instanceof TException) {
+            throw (TException)rte.getCause();
+          }
+          throw rte;
+        } catch (InterruptedException ie) {
+          throw new RuntimeException(ie); // unexpected!
+        } catch (IOException ioe) {
+          throw new RuntimeException(ioe); // unexpected!
+        }
+        finally {
+          if (clientUgi != null) {
+            try { FileSystem.closeAllForUGI(clientUgi); }
+            catch(IOException exception) {
+              LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
+            }
           }
-         }
-       }
-     }
+        }
+      }
+    }
 
     /**
-      * A TransportFactory that wraps another one, but assumes a specified UGI
-      * before calling through.
-      *
-      * This is used on the server side to assume the server's Principal when accepting
-      * clients.
-      */
-     static class TUGIAssumingTransportFactory extends TTransportFactory {
-       private final UserGroupInformation ugi;
-       private final TTransportFactory wrapped;
-
-       public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
-         assert wrapped != null;
-         assert ugi != null;
-
-         this.wrapped = wrapped;
-         this.ugi = ugi;
-       }
-
-       @Override
-       public TTransport getTransport(final TTransport trans) {
-         return ugi.doAs(new PrivilegedAction<TTransport>() {
-           public TTransport run() {
-             return wrapped.getTransport(trans);
-           }
-         });
-       }
-     }
-   }
- }
+     * A TransportFactory that wraps another one, but assumes a specified UGI
+     * before calling through.
+     *
+     * This is used on the server side to assume the server's Principal when accepting
+     * clients.
+     */
+    static class TUGIAssumingTransportFactory extends TTransportFactory {
+      private final UserGroupInformation ugi;
+      private final TTransportFactory wrapped;
+
+      public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+        assert wrapped != null;
+        assert ugi != null;
+
+        this.wrapped = wrapped;
+        this.ugi = ugi;
+      }
+
+      @Override
+      public TTransport getTransport(final TTransport trans) {
+        return ugi.doAs(new PrivilegedAction<TTransport>() {
+          @Override
+          public TTransport run() {
+            return wrapped.getTransport(trans);
+          }
+        });
+      }
+    }
+  }
+}
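
For context (not part of this commit): both getServerPrincipal() and createTransportFactory() above require the Kerberos principal to have three parts, primary/instance@REALM, as split by Hadoop's SaslRpcServer.splitKerberosName(). A minimal sketch, with a made-up hostname and realm:

    import org.apache.hadoop.security.SaslRpcServer;

    public class PrincipalSplitExample {
      public static void main(String[] args) {
        // "primary/instance@REALM" splits into exactly three parts; principals
        // missing the instance (hostname) part are rejected by the checks above.
        String[] parts = SaslRpcServer.splitKerberosName("hive/hs2.example.com@EXAMPLE.COM");
        System.out.println(parts.length);   // 3
        System.out.println(parts[0]);       // hive  (used as the SASL protocol name)
        System.out.println(parts[1]);       // hs2.example.com  (used as the SASL server name)
        System.out.println(parts[2]);       // EXAMPLE.COM
      }
    }
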

Modified: hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1578350&r1=1578349&r2=1578350&view=diff
==============================================================================
--- hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Mon Mar 17 13:04:29 2014
@@ -43,7 +43,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -80,15 +79,15 @@ public interface HadoopShims {
    *  @return TaskAttempt Log Url
    */
   String getTaskAttemptLogUrl(JobConf conf,
-    String taskTrackerHttpAddress,
-    String taskAttemptId)
-    throws MalformedURLException;
+      String taskTrackerHttpAddress,
+      String taskAttemptId)
+          throws MalformedURLException;
 
   /**
    * Returns a shim to wrap MiniMrCluster
    */
   public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
-                                     String nameNode, int numDir) throws IOException;
+      String nameNode, int numDir) throws IOException;
 
   /**
    * Shim for MiniMrCluster
@@ -125,7 +124,7 @@ public interface HadoopShims {
       String archiveName) throws Exception;
 
   public URI getHarUri(URI original, URI base, URI originalBase)
-        throws URISyntaxException;
+      throws URISyntaxException;
   /**
    * Hive uses side effect files exclusively for it's output. It also manages
    * the setup/cleanup/commit of output from the hive client. As a result it does
@@ -165,7 +164,7 @@ public interface HadoopShims {
    * @throws InterruptedException
    */
   public <T> T doAs(UserGroupInformation ugi, PrivilegedExceptionAction<T> pvea) throws
-    IOException, InterruptedException;
+  IOException, InterruptedException;
 
   /**
    * Once a delegation token is stored in a file, the location is specified
@@ -188,13 +187,13 @@ public interface HadoopShims {
 
 
   /**
-   * Used by metastore server to creates UGI object for a remote user.
+   * Used to create a UGI object for a remote user.
    * @param userName remote User Name
    * @param groupNames group names associated with remote user name
    * @return UGI created for the remote user.
    */
-
   public UserGroupInformation createRemoteUser(String userName, List<String> groupNames);
+
   /**
    * Get the short name corresponding to the subject in the passed UGI
    *
@@ -240,7 +239,7 @@ public interface HadoopShims {
    * @throws IOException
    */
   public void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService)
-    throws IOException;
+      throws IOException;
 
   /**
    * Add given service to the string format token
@@ -250,7 +249,7 @@ public interface HadoopShims {
    * @throws IOException
    */
   public String addServiceToToken(String tokenStr, String tokenService)
-    throws IOException;
+      throws IOException;
 
   enum JobTrackerState { INITIALIZING, RUNNING };
 
@@ -331,7 +330,7 @@ public interface HadoopShims {
    * @throws IOException
    */
   public boolean moveToAppropriateTrash(FileSystem fs, Path path, Configuration conf)
-          throws IOException;
+      throws IOException;
 
   /**
    * Get the default block size for the path. FileSystem alone is not sufficient to
@@ -382,6 +381,7 @@ public interface HadoopShims {
   public interface InputSplitShim extends InputSplit {
     JobConf getJob();
 
+    @Override
     long getLength();
 
     /** Returns an array containing the startoffsets of the files in the split. */
@@ -406,14 +406,18 @@ public interface HadoopShims {
     Path[] getPaths();
 
     /** Returns all the Paths where this input-split resides. */
+    @Override
     String[] getLocations() throws IOException;
 
     void shrinkSplit(long length);
 
+    @Override
     String toString();
 
+    @Override
     void readFields(DataInput in) throws IOException;
 
+    @Override
     void write(DataOutput out) throws IOException;
   }
 
@@ -445,7 +449,7 @@ public interface HadoopShims {
    * @throws IOException
    */
   Iterator<FileStatus> listLocatedStatus(FileSystem fs, Path path,
-                                         PathFilter filter) throws IOException;
+      PathFilter filter) throws IOException;
 
   /**
    * For file status returned by listLocatedStatus, convert them into a list
@@ -456,7 +460,7 @@ public interface HadoopShims {
    * @throws IOException
    */
   BlockLocation[] getLocations(FileSystem fs,
-                               FileStatus status) throws IOException;
+      FileStatus status) throws IOException;
 
   public HCatHadoopShims getHCatShim();
   public interface HCatHadoopShims {
@@ -468,10 +472,10 @@ public interface HadoopShims {
     public TaskAttemptID createTaskAttemptID();
 
     public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf,
-                                                                                   TaskAttemptID taskId);
+        TaskAttemptID taskId);
 
     public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(JobConf conf,
-                                                                                org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable);
+        org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable);
 
     public JobContext createJobContext(Configuration conf, JobID jobId);
 
@@ -603,7 +607,7 @@ public interface HadoopShims {
   }
 
   public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec);
-  
+
   /**
    * Get configuration from JobContext
    */

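For context on the HadoopShims javadoc touched above, here is a minimal usage sketch of the doAs and createRemoteUser shim methods; it is not part of this commit. It assumes the shim is obtained through ShimLoader.getHadoopShims(), and the user name, group name, and the /tmp listing are illustrative only.

import java.security.PrivilegedExceptionAction;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;

public class ShimDoAsExample {
  public static void main(String[] args) throws Exception {
    final Configuration conf = new Configuration();
    HadoopShims shims = ShimLoader.getHadoopShims();

    // Build a UGI for a remote user without needing that user's credentials
    // (illustrative user and group names).
    UserGroupInformation remoteUgi =
        shims.createRemoteUser("hiveuser", Arrays.asList("hivegroup"));

    // Run the filesystem access as that user; the shim hides differences
    // between Hadoop versions in how doAs is implemented.
    FileStatus[] listing = shims.doAs(remoteUgi,
        new PrivilegedExceptionAction<FileStatus[]>() {
          @Override
          public FileStatus[] run() throws Exception {
            return FileSystem.get(conf).listStatus(new Path("/tmp"));
          }
        });
    System.out.println("Found " + listing.length + " entries in /tmp");
  }
}
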
Modified: hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java?rev=1578350&r1=1578349&r2=1578350&view=diff
==============================================================================
--- hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java (original)
+++ hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java Mon Mar 17 13:04:29 2014
@@ -16,39 +16,53 @@
  * limitations under the License.
  */
 
- package org.apache.hadoop.hive.thrift;
+package org.apache.hadoop.hive.thrift;
 
- import java.io.IOException;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
- /**
-  * This class is only overridden by the secure hadoop shim. It allows
-  * the Thrift SASL support to bridge to Hadoop's UserGroupInformation
-  * & DelegationToken infrastructure.
-  */
- public class HadoopThriftAuthBridge {
-   public Client createClient() {
-     throw new UnsupportedOperationException(
-       "The current version of Hadoop does not support Authentication");
-   }
-
-   public Client createClientWithConf(String authType) {
-     throw new UnsupportedOperationException(
-       "The current version of Hadoop does not support Authentication");
-   }
-
-   public Server createServer(String keytabFile, String principalConf)
-     throws TTransportException {
-     throw new UnsupportedOperationException(
-       "The current version of Hadoop does not support Authentication");
-   }
+/**
+ * This class is only overridden by the secure hadoop shim. It allows
+ * the Thrift SASL support to bridge to Hadoop's UserGroupInformation
+ * & DelegationToken infrastructure.
+ */
+public class HadoopThriftAuthBridge {
+  public Client createClient() {
+    throw new UnsupportedOperationException(
+        "The current version of Hadoop does not support Authentication");
+  }
+
+  public Client createClientWithConf(String authType) {
+    throw new UnsupportedOperationException(
+        "The current version of Hadoop does not support Authentication");
+  }
+
+  public UserGroupInformation getCurrentUGIWithConf(String authType)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "The current version of Hadoop does not support Authentication");
+  }
+
+
+  public String getServerPrincipal(String principalConfig, String host)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "The current version of Hadoop does not support Authentication");
+  }
+
+  public Server createServer(String keytabFile, String principalConf)
+      throws TTransportException {
+    throw new UnsupportedOperationException(
+        "The current version of Hadoop does not support Authentication");
+  }
 
 
   /**
@@ -58,47 +72,47 @@ import org.apache.thrift.transport.TTran
    * @param conf
    * @return Hadoop SASL configuration
    */
-   public Map<String, String> getHadoopSaslProperties(Configuration conf) {
-     throw new UnsupportedOperationException(
-       "The current version of Hadoop does not support Authentication");
-   }
-
-   public static abstract class Client {
-   /**
-    *
-    * @param principalConfig In the case of Kerberos authentication this will
-    * be the kerberos principal name, for DIGEST-MD5 (delegation token) based
-    * authentication this will be null
-    * @param host The metastore server host name
-    * @param methodStr "KERBEROS" or "DIGEST"
-    * @param tokenStrForm This is url encoded string form of
-    * org.apache.hadoop.security.token.
-    * @param underlyingTransport the underlying transport
-    * @return the transport
-    * @throws IOException
-    */
-     public abstract TTransport createClientTransport(
-             String principalConfig, String host,
-             String methodStr, String tokenStrForm, TTransport underlyingTransport,
-             Map<String, String> saslProps)
-             throws IOException;
-   }
-
-   public static abstract class Server {
-     public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps) throws TTransportException;
-     public abstract TProcessor wrapProcessor(TProcessor processor);
-     public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor);
-     public abstract InetAddress getRemoteAddress();
-     public abstract void startDelegationTokenSecretManager(Configuration conf,
-       Object hmsHandler) throws IOException;
-     public abstract String getDelegationToken(String owner, String renewer)
-       throws IOException, InterruptedException;
-     public abstract String getDelegationTokenWithService(String owner, String renewer, String service)
-       throws IOException, InterruptedException;
-     public abstract String getRemoteUser();
-     public abstract long renewDelegationToken(String tokenStrForm) throws IOException;
-     public abstract void cancelDelegationToken(String tokenStrForm) throws IOException;
-     public abstract String getUserFromToken(String tokenStr) throws IOException;
-   }
- }
+  public Map<String, String> getHadoopSaslProperties(Configuration conf) {
+    throw new UnsupportedOperationException(
+        "The current version of Hadoop does not support Authentication");
+  }
+
+  public static abstract class Client {
+    /**
+     *
+     * @param principalConfig In the case of Kerberos authentication this will
+     * be the Kerberos principal name; for DIGEST-MD5 (delegation token) based
+     * authentication this will be null.
+     * @param host The metastore server host name
+     * @param methodStr "KERBEROS" or "DIGEST"
+     * @param tokenStrForm This is url encoded string form of
+     * org.apache.hadoop.security.token.
+     * @param underlyingTransport the underlying transport
+     * @return the transport
+     * @throws IOException
+     */
+    public abstract TTransport createClientTransport(
+        String principalConfig, String host,
+        String methodStr, String tokenStrForm, TTransport underlyingTransport,
+        Map<String, String> saslProps)
+            throws IOException;
+  }
+
+  public static abstract class Server {
+    public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps) throws TTransportException;
+    public abstract TProcessor wrapProcessor(TProcessor processor);
+    public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor);
+    public abstract InetAddress getRemoteAddress();
+    public abstract void startDelegationTokenSecretManager(Configuration conf,
+        Object hmsHandler) throws IOException;
+    public abstract String getDelegationToken(String owner, String renewer)
+        throws IOException, InterruptedException;
+    public abstract String getDelegationTokenWithService(String owner, String renewer, String service)
+        throws IOException, InterruptedException;
+    public abstract String getRemoteUser();
+    public abstract long renewDelegationToken(String tokenStrForm) throws IOException;
+    public abstract void cancelDelegationToken(String tokenStrForm) throws IOException;
+    public abstract String getUserFromToken(String tokenStr) throws IOException;
+  }
+}
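
The two bridge methods added above, getCurrentUGIWithConf and getServerPrincipal, are only implemented by the secure shim; the base class here throws UnsupportedOperationException. A minimal usage sketch follows (not part of this commit); it assumes the bridge is obtained via ShimLoader.getHadoopThriftAuthBridge(), and the principal, host, and auth-type values are illustrative only.

import java.io.IOException;

import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
import org.apache.hadoop.security.UserGroupInformation;

public class AuthBridgeExample {
  public static void main(String[] args) throws IOException {
    HadoopThriftAuthBridge bridge = ShimLoader.getHadoopThriftAuthBridge();

    // Resolve the configured service principal against a host name
    // (illustrative principal and host; the secure shim performs the
    // actual substitution).
    String principal =
        bridge.getServerPrincipal("hive/_HOST@EXAMPLE.COM", "0.0.0.0");
    System.out.println("Resolved server principal: " + principal);

    // Obtain the UGI the service should run as for the configured auth type.
    // The base class above throws UnsupportedOperationException, so this
    // only succeeds when the secure Hadoop shim overrides it.
    UserGroupInformation serviceUGI = bridge.getCurrentUGIWithConf("kerberos");
    System.out.println("Service user: " + serviceUGI.getShortUserName());
  }
}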