You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/10/30 17:22:48 UTC
svn commit: r1635536 [28/28] - in /hive/branches/spark: ./ accumulo-handler/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hado...
Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Thu Oct 30 16:22:33 2014
@@ -20,24 +20,28 @@ package org.apache.hadoop.hive.thrift;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
@@ -56,26 +60,35 @@ public class ZooKeeperTokenStore impleme
private static final String NODE_TOKENS = "/tokens";
private String rootNode = "";
- private volatile ZooKeeper zkSession;
+ private volatile CuratorFramework zkSession;
private String zkConnectString;
private final int zkSessionTimeout = 3000;
- private long connectTimeoutMillis = -1;
- private List<ACL> newNodeAcl = Ids.OPEN_ACL_UNSAFE;
+ private int connectTimeoutMillis = -1;
+ private List<ACL> newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS));
- private class ZooKeeperWatcher implements Watcher {
- public void process(org.apache.zookeeper.WatchedEvent event) {
- LOGGER.info(event.toString());
- if (event.getState() == Watcher.Event.KeeperState.Expired) {
- LOGGER.warn("ZooKeeper session expired, discarding connection");
- try {
- zkSession.close();
- } catch (Throwable e) {
- LOGGER.warn("Failed to close connection on expired session", e);
- }
- }
+ /**
+ * ACLProvider permissions will be used in case parent dirs need to be created
+ */
+ private final ACLProvider aclDefaultProvider = new ACLProvider() {
+
+ @Override
+ public List<ACL> getDefaultAcl() {
+ return newNodeAcl;
}
- }
+ @Override
+ public List<ACL> getAclForPath(String path) {
+ return getDefaultAcl();
+ }
+ };
+
+
+ private ServerMode serverMode;
+
+ private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
+ + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
+
+ private Configuration conf;
/**
* Default constructor for dynamic instantiation w/ Configurable
@@ -84,93 +97,74 @@ public class ZooKeeperTokenStore impleme
protected ZooKeeperTokenStore() {
}
- public ZooKeeperTokenStore(String hostPort) {
- this.zkConnectString = hostPort;
- init();
- }
-
- private ZooKeeper getSession() {
- if (zkSession == null || zkSession.getState() == States.CLOSED) {
- synchronized (this) {
- if (zkSession == null || zkSession.getState() == States.CLOSED) {
- try {
- zkSession = createConnectedClient(this.zkConnectString, this.zkSessionTimeout,
- this.connectTimeoutMillis, new ZooKeeperWatcher());
- } catch (IOException ex) {
- throw new TokenStoreException("Token store error.", ex);
- }
- }
+ private CuratorFramework getSession() {
+ if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+ synchronized (this) {
+ if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+ zkSession = CuratorFrameworkFactory.builder().connectString(zkConnectString)
+ .sessionTimeoutMs(zkSessionTimeout).connectionTimeoutMs(connectTimeoutMillis)
+ .aclProvider(aclDefaultProvider)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ zkSession.start();
}
+ }
}
return zkSession;
}
- /**
- * Create a ZooKeeper session that is in connected state.
- *
- * @param connectString ZooKeeper connect String
- * @param sessionTimeout ZooKeeper session timeout
- * @param connectTimeout milliseconds to wait for connection, 0 or negative value means no wait
- * @param watchers
- * @return
- * @throws InterruptedException
- * @throws IOException
- */
- public static ZooKeeper createConnectedClient(String connectString,
- int sessionTimeout, long connectTimeout, final Watcher... watchers)
- throws IOException {
- final CountDownLatch connected = new CountDownLatch(1);
- Watcher connectWatcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- switch (event.getState()) {
- case SyncConnected:
- connected.countDown();
- break;
- }
- for (Watcher w : watchers) {
- w.process(event);
- }
- }
- };
- ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, connectWatcher);
- if (connectTimeout > 0) {
- try {
- if (!connected.await(connectTimeout, TimeUnit.MILLISECONDS)) {
- zk.close();
- throw new IOException("Timeout waiting for connection after "
- + connectTimeout + "ms");
- }
- } catch (InterruptedException e) {
- throw new IOException("Error waiting for connection.", e);
- }
+ private void setupJAASConfig(Configuration conf) throws IOException {
+ if (!UserGroupInformation.getLoginUser().isFromKeytab()) {
+ // The process has not logged in using keytab
+ // this should be a test mode, can't use keytab to authenticate
+ // with zookeeper.
+ LOGGER.warn("Login is not from keytab");
+ return;
+ }
+
+ String principal;
+ String keytab;
+ switch (serverMode) {
+ case METASTORE:
+ principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal");
+ keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file");
+ break;
+ case HIVESERVER2:
+ principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal");
+ keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab");
+ break;
+ default:
+ throw new AssertionError("Unexpected server mode " + serverMode);
+ }
+ ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keytab);
+ }
+
+ private String getNonEmptyConfVar(Configuration conf, String param) throws IOException {
+ String val = conf.get(param);
+ if (val == null || val.trim().isEmpty()) {
+ throw new IOException("Configuration parameter " + param + " should be set, "
+ + WHEN_ZK_DSTORE_MSG);
}
- return zk;
+ return val;
}
/**
* Create a path if it does not already exist ("mkdir -p")
- * @param zk ZooKeeper session
* @param path string with '/' separator
* @param acl list of ACL entries
- * @return
- * @throws KeeperException
- * @throws InterruptedException
+ * @throws TokenStoreException
*/
- public static String ensurePath(ZooKeeper zk, String path, List<ACL> acl) throws KeeperException,
- InterruptedException {
- String[] pathComps = StringUtils.splitByWholeSeparator(path, "/");
- String currentPath = "";
- for (String pathComp : pathComps) {
- currentPath += "/" + pathComp;
- try {
- String node = zk.create(currentPath, new byte[0], acl,
- CreateMode.PERSISTENT);
- LOGGER.info("Created path: " + node);
- } catch (KeeperException.NodeExistsException e) {
- }
+ public void ensurePath(String path, List<ACL> acl)
+ throws TokenStoreException {
+ try {
+ CuratorFramework zk = getSession();
+ String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+ .withACL(acl).forPath(path);
+ LOGGER.info("Created path: {} ", node);
+ } catch (KeeperException.NodeExistsException e) {
+ // node already exists
+ } catch (Exception e) {
+ throw new TokenStoreException("Error creating path " + path, e);
}
- return currentPath;
}
/**
@@ -234,45 +228,24 @@ public class ZooKeeperTokenStore impleme
return acl;
}
- private void init() {
- if (this.zkConnectString == null) {
- throw new IllegalStateException("Not initialized");
- }
-
+ private void initClientAndPaths() {
if (this.zkSession != null) {
- try {
- this.zkSession.close();
- } catch (InterruptedException ex) {
- LOGGER.warn("Failed to close existing session.", ex);
- }
+ this.zkSession.close();
}
- ZooKeeper zk = getSession();
-
try {
- ensurePath(zk, rootNode + NODE_KEYS, newNodeAcl);
- ensurePath(zk, rootNode + NODE_TOKENS, newNodeAcl);
- } catch (Exception e) {
- throw new TokenStoreException("Failed to validate token path.", e);
- }
+ ensurePath(rootNode + NODE_KEYS, newNodeAcl);
+ ensurePath(rootNode + NODE_TOKENS, newNodeAcl);
+ } catch (TokenStoreException e) {
+ throw e;
+ }
}
@Override
public void setConf(Configuration conf) {
if (conf == null) {
- throw new IllegalArgumentException("conf is null");
+ throw new IllegalArgumentException("conf is null");
}
- this.zkConnectString = conf.get(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
- this.connectTimeoutMillis = conf.getLong(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, -1);
- this.rootNode = conf.get(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT);
- String csv = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
- if (StringUtils.isNotBlank(csv)) {
- this.newNodeAcl = parseACLs(csv);
- }
- init();
+ this.conf = conf;
}
@Override
@@ -280,15 +253,18 @@ public class ZooKeeperTokenStore impleme
return null; // not required
}
- private Map<Integer, byte[]> getAllKeys() throws KeeperException,
- InterruptedException {
+ private Map<Integer, byte[]> getAllKeys() throws KeeperException, InterruptedException {
String masterKeyNode = rootNode + NODE_KEYS;
- ZooKeeper zk = getSession();
- List<String> nodes = zk.getChildren(masterKeyNode, false);
+
+ // get children of key node
+ List<String> nodes = zkGetChildren(masterKeyNode);
+
+ // read each child node, add to results
Map<Integer, byte[]> result = new HashMap<Integer, byte[]>();
for (String node : nodes) {
- byte[] data = zk.getData(masterKeyNode + "/" + node, false, null);
+ String nodePath = masterKeyNode + "/" + node;
+ byte[] data = zkGetData(nodePath);
if (data != null) {
result.put(getSeq(node), data);
}
@@ -296,6 +272,26 @@ public class ZooKeeperTokenStore impleme
return result;
}
+ private List<String> zkGetChildren(String path) {
+ CuratorFramework zk = getSession();
+ try {
+ return zk.getChildren().forPath(path);
+ } catch (Exception e) {
+ throw new TokenStoreException("Error getting children for " + path, e);
+ }
+ }
+
+ private byte[] zkGetData(String nodePath) {
+ CuratorFramework zk = getSession();
+ try {
+ return zk.getData().forPath(nodePath);
+ } catch (KeeperException.NoNodeException ex) {
+ return null;
+ } catch (Exception e) {
+ throw new TokenStoreException("Error reading " + nodePath, e);
+ }
+ }
+
private int getSeq(String path) {
String[] pathComps = path.split("/");
return Integer.parseInt(pathComps[pathComps.length-1]);
@@ -303,44 +299,45 @@ public class ZooKeeperTokenStore impleme
@Override
public int addMasterKey(String s) {
+ String keysPath = rootNode + NODE_KEYS + "/";
+ CuratorFramework zk = getSession();
+ String newNode;
try {
- ZooKeeper zk = getSession();
- String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), newNodeAcl,
- CreateMode.PERSISTENT_SEQUENTIAL);
- LOGGER.info("Added key {}", newNode);
- return getSeq(newNode);
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
+ newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl)
+ .forPath(keysPath, s.getBytes());
+ } catch (Exception e) {
+ throw new TokenStoreException("Error creating new node with path " + keysPath, e);
}
+ LOGGER.info("Added key {}", newNode);
+ return getSeq(newNode);
}
@Override
public void updateMasterKey(int keySeq, String s) {
+ CuratorFramework zk = getSession();
+ String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq);
try {
- ZooKeeper zk = getSession();
- zk.setData(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), s.getBytes(),
- -1);
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
+ zk.setData().forPath(keyPath, s.getBytes());
+ } catch (Exception e) {
+ throw new TokenStoreException("Error setting data in " + keyPath, e);
}
}
@Override
public boolean removeMasterKey(int keySeq) {
+ String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq);
+ zkDelete(keyPath);
+ return true;
+ }
+
+ private void zkDelete(String path) {
+ CuratorFramework zk = getSession();
try {
- ZooKeeper zk = getSession();
- zk.delete(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), -1);
- return true;
+ zk.delete().forPath(path);
} catch (KeeperException.NoNodeException ex) {
- return false;
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
+ // already deleted
+ } catch (Exception e) {
+ throw new TokenStoreException("Error deleting " + path, e);
}
}
@@ -374,67 +371,42 @@ public class ZooKeeperTokenStore impleme
@Override
public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
DelegationTokenInformation token) {
+ byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
+ String tokenPath = getTokenPath(tokenIdentifier);
+ CuratorFramework zk = getSession();
+ String newNode;
try {
- ZooKeeper zk = getSession();
- byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
- String newNode = zk.create(getTokenPath(tokenIdentifier),
- tokenBytes, newNodeAcl, CreateMode.PERSISTENT);
- LOGGER.info("Added token: {}", newNode);
- return true;
- } catch (KeeperException.NodeExistsException ex) {
- return false;
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
+ newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl)
+ .forPath(tokenPath, tokenBytes);
+ } catch (Exception e) {
+ throw new TokenStoreException("Error creating new node with path " + tokenPath, e);
}
+
+ LOGGER.info("Added token: {}", newNode);
+ return true;
}
@Override
public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
- try {
- ZooKeeper zk = getSession();
- zk.delete(getTokenPath(tokenIdentifier), -1);
- return true;
- } catch (KeeperException.NoNodeException ex) {
- return false;
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
- }
+ String tokenPath = getTokenPath(tokenIdentifier);
+ zkDelete(tokenPath);
+ return true;
}
@Override
public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+ byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier));
try {
- ZooKeeper zk = getSession();
- byte[] tokenBytes = zk.getData(getTokenPath(tokenIdentifier), false, null);
- try {
- return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
- } catch (Exception ex) {
- throw new TokenStoreException("Failed to decode token", ex);
- }
- } catch (KeeperException.NoNodeException ex) {
- return null;
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
+ return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
+ } catch (Exception ex) {
+ throw new TokenStoreException("Failed to decode token", ex);
}
}
@Override
public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
String containerNode = rootNode + NODE_TOKENS;
- final List<String> nodes;
- try {
- nodes = getSession().getChildren(containerNode, false);
- } catch (KeeperException ex) {
- throw new TokenStoreException(ex);
- } catch (InterruptedException ex) {
- throw new TokenStoreException(ex);
- }
+ final List<String> nodes = zkGetChildren(containerNode);
List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
nodes.size());
for (String node : nodes) {
@@ -452,17 +424,44 @@ public class ZooKeeperTokenStore impleme
@Override
public void close() throws IOException {
if (this.zkSession != null) {
- try {
- this.zkSession.close();
- } catch (InterruptedException ex) {
- LOGGER.warn("Failed to close existing session.", ex);
- }
+ this.zkSession.close();
}
}
@Override
- public void setStore(Object hmsHandler) throws TokenStoreException {
- // no-op.
+ public void init(Object objectStore, ServerMode smode) {
+ this.serverMode = smode;
+ zkConnectString = conf.get(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+ if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
+ // try alternate config param
+ zkConnectString = conf.get(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, null);
+ if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
+ throw new IllegalArgumentException("Zookeeper connect string has to be specifed through "
+ + "either " + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR
+ + " or "
+ + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE
+ + WHEN_ZK_DSTORE_MSG);
+ }
+ }
+ connectTimeoutMillis = conf.getInt(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, -1);
+ String aclStr = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
+ if (StringUtils.isNotBlank(aclStr)) {
+ this.newNodeAcl = parseACLs(aclStr);
+ }
+ rootNode = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
+
+ try {
+ // Install the JAAS Configuration for the runtime
+ setupJAASConfig(conf);
+ } catch (IOException e) {
+ throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client "
+ + e.getMessage(), e);
+ }
+ initClientAndPaths();
}
}
Modified: hive/branches/spark/shims/common/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common/pom.xml (original)
+++ hive/branches/spark/shims/common/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Thu Oct 30 16:22:33 2014
@@ -239,6 +239,15 @@ public interface HadoopShims {
public String getTokenStrForm(String tokenSignature) throws IOException;
/**
+ * Dynamically sets up the JAAS configuration that uses kerberos
+ * @param principal
+ * @param keyTabFile
+ * @throws IOException
+ */
+ public void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile)
+ throws IOException;
+
+ /**
* Add a delegation token to the given ugi
* @param ugi
* @param tokenStr
@@ -322,6 +331,14 @@ public interface HadoopShims {
String keytabFile) throws IOException;
/**
+ * Convert Kerberos principal name pattern to valid Kerberos principal names.
+ * @param principal (principal name pattern)
+ * @return
+ * @throws IOException
+ */
+ public String getResolvedPrincipal(String principal) throws IOException;
+
+ /**
* Perform kerberos re-login using the given principal and keytab, to renew
* the credentials
* @throws IOException
@@ -366,6 +383,15 @@ public interface HadoopShims {
public short getDefaultReplication(FileSystem fs, Path path);
/**
+ * Reset the default fair scheduler queue mapping to end user.
+ *
+ * @param conf
+ * @param userName end user name
+ */
+ public void refreshDefaultQueue(Configuration conf, String userName)
+ throws IOException;
+
+ /**
* Create the proxy ugi for the given userid
* @param userName
* @return
@@ -731,4 +757,21 @@ public interface HadoopShims {
* @return Path to HDFS trash, if current hadoop supports trash feature. Null otherwise.
*/
Path getCurrentTrashPath(Configuration conf, FileSystem fs);
+
+ /**
+ * Returns a shim to wrap KerberosName
+ */
+ public KerberosNameShim getKerberosNameShim(String name) throws IOException;
+
+ /**
+ * Shim for KerberosName
+ */
+ public interface KerberosNameShim {
+ public String getDefaultRealm();
+ public String getServiceName();
+ public String getHostName();
+ public String getRealm();
+ public String getShortName() throws IOException;
+ }
+
}
Modified: hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java (original)
+++ hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java Thu Oct 30 16:22:33 2014
@@ -99,12 +99,15 @@ public class HadoopThriftAuthBridge {
}
public static abstract class Server {
+ public enum ServerMode {
+ HIVESERVER2, METASTORE
+ };
public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps) throws TTransportException;
public abstract TProcessor wrapProcessor(TProcessor processor);
public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor);
public abstract InetAddress getRemoteAddress();
public abstract void startDelegationTokenSecretManager(Configuration conf,
- Object hmsHandler) throws IOException;
+ Object hmsHandler, ServerMode smode) throws IOException;
public abstract String getDelegationToken(String owner, String renewer)
throws IOException, InterruptedException;
public abstract String getDelegationTokenWithService(String owner, String renewer, String service)
Modified: hive/branches/spark/shims/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/shims/pom.xml (original)
+++ hive/branches/spark/shims/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/testutils/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/pom.xml?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/pom.xml (original)
+++ hive/branches/spark/testutils/pom.xml Thu Oct 30 16:22:33 2014
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>0.14.0-SNAPSHOT</version>
+ <version>0.15.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
Modified: hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java (original)
+++ hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/JIRAService.java Thu Oct 30 16:22:33 2014
@@ -87,7 +87,7 @@ class JIRAService {
List<String> messages) {
DefaultHttpClient httpClient = new DefaultHttpClient();
try {
- String buildTag = formatBuildTag(mBuildTag);
+ BuildInfo buildInfo = formatBuildTag(mBuildTag);
String buildTagForLogs = formatBuildTagForLogs(mBuildTag);
List<String> comments = Lists.newArrayList();
comments.add("");
@@ -120,8 +120,10 @@ class JIRAService {
}
comments.add("");
}
- comments.add("Test results: " + mJenkinsURL + "/" + buildTag + "/testReport");
- comments.add("Console output: " + mJenkinsURL + "/" + buildTag + "/console");
+ comments.add("Test results: " + mJenkinsURL + "/" +
+ buildInfo.getFormattedBuildTag() + "/testReport");
+ comments.add("Console output: " + mJenkinsURL + "/" +
+ buildInfo.getFormattedBuildTag() + "/console");
comments.add("Test logs: " + mLogsURL + buildTagForLogs);
comments.add("");
if(!messages.isEmpty()) {
@@ -133,10 +135,9 @@ class JIRAService {
}
comments.add("This message is automatically generated.");
String attachmentId = parseAttachementId(mPatch);
- if(!attachmentId.isEmpty()) {
- comments.add("");
- comments.add("ATTACHMENT ID: " + attachmentId);
- }
+ comments.add("");
+ comments.add("ATTACHMENT ID: " + attachmentId +
+ " - " + buildInfo.getBuildName());
mLogger.info("Comment: " + Joiner.on("\n").join(comments));
String body = Joiner.on("\n").join(comments);
String url = String.format("%s/rest/api/2/issue/%s/comment", mUrl, mName);
@@ -193,16 +194,36 @@ class JIRAService {
}
}
+
+ public static class BuildInfo {
+ private String buildName;
+ private String formattedBuildTag;
+
+ public BuildInfo (String buildName, String formattedBuildTag) {
+ this.buildName = buildName;
+ this.formattedBuildTag = formattedBuildTag;
+ }
+
+ public String getBuildName() {
+ return buildName;
+ }
+
+ public String getFormattedBuildTag() {
+ return formattedBuildTag;
+ }
+ }
+
/**
* Hive-Build-123 to Hive-Build/123
*/
@VisibleForTesting
- static String formatBuildTag(String buildTag) {
+ static BuildInfo formatBuildTag(String buildTag) {
if(buildTag.contains("-")) {
int lastDashIndex = buildTag.lastIndexOf("-");
String buildName = buildTag.substring(0, lastDashIndex);
String buildId = buildTag.substring(lastDashIndex + 1);
- return buildName + "/" + buildId;
+ String formattedBuildTag = buildName + "/" + buildId;
+ return new BuildInfo(buildName, formattedBuildTag);
}
throw new IllegalArgumentException("Build tag '" + buildTag + "' must contain a -");
}
Modified: hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java (original)
+++ hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PTest.java Thu Oct 30 16:22:33 2014
@@ -287,77 +287,77 @@ public class PTest {
String buildTag = System.getenv("BUILD_TAG") == null ? "undefined-"
+ System.currentTimeMillis() : System.getenv("BUILD_TAG");
File logDir = Dirs.create(new File(executionContextConfiguration.getGlobalLogDirectory(), buildTag));
- LogDirectoryCleaner cleaner = new LogDirectoryCleaner(new File(executionContextConfiguration.
- getGlobalLogDirectory()), 5);
- cleaner.setName("LogCleaner-" + executionContextConfiguration.getGlobalLogDirectory());
- cleaner.setDaemon(true);
- cleaner.start();
- TestConfiguration conf = TestConfiguration.fromFile(testConfigurationFile, LOG);
- String repository = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY)).trim();
- if(!repository.isEmpty()) {
- conf.setRepository(repository);
- }
- String repositoryName = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY_NAME)).trim();
- if(!repositoryName.isEmpty()) {
- conf.setRepositoryName(repositoryName);
- }
- String branch = Strings.nullToEmpty(commandLine.getOptionValue(BRANCH)).trim();
- if(!branch.isEmpty()) {
- conf.setBranch(branch);
- }
- String patch = Strings.nullToEmpty(commandLine.getOptionValue(PATCH)).trim();
- if(!patch.isEmpty()) {
- conf.setPatch(patch);
- }
- String javaHome = Strings.nullToEmpty(commandLine.getOptionValue(JAVA_HOME)).trim();
- if(!javaHome.isEmpty()) {
- conf.setJavaHome(javaHome);
- }
- String javaHomeForTests = Strings.nullToEmpty(commandLine.getOptionValue(JAVA_HOME_TEST)).trim();
- if(!javaHomeForTests.isEmpty()) {
- conf.setJavaHomeForTests(javaHomeForTests);
- }
- String antTestArgs = Strings.nullToEmpty(commandLine.getOptionValue(ANT_TEST_ARGS)).trim();
- if(!antTestArgs.isEmpty()) {
- conf.setAntTestArgs(antTestArgs);
- }
- String antEnvOpts = Strings.nullToEmpty(commandLine.getOptionValue(ANT_ENV_OPTS)).trim();
- if(!antEnvOpts.isEmpty()) {
- conf.setAntEnvOpts(antEnvOpts);
- }
- String antTestTarget = Strings.nullToEmpty(commandLine.getOptionValue(ANT_TEST_TARGET)).trim();
- if(!antTestTarget.isEmpty()) {
- conf.setAntTestTarget(antTestTarget);
- }
- String[] supplementalAntArgs = commandLine.getOptionValues(ANT_ARG);
- if(supplementalAntArgs != null && supplementalAntArgs.length > 0) {
- String antArgs = Strings.nullToEmpty(conf.getAntArgs());
- if(!(antArgs.isEmpty() || antArgs.endsWith(" "))) {
- antArgs += " ";
- }
- antArgs += "-" + ANT_ARG + Joiner.on(" -" + ANT_ARG).join(supplementalAntArgs);
- conf.setAntArgs(antArgs);
- }
- ExecutionContextProvider executionContextProvider = null;
- ExecutionContext executionContext = null;
- int exitCode = 0;
- try {
- executionContextProvider = executionContextConfiguration
- .getExecutionContextProvider();
- executionContext = executionContextProvider.createExecutionContext();
- LocalCommandFactory localCommandFactory = new LocalCommandFactory(LOG);
- PTest ptest = new PTest(conf, executionContext, buildTag, logDir,
- localCommandFactory, new SSHCommandExecutor(LOG, localCommandFactory, conf.getSshOpts()),
- new RSyncCommandExecutor(LOG, 10, localCommandFactory), LOG);
- exitCode = ptest.run();
- } finally {
- if(executionContext != null) {
- executionContext.terminate();
- }
- if(executionContextProvider != null) {
- executionContextProvider.close();
- }
- }
- System.exit(exitCode);
+ LogDirectoryCleaner cleaner = new LogDirectoryCleaner(new File(executionContextConfiguration.
+ getGlobalLogDirectory()), 5);
+ cleaner.setName("LogCleaner-" + executionContextConfiguration.getGlobalLogDirectory());
+ cleaner.setDaemon(true);
+ cleaner.start();
+ TestConfiguration conf = TestConfiguration.fromFile(testConfigurationFile, LOG);
+ String repository = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY)).trim();
+ if(!repository.isEmpty()) {
+ conf.setRepository(repository);
+ }
+ String repositoryName = Strings.nullToEmpty(commandLine.getOptionValue(REPOSITORY_NAME)).trim();
+ if(!repositoryName.isEmpty()) {
+ conf.setRepositoryName(repositoryName);
+ }
+ String branch = Strings.nullToEmpty(commandLine.getOptionValue(BRANCH)).trim();
+ if(!branch.isEmpty()) {
+ conf.setBranch(branch);
+ }
+ String patch = Strings.nullToEmpty(commandLine.getOptionValue(PATCH)).trim();
+ if(!patch.isEmpty()) {
+ conf.setPatch(patch);
+ }
+ String javaHome = Strings.nullToEmpty(commandLine.getOptionValue(JAVA_HOME)).trim();
+ if(!javaHome.isEmpty()) {
+ conf.setJavaHome(javaHome);
+ }
+ String javaHomeForTests = Strings.nullToEmpty(commandLine.getOptionValue(JAVA_HOME_TEST)).trim();
+ if(!javaHomeForTests.isEmpty()) {
+ conf.setJavaHomeForTests(javaHomeForTests);
+ }
+ String antTestArgs = Strings.nullToEmpty(commandLine.getOptionValue(ANT_TEST_ARGS)).trim();
+ if(!antTestArgs.isEmpty()) {
+ conf.setAntTestArgs(antTestArgs);
+ }
+ String antEnvOpts = Strings.nullToEmpty(commandLine.getOptionValue(ANT_ENV_OPTS)).trim();
+ if(!antEnvOpts.isEmpty()) {
+ conf.setAntEnvOpts(antEnvOpts);
+ }
+ String antTestTarget = Strings.nullToEmpty(commandLine.getOptionValue(ANT_TEST_TARGET)).trim();
+ if(!antTestTarget.isEmpty()) {
+ conf.setAntTestTarget(antTestTarget);
+ }
+ String[] supplementalAntArgs = commandLine.getOptionValues(ANT_ARG);
+ if(supplementalAntArgs != null && supplementalAntArgs.length > 0) {
+ String antArgs = Strings.nullToEmpty(conf.getAntArgs());
+ if(!(antArgs.isEmpty() || antArgs.endsWith(" "))) {
+ antArgs += " ";
+ }
+ antArgs += "-" + ANT_ARG + Joiner.on(" -" + ANT_ARG).join(supplementalAntArgs);
+ conf.setAntArgs(antArgs);
+ }
+ ExecutionContextProvider executionContextProvider = null;
+ ExecutionContext executionContext = null;
+ int exitCode = 0;
+ try {
+ executionContextProvider = executionContextConfiguration
+ .getExecutionContextProvider();
+ executionContext = executionContextProvider.createExecutionContext();
+ LocalCommandFactory localCommandFactory = new LocalCommandFactory(LOG);
+ PTest ptest = new PTest(conf, executionContext, buildTag, logDir,
+ localCommandFactory, new SSHCommandExecutor(LOG, localCommandFactory, conf.getSshOpts()),
+ new RSyncCommandExecutor(LOG, 10, localCommandFactory), LOG);
+ exitCode = ptest.run();
+ } finally {
+ if(executionContext != null) {
+ executionContext.terminate();
+ }
+ if(executionContextProvider != null) {
+ executionContextProvider.close();
+ }
+ }
+ System.exit(exitCode);
}
}
Modified: hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/ExecutionContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/ExecutionContext.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/ExecutionContext.java (original)
+++ hive/branches/spark/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/context/ExecutionContext.java Thu Oct 30 16:22:33 2014
@@ -46,7 +46,7 @@ public class ExecutionContext {
public void clearBadHosts() {
mBadHosts.clear();
}
- void addHost(Host host) {
+ public void addHost(Host host) {
mHosts.add(host);
}
boolean removeHost(Host host) {
Modified: hive/branches/spark/testutils/ptest2/src/main/resources/batch-exec.vm
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/main/resources/batch-exec.vm?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/main/resources/batch-exec.vm (original)
+++ hive/branches/spark/testutils/ptest2/src/main/resources/batch-exec.vm Thu Oct 30 16:22:33 2014
@@ -62,7 +62,7 @@ then
testModule=./
fi
pushd $testModule
- timeout 2h mvn -B -o test -Dmaven.repo.local=$localDir/$instanceName/maven \
+ timeout 2h mvn -B test -Dmaven.repo.local=$localDir/$instanceName/maven \
$mavenArgs $mavenTestArgs $testArguments 1>$logDir/maven-test.txt 2>&1 </dev/null &
#[[
pid=$!
Modified: hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestJIRAService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestJIRAService.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestJIRAService.java (original)
+++ hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestJIRAService.java Thu Oct 30 16:22:33 2014
@@ -18,17 +18,24 @@
*/
package org.apache.hive.ptest.execution;
+import com.google.common.collect.Lists;
+
import java.util.List;
import junit.framework.Assert;
+
+import org.apache.hive.ptest.execution.JIRAService.BuildInfo;
import org.junit.Test;
-import com.google.common.collect.Lists;
public class TestJIRAService {
@Test
public void testFormatBuildTagPositive() throws Throwable {
- Assert.assertEquals("abc/123", JIRAService.formatBuildTag("abc-123"));
- Assert.assertEquals("a-b-c/123", JIRAService.formatBuildTag("a-b-c-123"));
+ BuildInfo buildInfo = JIRAService.formatBuildTag("abc-123");
+ Assert.assertEquals("abc/123", buildInfo.getFormattedBuildTag());
+ Assert.assertEquals("abc", buildInfo.getBuildName());
+ buildInfo = JIRAService.formatBuildTag("PreCommit-HIVE-TRUNK-Build-1115");
+ Assert.assertEquals("PreCommit-HIVE-TRUNK-Build/1115", buildInfo.getFormattedBuildTag());
+ Assert.assertEquals("PreCommit-HIVE-TRUNK-Build", buildInfo.getBuildName());
}
@Test(expected=IllegalArgumentException.class)
public void testFormatBuildTagNoDashSlash() throws Throwable {
Modified: hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testAlternativeTestJVM.approved.txt
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testAlternativeTestJVM.approved.txt?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testAlternativeTestJVM.approved.txt (original)
+++ hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testAlternativeTestJVM.approved.txt Thu Oct 30 16:22:33 2014
@@ -61,7 +61,7 @@ then
testModule=./
fi
pushd $testModule
- timeout 2h mvn -B -o test -Dmaven.repo.local=/some/local/dir/instance-1/maven \
+ timeout 2h mvn -B test -Dmaven.repo.local=/some/local/dir/instance-1/maven \
$mavenArgs $mavenTestArgs -Dtest=arg1 1>/some/log/dir/maven-test.txt 2>&1 </dev/null &
pid=$!
Modified: hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testBatch.approved.txt
URL: http://svn.apache.org/viewvc/hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testBatch.approved.txt?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testBatch.approved.txt (original)
+++ hive/branches/spark/testutils/ptest2/src/test/java/org/apache/hive/ptest/execution/TestScripts.testBatch.approved.txt Thu Oct 30 16:22:33 2014
@@ -61,7 +61,7 @@ then
testModule=./
fi
pushd $testModule
- timeout 2h mvn -B -o test -Dmaven.repo.local=/some/local/dir/instance-1/maven \
+ timeout 2h mvn -B test -Dmaven.repo.local=/some/local/dir/instance-1/maven \
-Dant=arg1 $mavenTestArgs -Dtest=arg1 1>/some/log/dir/maven-test.txt 2>&1 </dev/null &
pid=$!