You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/11/07 21:41:45 UTC

svn commit: r1637444 [20/20] - in /hive/branches/spark: ./ cli/src/test/org/apache/hadoop/hive/cli/ common/ common/src/java/org/apache/hadoop/hive/common/type/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/conf/ co...

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java Fri Nov  7 20:41:34 2014
@@ -18,16 +18,18 @@
 
 package org.apache.hive.service.auth;
 
-import java.io.IOException;
+import java.security.AccessControlContext;
+import java.security.AccessController;
 import java.security.PrivilegedExceptionAction;
 
+import javax.security.auth.Subject;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.http.protocol.BasicHttpContext;
 import org.apache.http.protocol.HttpContext;
 import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSCredential;
 import org.ietf.jgss.GSSManager;
 import org.ietf.jgss.GSSName;
 import org.ietf.jgss.Oid;
@@ -36,60 +38,54 @@ import org.ietf.jgss.Oid;
  * Utility functions for HTTP mode authentication.
  */
 public final class HttpAuthUtils {
-
   public static final String WWW_AUTHENTICATE = "WWW-Authenticate";
   public static final String AUTHORIZATION = "Authorization";
   public static final String BASIC = "Basic";
   public static final String NEGOTIATE = "Negotiate";
-  
-  /**
-   * @return Stringified Base64 encoded kerberosAuthHeader on success
-   */
-  public static String getKerberosServiceTicket(String principal, String host, String serverHttpUrl)
-    throws IOException, InterruptedException {
-    UserGroupInformation clientUGI = getClientUGI("kerberos");
-    String serverPrincipal = getServerPrincipal(principal, host);
-    // Uses the Ticket Granting Ticket in the UserGroupInformation
-    return clientUGI.doAs(
-      new HttpKerberosClientAction(serverPrincipal, clientUGI.getUserName(), serverHttpUrl));
-  }
 
   /**
-   * Get server principal and verify that hostname is present.
-   */
-  private static String getServerPrincipal(String principal, String host) throws IOException {
-    return ShimLoader.getHadoopThriftAuthBridge().getServerPrincipal(principal, host);
-  }
-
-  /**
-   * JAAS login to setup the client UserGroupInformation.
-   * Sets up the Kerberos Ticket Granting Ticket,
-   * in the client UserGroupInformation object.
-   *
-   * @return Client's UserGroupInformation
+   * @return Stringified Base64 encoded kerberosAuthHeader on success
+   * @throws Exception
    */
-  public static UserGroupInformation getClientUGI(String authType) throws IOException {
-    return ShimLoader.getHadoopThriftAuthBridge().getCurrentUGIWithConf(authType);
+  public static String getKerberosServiceTicket(String principal, String host,
+      String serverHttpUrl, boolean assumeSubject) throws Exception {
+    String serverPrincipal =
+        ShimLoader.getHadoopThriftAuthBridge().getServerPrincipal(principal, host);
+    if (assumeSubject) {
+      // With this option, we're assuming that the external application,
+      // using the JDBC driver has done a JAAS kerberos login already
+      AccessControlContext context = AccessController.getContext();
+      Subject subject = Subject.getSubject(context);
+      if (subject == null) {
+        throw new Exception("The Subject is not set");
+      }
+      return Subject.doAs(subject, new HttpKerberosClientAction(serverPrincipal, serverHttpUrl));
+    } else {
+      // JAAS login from ticket cache to setup the client UserGroupInformation
+      UserGroupInformation clientUGI =
+          ShimLoader.getHadoopThriftAuthBridge().getCurrentUGIWithConf("kerberos");
+      return clientUGI.doAs(new HttpKerberosClientAction(serverPrincipal, serverHttpUrl));
+    }
   }
 
   private HttpAuthUtils() {
     throw new UnsupportedOperationException("Can't initialize class");
   }
 
+  /**
+   * We'll create an instance of this class within a doAs block so that the client's TGT credentials
+   * can be read from the Subject
+   */
   public static class HttpKerberosClientAction implements PrivilegedExceptionAction<String> {
-
     public static final String HTTP_RESPONSE = "HTTP_RESPONSE";
     public static final String SERVER_HTTP_URL = "SERVER_HTTP_URL";
     private final String serverPrincipal;
-    private final String clientUserName;
     private final String serverHttpUrl;
     private final Base64 base64codec;
     private final HttpContext httpContext;
 
-    public HttpKerberosClientAction(String serverPrincipal, String clientUserName,
-      String serverHttpUrl) {
+    public HttpKerberosClientAction(String serverPrincipal, String serverHttpUrl) {
       this.serverPrincipal = serverPrincipal;
-      this.clientUserName = clientUserName;
       this.serverHttpUrl = serverHttpUrl;
       base64codec = new Base64(0);
       httpContext = new BasicHttpContext();
@@ -102,38 +98,17 @@ public final class HttpAuthUtils {
       Oid mechOid = new Oid("1.2.840.113554.1.2.2");
       // Oid for kerberos principal name
       Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
-
       GSSManager manager = GSSManager.getInstance();
-
-      // GSS name for client
-      GSSName clientName = manager.createName(clientUserName, GSSName.NT_USER_NAME);
       // GSS name for server
       GSSName serverName = manager.createName(serverPrincipal, krb5PrincipalOid);
-
-      // GSS credentials for client
-      GSSCredential clientCreds =
-        manager.createCredential(clientName, GSSCredential.DEFAULT_LIFETIME, mechOid,
-          GSSCredential.INITIATE_ONLY);
-
-      /*
-       * Create a GSSContext for mutual authentication with the
-       * server.
-       *    - serverName is the GSSName that represents the server.
-       *    - krb5Oid is the Oid that represents the mechanism to
-       *      use. The client chooses the mechanism to use.
-       *    - clientCreds are the client credentials
-       */
+      // Create a GSSContext for authentication with the service.
+      // We're passing client credentials as null since we want them to be read from the Subject.
       GSSContext gssContext =
-        manager.createContext(serverName, mechOid, clientCreds, GSSContext.DEFAULT_LIFETIME);
-
-      // Mutual authentication not r
+          manager.createContext(serverName, mechOid, null, GSSContext.DEFAULT_LIFETIME);
       gssContext.requestMutualAuth(false);
-
       // Establish context
       byte[] inToken = new byte[0];
-
       byte[] outToken = gssContext.initSecContext(inToken, 0, inToken.length);
-
       gssContext.dispose();
       // Base64 encoded and stringified token for server
       return new String(base64codec.encode(outToken));

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java Fri Nov  7 20:41:34 2014
@@ -128,6 +128,12 @@ public class OperationLog {
 
     void remove() {
       try {
+        if (in != null) {
+          in.close();
+        }
+        if (out != null) {
+          out.close();
+        }
         FileUtils.forceDelete(file);
         isRemoved = true;
       } catch (Exception e) {

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Fri Nov  7 20:41:34 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hive.service.cli.thrift;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -56,6 +58,10 @@ public class ThriftBinaryCLIService exte
       TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
       TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
       TServerSocket serverSocket = null;
+      List<String> sslVersionBlacklist = new ArrayList<String>();
+      for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
+        sslVersionBlacklist.add(sslVersion);
+      }
       if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
         serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum);
       } else {
@@ -67,13 +73,16 @@ public class ThriftBinaryCLIService exte
         String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
             HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
         serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, keyStorePath,
-            keyStorePassword);
+            keyStorePassword, sslVersionBlacklist);
       }
 
       // Server args
+      int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
       TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
           .processorFactory(processorFactory).transportFactory(transportFactory)
-          .protocolFactory(new TBinaryProtocol.Factory()).executorService(executorService);
+          .protocolFactory(new TBinaryProtocol.Factory())
+          .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize))
+          .executorService(executorService);
 
       // TCP Server
       server = new TThreadPoolServer(sargs);

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Fri Nov  7 20:41:34 2014
@@ -19,7 +19,8 @@
 package org.apache.hive.service.cli.thrift;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.ServiceException;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.TSetIpAddressProcessor;
 import org.apache.hive.service.cli.*;
@@ -53,7 +55,7 @@ public abstract class ThriftCLIService e
   protected static HiveAuthFactory hiveAuthFactory;
 
   protected int portNum;
-  protected InetSocketAddress serverAddress;
+  protected InetAddress serverAddress;
   protected String hiveHost;
   protected TServer server;
   protected org.eclipse.jetty.server.Server httpServer;
@@ -75,13 +77,21 @@ public abstract class ThriftCLIService e
   @Override
   public synchronized void init(HiveConf hiveConf) {
     this.hiveConf = hiveConf;
-
     // Initialize common server configs needed in both binary & http modes
     String portString;
     hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
     if (hiveHost == null) {
       hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
     }
+    try {
+      if (hiveHost != null && !hiveHost.isEmpty()) {
+        serverAddress = InetAddress.getByName(hiveHost);
+      } else {
+        serverAddress = InetAddress.getLocalHost();
+      }
+    } catch (UnknownHostException e) {
+      throw new ServiceException(e);
+    }
     // HTTP mode
     if (HiveServer2.isHTTPTransportMode(hiveConf)) {
       workerKeepAliveTime =
@@ -105,11 +115,6 @@ public abstract class ThriftCLIService e
         portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
       }
     }
-    if (hiveHost != null && !hiveHost.isEmpty()) {
-      serverAddress = new InetSocketAddress(hiveHost, portNum);
-    } else {
-      serverAddress = new InetSocketAddress(portNum);
-    }
     minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
     maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
     super.init(hiveConf);
@@ -148,7 +153,7 @@ public abstract class ThriftCLIService e
     return portNum;
   }
 
-  public InetSocketAddress getServerAddress() {
+  public InetAddress getServerAddress() {
     return serverAddress;
   }
 

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Fri Nov  7 20:41:34 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hive.service.cli.thrift;
 
+import java.util.Arrays;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -83,6 +84,11 @@ public class ThriftHttpCLIService extend
               + " Not configured for SSL connection");
         }
         SslContextFactory sslContextFactory = new SslContextFactory();
