Posted to commits@hive.apache.org by br...@apache.org on 2014/10/30 17:22:48 UTC

svn commit: r1635536 [28/28] - in /hive/branches/spark: ./ accumulo-handler/ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/ accumulo-handler/src/test/org/apache/hado...

Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Thu Oct 30 16:22:33 2014
@@ -20,24 +20,28 @@ package org.apache.hadoop.hive.thrift;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.ZooDefs.Perms;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.slf4j.Logger;
@@ -56,26 +60,35 @@ public class ZooKeeperTokenStore impleme
   private static final String NODE_TOKENS = "/tokens";
 
   private String rootNode = "";
-  private volatile ZooKeeper zkSession;
+  private volatile CuratorFramework zkSession;
   private String zkConnectString;
   private final int zkSessionTimeout = 3000;
-  private long connectTimeoutMillis = -1;
-  private List<ACL> newNodeAcl = Ids.OPEN_ACL_UNSAFE;
+  private int connectTimeoutMillis = -1;
+  private List<ACL> newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS));
 
-  private class ZooKeeperWatcher implements Watcher {
-    public void process(org.apache.zookeeper.WatchedEvent event) {
-      LOGGER.info(event.toString());
-      if (event.getState() == Watcher.Event.KeeperState.Expired) {
-        LOGGER.warn("ZooKeeper session expired, discarding connection");
-        try {
-          zkSession.close();
-        } catch (Throwable e) {
-          LOGGER.warn("Failed to close connection on expired session", e);
-        }
-      }
+  /**
+   * ACLProvider permissions will be used in case parent dirs need to be created
+   */
+  private final ACLProvider aclDefaultProvider =  new ACLProvider() {
+
+    @Override
+    public List<ACL> getDefaultAcl() {
+      return newNodeAcl;
     }
 
-  }
+    @Override
+    public List<ACL> getAclForPath(String path) {
+      return getDefaultAcl();
+    }
+  };
+
+
+  private ServerMode serverMode;
+
+  private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled "
+      + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
+
+  private Configuration conf;
 
   /**
    * Default constructor for dynamic instantiation w/ Configurable
@@ -84,93 +97,74 @@ public class ZooKeeperTokenStore impleme
   protected ZooKeeperTokenStore() {
   }
 
-  public ZooKeeperTokenStore(String hostPort) {
-    this.zkConnectString = hostPort;
-    init();
-  }
-
-  private ZooKeeper getSession() {
-    if (zkSession == null || zkSession.getState() == States.CLOSED) {
-        synchronized (this) {
-          if (zkSession == null || zkSession.getState() == States.CLOSED) {
-            try {
-              zkSession = createConnectedClient(this.zkConnectString, this.zkSessionTimeout,
-                this.connectTimeoutMillis, new ZooKeeperWatcher());
-            } catch (IOException ex) {
-              throw new TokenStoreException("Token store error.", ex);
-            }
-          }
+  private CuratorFramework getSession() {
+    if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+      synchronized (this) {
+        if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+          zkSession = CuratorFrameworkFactory.builder().connectString(zkConnectString)
+              .sessionTimeoutMs(zkSessionTimeout).connectionTimeoutMs(connectTimeoutMillis)
+              .aclProvider(aclDefaultProvider)
+              .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+          zkSession.start();
         }
+      }
     }
     return zkSession;
   }
 
-  /**
-   * Create a ZooKeeper session that is in connected state.
-   *
-   * @param connectString ZooKeeper connect String
-   * @param sessionTimeout ZooKeeper session timeout
-   * @param connectTimeout milliseconds to wait for connection, 0 or negative value means no wait
-   * @param watchers
-   * @return
-   * @throws InterruptedException
-   * @throws IOException
-   */
-  public static ZooKeeper createConnectedClient(String connectString,
-      int sessionTimeout, long connectTimeout, final Watcher... watchers)
-      throws IOException {
-    final CountDownLatch connected = new CountDownLatch(1);
-    Watcher connectWatcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        switch (event.getState()) {
-        case SyncConnected:
-          connected.countDown();
-          break;
-        }
-        for (Watcher w : watchers) {
-          w.process(event);
-        }
-      }
-    };
-    ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, connectWatcher);
-    if (connectTimeout > 0) {
-      try {
-        if (!connected.await(connectTimeout, TimeUnit.MILLISECONDS)) {
-          zk.close();
-          throw new IOException("Timeout waiting for connection after "
-              + connectTimeout + "ms");
-        }
-      } catch (InterruptedException e) {
-        throw new IOException("Error waiting for connection.", e);
-      }
+  private void setupJAASConfig(Configuration conf) throws IOException {
+    if (!UserGroupInformation.getLoginUser().isFromKeytab()) {
+      // The process has not logged in using keytab
+      // this should be a test mode, can't use keytab to authenticate
+      // with zookeeper.
+      LOGGER.warn("Login is not from keytab");
+      return;
+    }
+
+    String principal;
+    String keytab;
+    switch (serverMode) {
+    case METASTORE:
+      principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal");
+      keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file");
+      break;
+    case HIVESERVER2:
+      principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal");
+      keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab");
+      break;
+    default:
+      throw new AssertionError("Unexpected server mode " + serverMode);
+    }
+    ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keytab);
+  }
+
+  private String getNonEmptyConfVar(Configuration conf, String param) throws IOException {
+    String val = conf.get(param);
+    if (val == null || val.trim().isEmpty()) {
+      throw new IOException("Configuration parameter " + param + " should be set, "
+          + WHEN_ZK_DSTORE_MSG);
     }
-    return zk;
+    return val;
   }
 
   /**
    * Create a path if it does not already exist ("mkdir -p")
-   * @param zk ZooKeeper session
    * @param path string with '/' separator
    * @param acl list of ACL entries
-   * @return
-   * @throws KeeperException
-   * @throws InterruptedException
+   * @throws TokenStoreException
    */
-  public static String ensurePath(ZooKeeper zk, String path, List<ACL> acl) throws KeeperException,
-      InterruptedException {
-    String[] pathComps = StringUtils.splitByWholeSeparator(path, "/");
-    String currentPath = "";
-    for (String pathComp : pathComps) {
-      currentPath += "/" + pathComp;
-      try {
-        String node = zk.create(currentPath, new byte[0], acl,
-            CreateMode.PERSISTENT);
-        LOGGER.info("Created path: " + node);
-      } catch (KeeperException.NodeExistsException e) {
-      }
+  public void ensurePath(String path, List<ACL> acl)
+      throws TokenStoreException {
+    try {
+      CuratorFramework zk = getSession();
+      String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+          .withACL(acl).forPath(path);
+      LOGGER.info("Created path: {} ", node);
+    } catch (KeeperException.NodeExistsException e) {
+      // node already exists
+    } catch (Exception e) {
+      throw new TokenStoreException("Error creating path " + path, e);
     }
-    return currentPath;
   }
 
   /**
@@ -234,45 +228,24 @@ public class ZooKeeperTokenStore impleme
     return acl;
   }
 
-  private void init() {
-    if (this.zkConnectString == null) {
-      throw new IllegalStateException("Not initialized");
-    }
-
+  private void initClientAndPaths() {
     if (this.zkSession != null) {
-      try {
-        this.zkSession.close();
-      } catch (InterruptedException ex) {
-        LOGGER.warn("Failed to close existing session.", ex);
-      }
+      this.zkSession.close();
     }
-    ZooKeeper zk = getSession();
-
     try {
-        ensurePath(zk, rootNode + NODE_KEYS, newNodeAcl);
-        ensurePath(zk, rootNode + NODE_TOKENS, newNodeAcl);
-      } catch (Exception e) {
-        throw new TokenStoreException("Failed to validate token path.", e);
-      }
+      ensurePath(rootNode + NODE_KEYS, newNodeAcl);
+      ensurePath(rootNode + NODE_TOKENS, newNodeAcl);
+    } catch (TokenStoreException e) {
+      throw e;
+    }
   }
 
   @Override
   public void setConf(Configuration conf) {
     if (conf == null) {
-       throw new IllegalArgumentException("conf is null");
+      throw new IllegalArgumentException("conf is null");
     }
-    this.zkConnectString = conf.get(
-      HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
-    this.connectTimeoutMillis = conf.getLong(
-      HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, -1);
-    this.rootNode = conf.get(
-      HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
-      HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT);
-    String csv = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
-    if (StringUtils.isNotBlank(csv)) {
-       this.newNodeAcl = parseACLs(csv);
-    }
-    init();
+    this.conf = conf;
   }
 
   @Override
@@ -280,15 +253,18 @@ public class ZooKeeperTokenStore impleme
     return null; // not required
   }
 
-  private Map<Integer, byte[]> getAllKeys() throws KeeperException,
-      InterruptedException {
+  private Map<Integer, byte[]> getAllKeys() throws KeeperException, InterruptedException {
 
     String masterKeyNode = rootNode + NODE_KEYS;
-    ZooKeeper zk = getSession();
-    List<String> nodes = zk.getChildren(masterKeyNode, false);
+
+    // get children of key node
+    List<String> nodes = zkGetChildren(masterKeyNode);
+
+    // read each child node, add to results
     Map<Integer, byte[]> result = new HashMap<Integer, byte[]>();
     for (String node : nodes) {
-      byte[] data = zk.getData(masterKeyNode + "/" + node, false, null);
+      String nodePath = masterKeyNode + "/" + node;
+      byte[] data = zkGetData(nodePath);
       if (data != null) {
         result.put(getSeq(node), data);
       }
@@ -296,6 +272,26 @@ public class ZooKeeperTokenStore impleme
     return result;
   }
 
+  private List<String> zkGetChildren(String path) {
+    CuratorFramework zk = getSession();
+    try {
+      return zk.getChildren().forPath(path);
+    } catch (Exception e) {
+      throw new TokenStoreException("Error getting children for " + path, e);
+    }
+  }
+
+  private byte[] zkGetData(String nodePath) {
+    CuratorFramework zk = getSession();
+    try {
+      return zk.getData().forPath(nodePath);
+    } catch (KeeperException.NoNodeException ex) {
+      return null;
+    } catch (Exception e) {
+      throw new TokenStoreException("Error reading " + nodePath, e);
+    }
+  }
+
   private int getSeq(String path) {
     String[] pathComps = path.split("/");
     return Integer.parseInt(pathComps[pathComps.length-1]);
@@ -303,44 +299,45 @@ public class ZooKeeperTokenStore impleme
 
   @Override
   public int addMasterKey(String s) {
+    String keysPath = rootNode + NODE_KEYS + "/";
+    CuratorFramework zk = getSession();
+    String newNode;
     try {
-      ZooKeeper zk = getSession();
-      String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), newNodeAcl,
-          CreateMode.PERSISTENT_SEQUENTIAL);
-      LOGGER.info("Added key {}", newNode);
-      return getSeq(newNode);
-    } catch (KeeperException ex) {
-      throw new TokenStoreException(ex);
-    } catch (InterruptedException ex) {
-      throw new TokenStoreException(ex);
+      newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl)
+          .forPath(keysPath, s.getBytes());
+    } catch (Exception e) {
+      throw new TokenStoreException("Error creating new node with path " + keysPath, e);
     }
+    LOGGER.info("Added key {}", newNode);
+    return getSeq(newNode);
   }
 
   @Override
   public void updateMasterKey(int keySeq, String s) {
+    CuratorFramework zk = getSession();
+    String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq);
     try {
-      ZooKeeper zk = getSession();
-      zk.setData(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), s.getBytes(),
-          -1);
-    } catch (KeeperException ex) {
-      throw new TokenStoreException(ex);
-    } catch (InterruptedException ex) {
-      throw new TokenStoreException(ex);
+      zk.setData().forPath(keyPath, s.getBytes());
+    } catch (Exception e) {
+      throw new TokenStoreException("Error setting data in " + keyPath, e);
     }
   }
 
   @Override
   public boolean removeMasterKey(int keySeq) {
+    String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq);
+    zkDelete(keyPath);
+    return true;
+  }
+
+  private void zkDelete(String path) {
+    CuratorFramework zk = getSession();
     try {
-      ZooKeeper zk = getSession();
-      zk.delete(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), -1);
-      return true;
+      zk.delete().forPath(path);
     } catch (KeeperException.NoNodeException ex) {
-      return false;
-    } catch (KeeperException ex) {
-      throw new TokenStoreException(ex);
-    } catch (InterruptedException ex) {
-      throw new TokenStoreException(ex);
+      // already deleted
+    } catch (Exception e) {
+      throw new TokenStoreException("Error deleting " + path, e);
     }
   }
 
@@ -374,67 +371,42 @@ public class ZooKeeperTokenStore impleme
   @Override
   public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
       DelegationTokenInformation token) {
+    byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
+    String tokenPath = getTokenPath(tokenIdentifier);
+    CuratorFramework zk = getSession();
+    String newNode;
     try {
-      ZooKeeper zk = getSession();
-      byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
-      String newNode = zk.create(getTokenPath(tokenIdentifier),
-          tokenBytes, newNodeAcl, CreateMode.PERSISTENT);
-      LOGGER.info("Added token: {}", newNode);
-      return true;
-    } catch (KeeperException.NodeExistsException ex) {
-      return false;
-    } catch (KeeperException ex) {
-      throw new TokenStoreException(ex);
-    } catch (InterruptedException ex) {
-      throw new TokenStoreException(ex);
+      newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl)
+          .forPath(tokenPath, tokenBytes);
+    } catch (Exception e) {
+      throw new TokenStoreException("Error creating new node with path " + tokenPath, e);
     }
+
+    LOGGER.info("Added token: {}", newNode);
+    return true;
   }
 
   @Override
   public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
-    try {
-      ZooKeeper zk = getSession();
-      zk.delete(getTokenPath(tokenIdentifier), -1);
-      return true;
-    } catch (KeeperException.NoNodeException ex) {
-      return false;
-    } catch (KeeperException ex) {
-      throw new TokenStoreException(ex);
-    } catch (InterruptedException ex) {
-      throw new TokenStoreException(ex);
-    }
+    String tokenPath = getTokenPath(tokenIdentifier);
+    zkDelete(tokenPath);
+    return true;
   }
 
   @Override
   public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+    byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier));
     try {
-      ZooKeeper zk = getSession();
-      byte[] tokenBytes = zk.getData(getTokenPath(tokenIdentifier), false, null);
-      try {
-        return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
-      } catch (Exception ex) {
-        throw new TokenStoreException("Failed to decode token", ex);
-      }
-    } catch (KeeperException.NoNodeException ex) {
-      return null;
-    } catch (KeeperException ex) {
-      throw new TokenStoreException(ex);
-    } catch (InterruptedException ex) {
-      throw new TokenStoreException(ex);
+      return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
+    } catch (Exception ex) {
+      throw new TokenStoreException("Failed to decode token", ex);
     }
   }
 
   @Override
   public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
     String containerNode = rootNode + NODE_TOKENS;
-    final List<String> nodes;
-    try  {
-      nodes = getSession().getChildren(containerNode, false);
-    } catch (KeeperException ex) {
-      throw new TokenStoreException(ex);
-    } catch (InterruptedException ex) {
-      throw new TokenStoreException(ex);
-    }
+    final List<String> nodes = zkGetChildren(containerNode);
     List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
         nodes.size());
     for (String node : nodes) {
@@ -452,17 +424,44 @@ public class ZooKeeperTokenStore impleme
   @Override
   public void close() throws IOException {
     if (this.zkSession != null) {
-      try {
-        this.zkSession.close();
-      } catch (InterruptedException ex) {
-        LOGGER.warn("Failed to close existing session.", ex);
-      }
+      this.zkSession.close();
     }
   }
 
   @Override
-  public void setStore(Object hmsHandler) throws TokenStoreException {
-    // no-op.
+  public void init(Object objectStore, ServerMode smode) {
+    this.serverMode = smode;
+    zkConnectString = conf.get(
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+    if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
+      // try alternate config param
+      zkConnectString = conf.get(
+          HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, null);
+      if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
+        throw new IllegalArgumentException("Zookeeper connect string has to be specified through "
+            + "either " + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR
+            + " or "
+            + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE
+            + WHEN_ZK_DSTORE_MSG);
+      }
+    }
+    connectTimeoutMillis = conf.getInt(
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, -1);
+    String aclStr = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
+    if (StringUtils.isNotBlank(aclStr)) {
+      this.newNodeAcl = parseACLs(aclStr);
+    }
+    rootNode = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
+
+    try {
+      // Install the JAAS Configuration for the runtime
+      setupJAASConfig(conf);
+    } catch (IOException e) {
+      throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client "
+          + e.getMessage(), e);
+    }
+    initClientAndPaths();
   }
 
 }
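
A note for readers of this hunk: the heart of the change is swapping the
hand-managed ZooKeeper client (Watcher, CountDownLatch, manual close on session
expiry) for Apache Curator, which owns connection management and retries. Below
is a minimal, self-contained sketch of the Curator pattern the patch adopts;
the connect string, timeouts, and znode path are illustrative values, not taken
from the patch:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.ACLProvider;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooDefs.Perms;
    import org.apache.zookeeper.data.ACL;

    public class CuratorConnectionSketch {
      public static void main(String[] args) throws Exception {
        // Mirror the patch's new default ACL: full permissions for the
        // authenticated identity only, instead of OPEN_ACL_UNSAFE.
        final List<ACL> acl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS));
        ACLProvider aclProvider = new ACLProvider() {
          @Override
          public List<ACL> getDefaultAcl() {
            return acl;
          }
          @Override
          public List<ACL> getAclForPath(String path) {
            return getDefaultAcl();
          }
        };

        // Curator handles reconnects and retries; no hand-written Watcher
        // or CountDownLatch is needed.
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181")   // illustrative
            .sessionTimeoutMs(3000)
            .connectionTimeoutMs(15000)        // illustrative
            .aclProvider(aclProvider)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
        client.start();
        try {
          // creatingParentsIfNeeded() replaces the old component-by-component
          // ensurePath() loop over "/"-separated path segments.
          client.create().creatingParentsIfNeeded().forPath("/example/tokens");
        } finally {
          client.close();
        }
      }
    }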

Modified: hive/branches/spark/shims/common/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common/pom.xml (original)
+++ hive/branches/spark/shims/common/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive</artifactId>
-    <version>0.14.0-SNAPSHOT</version>
+    <version>0.15.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

Modified: hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Thu Oct 30 16:22:33 2014
@@ -239,6 +239,15 @@ public interface HadoopShims {
   public String getTokenStrForm(String tokenSignature) throws IOException;
 
   /**
+   * Dynamically sets up the JAAS configuration that uses kerberos
+   * @param principal
+   * @param keyTabFile
+   * @throws IOException
+   */
+  public void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile)
+      throws IOException;
+
+  /**
    * Add a delegation token to the given ugi
    * @param ugi
    * @param tokenStr
@@ -322,6 +331,14 @@ public interface HadoopShims {
       String keytabFile) throws IOException;
 
   /**
+   * Convert Kerberos principal name pattern to valid Kerberos principal names.
+   * @param principal (principal name pattern)
+   * @return
+   * @throws IOException
+   */
+  public String getResolvedPrincipal(String principal) throws IOException;
+
+  /**
    * Perform kerberos re-login using the given principal and keytab, to renew
    * the credentials
    * @throws IOException
@@ -366,6 +383,15 @@ public interface HadoopShims {
   public short getDefaultReplication(FileSystem fs, Path path);
 
   /**
+   * Reset the default fair scheduler queue mapping for the end user.
+   *
+   * @param conf
+   * @param userName end user name
+   */
+  public void refreshDefaultQueue(Configuration conf, String userName)
+      throws IOException;
+
+  /**
    * Create the proxy ugi for the given userid
    * @param userName
    * @return
@@ -731,4 +757,21 @@ public interface HadoopShims {
    * @return Path to HDFS trash, if current hadoop supports trash feature.  Null otherwise.
    */
   Path getCurrentTrashPath(Configuration conf, FileSystem fs);
+
+  /**
+   * Returns a shim to wrap KerberosName
+   */
+  public KerberosNameShim getKerberosNameShim(String name) throws IOException;
+
+  /**
+   * Shim for KerberosName
+   */
+  public interface KerberosNameShim {
+    public String getDefaultRealm();
+    public String getServiceName();
+    public String getHostName();
+    public String getRealm();
+    public String getShortName() throws IOException;
+  }
+
 }
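
A note on the new KerberosNameShim: it wraps Hadoop's KerberosName so callers
can split a principal into its parts without binding to a particular Hadoop
version. A hypothetical call site follows; the principal value is invented for
illustration, and shim loading follows the ShimLoader pattern used elsewhere in
this commit:

    import java.io.IOException;

    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
    import org.apache.hadoop.hive.shims.ShimLoader;

    public class KerberosNameSketch {
      public static void main(String[] args) throws IOException {
        HadoopShims shims = ShimLoader.getHadoopShims();
        // For a service principal of the form service/host@REALM:
        KerberosNameShim name =
            shims.getKerberosNameShim("hive/host.example.com@EXAMPLE.COM");
        System.out.println(name.getServiceName()); // "hive"
        System.out.println(name.getHostName());    // "host.example.com"
        System.out.println(name.getRealm());       // "EXAMPLE.COM"
        System.out.println(name.getShortName());   // per auth_to_local rules
      }
    }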

Modified: hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java (original)
+++ hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java Thu Oct 30 16:22:33 2014
@@ -99,12 +99,15 @@ public class HadoopThriftAuthBridge {
   }
 
   public static abstract class Server {
+    public enum ServerMode {
+      HIVESERVER2, METASTORE
+    };
     public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps) throws TTransportException;
     public abstract TProcessor wrapProcessor(TProcessor processor);
     public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor);
     public abstract InetAddress getRemoteAddress();
     public abstract void startDelegationTokenSecretManager(Configuration conf,
-        Object hmsHandler) throws IOException;
+        Object hmsHandler, ServerMode smode) throws IOException;
     public abstract String getDelegationToken(String owner, String renewer)
         throws IOException, InterruptedException;
     public abstract String getDelegationTokenWithService(String owner, String renewer, String service)
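
The extra ServerMode argument lets the token store distinguish a metastore from
a HiveServer2 instance; per the ZooKeeperTokenStore changes above, the mode
selects the kerberos principal/keytab properties and is appended to the znode
root. A hypothetical caller, assuming the server instance and handler come from
the surrounding service startup code:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
    import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;

    public class SecretManagerStartSketch {
      static void startForMetastore(HadoopThriftAuthBridge.Server server,
          Configuration conf, Object hmsHandler) throws IOException {
        // The mode reaches ZooKeeperTokenStore.init(Object, ServerMode), which
        // uses it to pick config params and its znode suffix.
        server.startDelegationTokenSecretManager(conf, hmsHandler, ServerMode.METASTORE);
      }
    }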

Modified: hive/branches/spark/shims/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/pom.xml (original)
+++ hive/branches/spark/shims/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive</artifactId>
-    <version>0.14.0-SNAPSHOT</version>
+    <version>0.15.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

Modified: hive/branches/spark/testutils/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/pom.xml (original)
+++ hive/branches/spark/testutils/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive</artifactId>
-    <version>0.14.0-SNAPSHOT</version>
+    <version>0.15.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

Modified: hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java (original)
+++ hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java Thu Oct 30 16:22:33 2014
@@ -87,7 +87,7 @@ class JIRAService {
       List<String> messages) {
     DefaultHttpClient httpClient = new DefaultHttpClient();
     try {
-      String buildTag = formatBuildTag(mBuildTag);
+      BuildInfo buildInfo = formatBuildTag(mBuildTag);
       String buildTagForLogs = formatBuildTagForLogs(mBuildTag);
       List<String> comments = Lists.newArrayList();
       comments.add("");
@@ -120,8 +120,10 @@ class JIRAService {
         }
         comments.add("");
       }
-      comments.add("Test results: " + mJenkinsURL + "/" + buildTag + "/testReport");
-      comments.add("Console output: " + mJenkinsURL + "/" + buildTag + "/console");
+      comments.add("Test results: " + mJenkinsURL + "/" +
+          buildInfo.getFormattedBuildTag() + "/testReport");
+      comments.add("Console output: " + mJenkinsURL + "/" +
+          buildInfo.getFormattedBuildTag() + "/console");
       comments.add("Test logs: " + mLogsURL + buildTagForLogs);
       comments.add("");
       if(!messages.isEmpty()) {
@@ -133,10 +135,9 @@ class JIRAService {
       }
       comments.add("This message is automatically generated.");
       String attachmentId = parseAttachementId(mPatch);
-      if(!attachmentId.isEmpty()) {
-        comments.add("");
-        comments.add("ATTACHMENT ID: " + attachmentId);
-      }
+      comments.add("");
+      comments.add("ATTACHMENT ID: " + attachmentId +
+          " - " + buildInfo.getBuildName());
       mLogger.info("Comment: " + Joiner.on("\n").join(comments));
       String body = Joiner.on("\n").join(comments);
       String url = String.format("%s/rest/api/2/issue/%s/comment", mUrl, mName);
@@ -193,16 +194,36 @@ class JIRAService {
     }
   }
 
+
+  public static class BuildInfo {
+    private String buildName;
+    private String formattedBuildTag;
+
+    public BuildInfo (String buildName, String formattedBuildTag) {
+      this.buildName = buildName;
+      this.formattedBuildTag = formattedBuildTag;
+    }
+
+    public String getBuildName() {
+      return buildName;
+    }
+
+    public String getFormattedBuildTag() {
+      return formattedBuildTag;
+    }
+  }
+
   /**
    * Hive-Build-123 to Hive-Build/123
    */
   @VisibleForTesting
-  static String formatBuildTag(String buildTag) {
+  static BuildInfo formatBuildTag(String buildTag) {
     if(buildTag.contains("-")) {
       int lastDashIndex = buildTag.lastIndexOf("-");
       String buildName = buildTag.substring(0, lastDashIndex);
       String buildId = buildTag.substring(lastDashIndex + 1);
-      return buildName + "/" + buildId;
+      String formattedBuildTag = buildName + "/" + buildId;
+      return new BuildInfo(buildName, formattedBuildTag);
     }
     throw new IllegalArgumentException("Build tag '" + buildTag + "' must contain a -");
   }
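
formatBuildTag previously returned only the slash-formatted tag; it now returns
a BuildInfo so the ATTACHMENT ID comment can also name the Jenkins job. A small
sketch of the mapping, with expected values mirroring the updated
TestJIRAService below (the method is package-private and @VisibleForTesting, so
the sketch sits in the same package):

    package org.apache.hive.ptest.execution;

    import org.apache.hive.ptest.execution.JIRAService.BuildInfo;

    public class BuildTagSketch {
      public static void main(String[] args) {
        BuildInfo info = JIRAService.formatBuildTag("PreCommit-HIVE-TRUNK-Build-1115");
        System.out.println(info.getBuildName());         // PreCommit-HIVE-TRUNK-Build
        System.out.println(info.getFormattedBuildTag()); // PreCommit-HIVE-TRUNK-Build/1115
      }
    }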

Modified: hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java (original)
+++ hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java Thu Oct 30 16:22:33 2014
@@ -287,77 +287,77 @@ public class PTest {
     String buildTag = System.getenv("BUILD_TAG") == null ? "undefined-"
         + System.currentTimeMillis() : System.getenv("BUILD_TAG");
         File logDir = Dirs.create(new File(executionContextConfiguration.getGlobalLogDirectory(), buildTag));
-        LogDirectoryCleaner cleaner = new LogDirectoryCleaner(new File(executionContextConfiguration.
-            getGlobalLogDirectory()), 5);
-        cleaner.setName("LogCleaner-" + executionContextConfiguration.getGlobalLogDirectory());
-        cleaner.setDaemon(true);
-        cleaner.start();
-        TestConfiguration conf = TestConfiguration.fromFile(testConfigurationFile, LOG);
-        String repository = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY)).trim();
-        if(!repository.isEmpty()) {
-          conf.setRepository(repository);
-        }
-        String repositoryName = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY_NAME)).trim();
-        if(!repositoryName.isEmpty()) {
-          conf.setRepositoryName(repositoryName);
-        }
-        String branch = Strings.nullToEmpty(commandLine.getOptionValue(BRANCH)).trim();
-        if(!branch.isEmpty()) {
-          conf.setBranch(branch);
-        }
-        String patch = Strings.nullToEmpty(commandLine.getOptionValue(PATCH)).trim();
-        if(!patch.isEmpty()) {
-          conf.setPatch(patch);
-        }
-        String javaHome = Strings.nullToEmpty(commandLine.getOptionValue(JAVA_HOME)).trim();
-        if(!javaHome.isEmpty()) {
-          conf.setJavaHome(javaHome);
-        }
-        String javaHomeForTests = Strings.nullToEmpty(commandLine.getOptionValue(JAVA_HOME_TEST)).trim();
-        if(!javaHomeForTests.isEmpty()) {
-          conf.setJavaHomeForTests(javaHomeForTests);
-        }
-        String antTestArgs = Strings.nullToEmpty(commandLine.getOptionValue(ANT_TEST_ARGS)).trim();
-        if(!antTestArgs.isEmpty()) {
-          conf.setAntTestArgs(antTestArgs);
-        }
-        String antEnvOpts = Strings.nullToEmpty(commandLine.getOptionValue(ANT_ENV_OPTS)).trim();
-        if(!antEnvOpts.isEmpty()) {
-          conf.setAntEnvOpts(antEnvOpts);
-        }
-        String antTestTarget = Strings.nullToEmpty(commandLine.getOptionValue(ANT_TEST_TARGET)).trim();
-        if(!antTestTarget.isEmpty()) {
-          conf.setAntTestTarget(antTestTarget);
-        }
-        String[] supplementalAntArgs = commandLine.getOptionValues(ANT_ARG);
-        if(supplementalAntArgs != null && supplementalAntArgs.length > 0) {
-          String antArgs = Strings.nullToEmpty(conf.getAntArgs());
-          if(!(antArgs.isEmpty() || antArgs.endsWith(" "))) {
-            antArgs += " ";
-          }
-          antArgs += "-" + ANT_ARG + Joiner.on(" -" + ANT_ARG).join(supplementalAntArgs);
-          conf.setAntArgs(antArgs);
-        }
-        ExecutionContextProvider executionContextProvider = null;
-        ExecutionContext executionContext = null;
-        int exitCode = 0;
-        try {
-          executionContextProvider = executionContextConfiguration
-              .getExecutionContextProvider();
-          executionContext = executionContextProvider.createExecutionContext();
-          LocalCommandFactory localCommandFactory = new LocalCommandFactory(LOG);
-          PTest ptest = new PTest(conf, executionContext, buildTag, logDir,
-              localCommandFactory, new SSHCommandExecutor(LOG, localCommandFactory, conf.getSshOpts()),
-              new RSyncCommandExecutor(LOG, 10, localCommandFactory), LOG);
-          exitCode = ptest.run();
-        } finally {
-          if(executionContext != null) {
-            executionContext.terminate();
-          }
-          if(executionContextProvider != null) {
-            executionContextProvider.close();
-          }
-        }
-        System.exit(exitCode);
+    LogDirectoryCleaner cleaner = new LogDirectoryCleaner(new File(executionContextConfiguration.
+        getGlobalLogDirectory()), 5);
+    cleaner.setName("LogCleaner-" + executionContextConfiguration.getGlobalLogDirectory());
+    cleaner.setDaemon(true);
+    cleaner.start();
+    TestConfiguration conf = TestConfiguration.fromFile(testConfigurationFile, LOG);
+    String repository = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY)).trim();
+    if(!repository.isEmpty()) {
+      conf.setRepository(repository);
+    }
+    String repositoryName = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY_NAME)).trim();
+    if(!repositoryName.isEmpty()) {
+      conf.setRepositoryName(repositoryName);
+    }
+    String branch = Strings.nullToEmpty(commandLine.getOptionValue(BRANCH)).trim();
+    if(!branch.isEmpty()) {
+      conf.setBranch(branch);
+    }
+    String patch = Strings.nullToEmpty(commandLine.getOptionValue(PATCH)).trim();
+    if(!patch.isEmpty()) {
+      conf.setPatch(patch);
+    }
+    String javaHome = Strings.nullToEmpty(commandLine.getOptionValue(JAVA_HOME)).trim();
+    if(!javaHome.isEmpty()) {
+      conf.setJavaHome(javaHome);
+    }
+    String javaHomeForTests = Strings.nullToEmpty(commandLine.getOptionValue(JAVA_HOME_TEST)).trim();
+    if(!javaHomeForTests.isEmpty()) {
+      conf.setJavaHomeForTests(javaHomeForTests);
+    }
+    String antTestArgs = Strings.nullToEmpty(commandLine.getOptionValue(ANT_TEST_ARGS)).trim();
+    if(!antTestArgs.isEmpty()) {
+      conf.setAntTestArgs(antTestArgs);
+    }
+    String antEnvOpts = Strings.nullToEmpty(commandLine.getOptionValue(ANT_ENV_OPTS)).trim();
+    if(!antEnvOpts.isEmpty()) {
+      conf.setAntEnvOpts(antEnvOpts);
+    }
+    String antTestTarget = Strings.nullToEmpty(commandLine.getOptionValue(ANT_TEST_TARGET)).trim();
+    if(!antTestTarget.isEmpty()) {
+      conf.setAntTestTarget(antTestTarget);
+    }
+    String[] supplementalAntArgs = commandLine.getOptionValues(ANT_ARG);
+    if(supplementalAntArgs != null && supplementalAntArgs.length > 0) {
+      String antArgs = Strings.nullToEmpty(conf.getAntArgs());
+      if(!(antArgs.isEmpty() || antArgs.endsWith(" "))) {
+        antArgs += " ";
+      }
+      antArgs += "-" + ANT_ARG + Joiner.on(" -" + ANT_ARG).join(supplementalAntArgs);
+      conf.setAntArgs(antArgs);
+    }
+    ExecutionContextProvider executionContextProvider = null;
+    ExecutionContext executionContext = null;
+    int exitCode = 0;
+    try {
+      executionContextProvider = executionContextConfiguration
+          .getExecutionContextProvider();
+      executionContext = executionContextProvider.createExecutionContext();
+      LocalCommandFactory localCommandFactory = new LocalCommandFactory(LOG);
+      PTest ptest = new PTest(conf, executionContext, buildTag, logDir,
+          localCommandFactory, new SSHCommandExecutor(LOG, localCommandFactory, conf.getSshOpts()),
+          new RSyncCommandExecutor(LOG, 10, localCommandFactory), LOG);
+      exitCode = ptest.run();
+    } finally {
+      if(executionContext != null) {
+        executionContext.terminate();
+      }
+      if(executionContextProvider != null) {
+        executionContextProvider.close();
+      }
+    }
+    System.exit(exitCode);
   }
 }

Modified: hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/ExecutionContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/ExecutionContext.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/ExecutionContext.java (original)
+++ hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/ExecutionContext.java Thu Oct 30 16:22:33 2014
@@ -46,7 +46,7 @@ public class ExecutionContext {
   public void clearBadHosts() {
     mBadHosts.clear();
   }
-  void addHost(Host host) {
+  public void addHost(Host host) {
     mHosts.add(host);
   }
   boolean removeHost(Host host) {

Modified: hive/branches/spark/testutils/ptest2/src/main/resources/batch-exec.vm
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/main/resources/batch-exec.vm?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/main/resources/batch-exec.vm (original)
+++ hive/branches/spark/testutils/ptest2/src/main/resources/batch-exec.vm Thu Oct 30 16:22:33 2014
@@ -62,7 +62,7 @@ then
     testModule=./
   fi
   pushd $testModule
-  timeout 2h mvn -B -o test -Dmaven.repo.local=$localDir/$instanceName/maven \
+  timeout 2h mvn -B test -Dmaven.repo.local=$localDir/$instanceName/maven \
     $mavenArgs $mavenTestArgs $testArguments 1>$logDir/maven-test.txt 2>&1 </dev/null &
 #[[
   pid=$!

Modified: hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestJIRAService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestJIRAService.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestJIRAService.java (original)
+++ hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestJIRAService.java Thu Oct 30 16:22:33 2014
@@ -18,17 +18,24 @@
  */
 package org.apache.hive.ptest.execution;
 
+import com.google.common.collect.Lists;
+
 import java.util.List;
 import junit.framework.Assert;
+
+import org.apache.hive.ptest.execution.JIRAService.BuildInfo;
 import org.junit.Test;
-import com.google.common.collect.Lists;
 
 public class TestJIRAService  {
 
   @Test
   public void testFormatBuildTagPositive() throws Throwable {
-    Assert.assertEquals("abc/123", JIRAService.formatBuildTag("abc-123"));
-    Assert.assertEquals("a-b-c/123", JIRAService.formatBuildTag("a-b-c-123"));
+    BuildInfo buildInfo = JIRAService.formatBuildTag("abc-123");
+    Assert.assertEquals("abc/123", buildInfo.getFormattedBuildTag());
+    Assert.assertEquals("abc", buildInfo.getBuildName());
+    buildInfo = JIRAService.formatBuildTag("PreCommit-HIVE-TRUNK-Build-1115");
+    Assert.assertEquals("PreCommit-HIVE-TRUNK-Build/1115", buildInfo.getFormattedBuildTag());
+    Assert.assertEquals("PreCommit-HIVE-TRUNK-Build", buildInfo.getBuildName());
   }
   @Test(expected=IllegalArgumentException.class)
   public void testFormatBuildTagNoDashSlash() throws Throwable {

Modified: hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testAlternativeTestJVM.approved.txt
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testAlternativeTestJVM.approved.txt?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testAlternativeTestJVM.approved.txt (original)
+++ hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testAlternativeTestJVM.approved.txt Thu Oct 30 16:22:33 2014
@@ -61,7 +61,7 @@ then
     testModule=./
   fi
   pushd $testModule
-  timeout 2h mvn -B -o test -Dmaven.repo.local=/some/local/dir/instance-1/maven \
+  timeout 2h mvn -B test -Dmaven.repo.local=/some/local/dir/instance-1/maven \
     $mavenArgs $mavenTestArgs -Dtest=arg1 1>/some/log/dir/maven-test.txt 2>&1 </dev/null &
 
   pid=$!

Modified: hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testBatch.approved.txt
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testBatch.approved.txt?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testBatch.approved.txt (original)
+++ hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testBatch.approved.txt Thu Oct 30 16:22:33 2014
@@ -61,7 +61,7 @@ then
     testModule=./
   fi
   pushd $testModule
-  timeout 2h mvn -B -o test -Dmaven.repo.local=/some/local/dir/instance-1/maven \
+  timeout 2h mvn -B test -Dmaven.repo.local=/some/local/dir/instance-1/maven \
     -Dant=arg1 $mavenTestArgs -Dtest=arg1 1>/some/log/dir/maven-test.txt 2>&1 </dev/null &
 
   pid=$!