+        String[] excludedProtocols = hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",");
+        LOG.info("HTTP Server SSL: adding excluded protocols: " + Arrays.toString(excludedProtocols));
+        sslContextFactory.addExcludeProtocols(excludedProtocols);
+        LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = " +
+          Arrays.toString(sslContextFactory.getExcludeProtocols()));
         sslContextFactory.setKeyStorePath(keyStorePath);
         sslContextFactory.setKeyStorePassword(keyStorePassword);
         connector = new SslSelectChannelConnector(sslContextFactory);

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java Fri Nov  7 20:41:34 2014
@@ -208,11 +208,9 @@ public class ThriftHttpServlet extends T
 
         // Create a GSS context
         gssContext = manager.createContext(serverCreds);
-
         // Get service ticket from the authorization header
         String serviceTicketBase64 = getAuthHeader(request, authType);
         byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes());
-
         gssContext.acceptSecContext(inToken, 0, inToken.length);
         // Authenticate or deny based on its context completion
         if (!gssContext.isEstablished()) {

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java Fri Nov  7 20:41:34 2014
@@ -32,6 +32,10 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -53,7 +57,6 @@ import org.apache.zookeeper.WatchedEvent
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Perms;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 
 /**
@@ -66,7 +69,7 @@ public class HiveServer2 extends Composi
   private CLIService cliService;
   private ThriftCLIService thriftCLIService;
   private String znodePath;
-  private ZooKeeper zooKeeperClient;
+  private CuratorFramework zooKeeperClient;
   private boolean registeredWithZooKeeper = false;
 
   public HiveServer2() {
@@ -74,7 +77,6 @@ public class HiveServer2 extends Composi
     HiveConf.setLoadHiveServer2Config(true);
   }
 
-
   @Override
   public synchronized void init(HiveConf hiveConf) {
     cliService = new CLIService(this);
@@ -109,36 +111,60 @@ public class HiveServer2 extends Composi
   }
 
   /**
+   * ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory
+   */
+  private final ACLProvider zooKeeperAclProvider = new ACLProvider() {
+    List<ACL> nodeAcls = new ArrayList<ACL>();
+
+    @Override
+    public List<ACL> getDefaultAcl() {
+      if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
+        // Read all to the world
+        nodeAcls.addAll(Ids.READ_ACL_UNSAFE);
+        // Create/Delete/Write/Admin to the authenticated user
+        nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS));
+      } else {
+        // ACLs for znodes on a non-kerberized cluster
+        // Create/Read/Delete/Write/Admin to the world
+        nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE);
+      }
+      return nodeAcls;
+    }
+
+    @Override
+    public List<ACL> getAclForPath(String path) {
+      return getDefaultAcl();
+    }
+  };
+
+  /**
    * Adds a server instance to ZooKeeper as a znode.
    *
    * @param hiveConf
    * @throws Exception
    */
   private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
-    int zooKeeperSessionTimeout =
-        hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
     String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
     String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
     String instanceURI = getServerInstanceURI(hiveConf);
     byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
-    // Znode ACLs
-    List<ACL> nodeAcls = new ArrayList<ACL>();
-    setUpAuthAndAcls(hiveConf, nodeAcls);
-    // Create a ZooKeeper client
+    setUpZooKeeperAuth(hiveConf);
+    // Create a CuratorFramework instance to be used as the ZooKeeper client
+    // Use the zooKeeperAclProvider to create appropriate ACLs
     zooKeeperClient =
-        new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
-            new ZooKeeperHiveHelper.DummyWatcher());
+        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+            .aclProvider(zooKeeperAclProvider).retryPolicy(new ExponentialBackoffRetry(1000, 3))
+            .build();
+    zooKeeperClient.start();
     // Create the parent znodes recursively; ignore if the parent already exists.
-    // If pre-creating the parent on a kerberized cluster, ensure that you give ACLs,
-    // as explained in {@link #setUpAuthAndAcls(HiveConf, List<ACL>) setUpAuthAndAcls}
     try {
-      ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace, nodeAcls,
-          CreateMode.PERSISTENT);
+      zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+          .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
       LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
     } catch (KeeperException e) {
       if (e.code() != KeeperException.Code.NODEEXISTS) {
         LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
-        throw (e);
+        throw e;
       }
     }
     // Create a znode under the rootNamespace parent for this instance of the server
@@ -149,56 +175,40 @@ public class HiveServer2 extends Composi
               + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
               + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
       znodePath =
-          zooKeeperClient.create(pathPrefix, znodeDataUTF8, nodeAcls,
-              CreateMode.EPHEMERAL_SEQUENTIAL);
+          zooKeeperClient.create().creatingParentsIfNeeded()
+              .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPrefix, znodeDataUTF8);
       setRegisteredWithZooKeeper(true);
       // Set a watch on the znode
-      if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) {
+      if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
         // No node exists, throw exception
         throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
       }
       LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
     } catch (KeeperException e) {
       LOG.fatal("Unable to create a znode for this server instance", e);
-      throw new Exception(e);
+      throw (e);
     }
   }
 
   /**
-   * Set up ACLs for znodes based on whether the cluster is secure or not.
-   * On a kerberized cluster, ZooKeeper performs Kerberos-SASL authentication.
-   * We give Read privilege to the world, but Create/Delete/Write/Admin to the authenticated user.
-   * On a non-kerberized cluster, we give Create/Read/Delete/Write/Admin privileges to the world.
+   * For a kerberized cluster, we dynamically set up the client's JAAS conf.
    *
-   * For a kerberized cluster, we also dynamically set up the client's JAAS conf.
    * @param hiveConf
-   * @param nodeAcls
    * @return
    * @throws Exception
    */
-  private void setUpAuthAndAcls(HiveConf hiveConf, List<ACL> nodeAcls) throws Exception {
+  private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception {
     if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
       String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
       if (principal.isEmpty()) {
-        throw new IOException(
-            "HiveServer2 Kerberos principal is empty");
+        throw new IOException("HiveServer2 Kerberos principal is empty");
       }
       String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
       if (keyTabFile.isEmpty()) {
-        throw new IOException(
-            "HiveServer2 Kerberos keytab is empty");
+        throw new IOException("HiveServer2 Kerberos keytab is empty");
       }
-
       // Install the JAAS Configuration for the runtime
       ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keyTabFile);
-      // Read all to the world
-      nodeAcls.addAll(Ids.READ_ACL_UNSAFE);
-      // Create/Delete/Write/Admin to the authenticated user
-      nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS));
-    } else {
-      // ACLs for znodes on a non-kerberized cluster
-      // Create/Read/Delete/Write/Admin to the world
-      nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE);
     }
   }
 
@@ -243,7 +253,7 @@ public class HiveServer2 extends Composi
     if ((thriftCLIService == null) || (thriftCLIService.getServerAddress() == null)) {
       throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
     }
-    return thriftCLIService.getServerAddress().getHostName() + ":"
+    return thriftCLIService.getServerAddress().getHostAddress() + ":"
         + thriftCLIService.getPortNumber();
   }
 
@@ -344,24 +354,25 @@ public class HiveServer2 extends Composi
    */
   static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception {
     HiveConf hiveConf = new HiveConf();
-    int zooKeeperSessionTimeout =
-        hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
     String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
     String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
-    ZooKeeper zooKeeperClient =
-        new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
-            new ZooKeeperHiveHelper.DummyWatcher());
-    // Get all znode paths
+    CuratorFramework zooKeeperClient =
+        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+    zooKeeperClient.start();
     List<String> znodePaths =
-        zooKeeperClient.getChildren(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace,
-            false);
+        zooKeeperClient.getChildren().forPath(
+            ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
     // Now for each path that is for the given versionNumber, delete the znode from ZooKeeper
     for (String znodePath : znodePaths) {
       if (znodePath.contains("version=" + versionNumber + ";")) {
-        zooKeeperClient.delete(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
-            + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath, -1);
+        LOG.info("Removing the znode: " + znodePath + " from ZooKeeper");
+        zooKeeperClient.delete().forPath(
+            ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+                + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath);
       }
     }
+    zooKeeperClient.close();
   }
 
   public static void main(String[] args) {
@@ -516,8 +527,8 @@ public class HiveServer2 extends Composi
   }
 
   /**
-   * DeregisterOptionExecutor: executes the --deregister option by
-   * deregistering all HiveServer2 instances from ZooKeeper of a specific version.
+   * DeregisterOptionExecutor: executes the --deregister option by deregistering all HiveServer2
+   * instances from ZooKeeper of a specific version.
    */
   static class DeregisterOptionExecutor implements ServerOptionsExecutor {
     private final String versionNumber;
@@ -539,4 +550,3 @@ public class HiveServer2 extends Composi
     }
   }
 }
-

Modified: hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Fri Nov  7 20:41:34 2014
@@ -76,9 +76,6 @@ import org.apache.hadoop.security.authen
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
 import org.apache.tez.test.MiniTezCluster;
 
 import com.google.common.base.Joiner;
@@ -90,7 +87,6 @@ import com.google.common.collect.Iterabl
  * Implemention of shims against Hadoop 0.23.0.
  */
 public class Hadoop23Shims extends HadoopShimsSecure {
-  private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
 
   HadoopShims.MiniDFSShim cluster = null;
 
@@ -230,22 +226,13 @@ public class Hadoop23Shims extends Hadoo
    */
   @Override
   public void refreshDefaultQueue(Configuration conf, String userName) throws IOException {
-    String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME;
     if (StringUtils.isNotBlank(userName) && isFairScheduler(conf)) {
-      AllocationConfiguration allocConf = new AllocationConfiguration(conf);
-      QueuePlacementPolicy queuePolicy = allocConf.getPlacementPolicy();
-      if (queuePolicy != null) {
-        requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName);
-        if (StringUtils.isNotBlank(requestedQueue)) {
-          LOG.debug("Setting queue name to " + requestedQueue + " for user " + userName);
-          conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue);
-        }
-      }
+      ShimLoader.getSchedulerShims().refreshDefaultQueue(conf, userName);
     }
   }
 
   private boolean isFairScheduler (Configuration conf) {
-    return FairScheduler.class.getName().
+    return "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler".
         equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER));
   }
 

Modified: hive/branches/spark/shims/aggregator/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/aggregator/pom.xml?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/aggregator/pom.xml (original)
+++ hive/branches/spark/shims/aggregator/pom.xml Fri Nov  7 20:41:34 2014
@@ -63,5 +63,11 @@
       <version>${project.version}</version>
       <scope>runtime</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive.shims</groupId>
+      <artifactId>hive-shims-scheduler</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
   </dependencies>
 </project>

Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java Fri Nov  7 20:41:34 2014
@@ -463,6 +463,16 @@ public abstract class HadoopShimsSecure 
 
   @Override
   public UserGroupInformation getUGIForConf(Configuration conf) throws IOException {
+    String doAs = System.getenv("HADOOP_USER_NAME");
+    if(doAs != null && doAs.length() > 0) {
+     /*
+      * this allows doAs (proxy user) to be passed along across process boundary where
+      * delegation tokens are not supported.  For example, a DDL stmt via WebHCat with
+      * a doAs parameter, forks to 'hcat' which needs to start a Session that
+      * proxies the end user
+      */
+      return UserGroupInformation.createProxyUser(doAs, UserGroupInformation.getLoginUser());
+    }
     return UserGroupInformation.getCurrentUser();
   }
 

Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Fri Nov  7 20:41:34 2014
@@ -62,8 +62,7 @@ public class ZooKeeperTokenStore impleme
   private String rootNode = "";
   private volatile CuratorFramework zkSession;
   private String zkConnectString;
-  private final int zkSessionTimeout = 3000;
-  private int connectTimeoutMillis = -1;
+  private int connectTimeoutMillis;
   private List<ACL> newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS));
 
   /**
@@ -101,10 +100,10 @@ public class ZooKeeperTokenStore impleme
     if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
       synchronized (this) {
         if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
-          zkSession = CuratorFrameworkFactory.builder().connectString(zkConnectString)
-              .sessionTimeoutMs(zkSessionTimeout).connectionTimeoutMs(connectTimeoutMillis)
-              .aclProvider(aclDefaultProvider)
-              .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+          zkSession =
+              CuratorFrameworkFactory.builder().connectString(zkConnectString)
+                  .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider)
+                  .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
           zkSession.start();
         }
       }
@@ -431,12 +430,14 @@ public class ZooKeeperTokenStore impleme
   @Override
   public void init(Object objectStore, ServerMode smode) {
     this.serverMode = smode;
-    zkConnectString = conf.get(
-        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+    zkConnectString =
+        conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
     if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
       // try alternate config param
-      zkConnectString = conf.get(
-          HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, null);
+      zkConnectString =
+          conf.get(
+              HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE,
+              null);
       if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
         throw new IllegalArgumentException("Zookeeper connect string has to be specifed through "
             + "either " + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR
@@ -445,14 +446,17 @@ public class ZooKeeperTokenStore impleme
             + WHEN_ZK_DSTORE_MSG);
       }
     }
-    connectTimeoutMillis = conf.getInt(
-        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, -1);
+    connectTimeoutMillis =
+        conf.getInt(
+            HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
+            CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
     String aclStr = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
     if (StringUtils.isNotBlank(aclStr)) {
       this.newNodeAcl = parseACLs(aclStr);
     }
-    rootNode = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
-        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
+    rootNode =
+        conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+            HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
 
     try {
       // Install the JAAS Configuration for the runtime

Modified: hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java (original)
+++ hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java Fri Nov  7 20:41:34 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.shims;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.AppenderSkeleton;
@@ -33,6 +34,7 @@ public abstract class ShimLoader {
   private static JettyShims jettyShims;
   private static AppenderSkeleton eventCounter;
   private static HadoopThriftAuthBridge hadoopThriftAuthBridge;
+  private static SchedulerShim schedulerShim;
 
   /**
    * The names of the classes for shimming Hadoop for each major version.
@@ -87,6 +89,9 @@ public abstract class ShimLoader {
   }
 
 
+  private static final String SCHEDULER_SHIM_CLASSE =
+    "org.apache.hadoop.hive.schshim.FairSchedulerShim";
+
   /**
    * Factory method to get an instance of HadoopShims based on the
    * version of Hadoop on the classpath.
@@ -124,6 +129,13 @@ public abstract class ShimLoader {
     return hadoopThriftAuthBridge;
   }
 
+  public static synchronized SchedulerShim getSchedulerShims() {
+    if (schedulerShim == null) {
+      schedulerShim = createShim(SCHEDULER_SHIM_CLASSE, SchedulerShim.class);
+    }
+    return schedulerShim;
+  }
+
   private static <T> T loadShims(Map<String, String> classMap, Class<T> xface) {
     String vers = getMajorVersion();
     String className = classMap.get(vers);

Modified: hive/branches/spark/shims/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/pom.xml?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/pom.xml (original)
+++ hive/branches/spark/shims/pom.xml Fri Nov  7 20:41:34 2014
@@ -37,6 +37,7 @@
     <module>common-secure</module>
     <module>0.20S</module>
     <module>0.23</module>
+    <module>scheduler</module>
     <module>aggregator</module>
   </modules>
 </project>

Modified: hive/branches/spark/shims/scheduler/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/scheduler/pom.xml?rev=1637444&r1=1636884&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/scheduler/pom.xml (original)
+++ hive/branches/spark/shims/scheduler/pom.xml Fri Nov  7 20:41:34 2014
@@ -80,6 +80,7 @@
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
      <version>${hadoop-23.version}</version>
+     <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>