You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/11/07 21:41:45 UTC
svn commit: r1637444 [20/20] - in /hive/branches/spark: ./
cli/src/test/org/apache/hadoop/hive/cli/ common/
common/src/java/org/apache/hadoop/hive/common/type/
common/src/java/org/apache/hadoop/hive/conf/
common/src/test/org/apache/hadoop/hive/conf/ co...
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/auth/HttpAuthUtils.java Fri Nov 7 20:41:34 2014
@@ -18,16 +18,18 @@
package org.apache.hive.service.auth;
-import java.io.IOException;
+import java.security.AccessControlContext;
+import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
+import javax.security.auth.Subject;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSCredential;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.GSSName;
import org.ietf.jgss.Oid;
@@ -36,60 +38,54 @@ import org.ietf.jgss.Oid;
* Utility functions for HTTP mode authentication.
*/
public final class HttpAuthUtils {
-
public static final String WWW_AUTHENTICATE = "WWW-Authenticate";
public static final String AUTHORIZATION = "Authorization";
public static final String BASIC = "Basic";
public static final String NEGOTIATE = "Negotiate";
-
- /**
- * @return Stringified Base64 encoded kerberosAuthHeader on success
- */
- public static String getKerberosServiceTicket(String principal, String host, String serverHttpUrl)
- throws IOException, InterruptedException {
- UserGroupInformation clientUGI = getClientUGI("kerberos");
- String serverPrincipal = getServerPrincipal(principal, host);
- // Uses the Ticket Granting Ticket in the UserGroupInformation
- return clientUGI.doAs(
- new HttpKerberosClientAction(serverPrincipal, clientUGI.getUserName(), serverHttpUrl));
- }
/**
- * Get server principal and verify that hostname is present.
- */
- private static String getServerPrincipal(String principal, String host) throws IOException {
- return ShimLoader.getHadoopThriftAuthBridge().getServerPrincipal(principal, host);
- }
-
- /**
- * JAAS login to setup the client UserGroupInformation.
- * Sets up the Kerberos Ticket Granting Ticket,
- * in the client UserGroupInformation object.
- *
- * @return Client's UserGroupInformation
+ * @return Stringified Base64 encoded kerberosAuthHeader on success
+ * @throws Exception
*/
- public static UserGroupInformation getClientUGI(String authType) throws IOException {
- return ShimLoader.getHadoopThriftAuthBridge().getCurrentUGIWithConf(authType);
+ public static String getKerberosServiceTicket(String principal, String host,
+ String serverHttpUrl, boolean assumeSubject) throws Exception {
+ String serverPrincipal =
+ ShimLoader.getHadoopThriftAuthBridge().getServerPrincipal(principal, host);
+ if (assumeSubject) {
+ // With this option, we're assuming that the external application
+ // using the JDBC driver has already done a JAAS Kerberos login
+ AccessControlContext context = AccessController.getContext();
+ Subject subject = Subject.getSubject(context);
+ if (subject == null) {
+ throw new Exception("The Subject is not set");
+ }
+ return Subject.doAs(subject, new HttpKerberosClientAction(serverPrincipal, serverHttpUrl));
+ } else {
+ // JAAS login from ticket cache to setup the client UserGroupInformation
+ UserGroupInformation clientUGI =
+ ShimLoader.getHadoopThriftAuthBridge().getCurrentUGIWithConf("kerberos");
+ return clientUGI.doAs(new HttpKerberosClientAction(serverPrincipal, serverHttpUrl));
+ }
}
private HttpAuthUtils() {
throw new UnsupportedOperationException("Can't initialize class");
}
+ /**
+ * We'll create an instance of this class within a doAs block so that the client's TGT credentials
+ * can be read from the Subject
+ */
public static class HttpKerberosClientAction implements PrivilegedExceptionAction<String> {
-
public static final String HTTP_RESPONSE = "HTTP_RESPONSE";
public static final String SERVER_HTTP_URL = "SERVER_HTTP_URL";
private final String serverPrincipal;
- private final String clientUserName;
private final String serverHttpUrl;
private final Base64 base64codec;
private final HttpContext httpContext;
- public HttpKerberosClientAction(String serverPrincipal, String clientUserName,
- String serverHttpUrl) {
+ public HttpKerberosClientAction(String serverPrincipal, String serverHttpUrl) {
this.serverPrincipal = serverPrincipal;
- this.clientUserName = clientUserName;
this.serverHttpUrl = serverHttpUrl;
base64codec = new Base64(0);
httpContext = new BasicHttpContext();
@@ -102,38 +98,17 @@ public final class HttpAuthUtils {
Oid mechOid = new Oid("1.2.840.113554.1.2.2");
// Oid for kerberos principal name
Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
-
GSSManager manager = GSSManager.getInstance();
-
- // GSS name for client
- GSSName clientName = manager.createName(clientUserName, GSSName.NT_USER_NAME);
// GSS name for server
GSSName serverName = manager.createName(serverPrincipal, krb5PrincipalOid);
-
- // GSS credentials for client
- GSSCredential clientCreds =
- manager.createCredential(clientName, GSSCredential.DEFAULT_LIFETIME, mechOid,
- GSSCredential.INITIATE_ONLY);
-
- /*
- * Create a GSSContext for mutual authentication with the
- * server.
- * - serverName is the GSSName that represents the server.
- * - krb5Oid is the Oid that represents the mechanism to
- * use. The client chooses the mechanism to use.
- * - clientCreds are the client credentials
- */
+ // Create a GSSContext for authentication with the service.
+ // We're passing client credentials as null since we want them to be read from the Subject.
GSSContext gssContext =
- manager.createContext(serverName, mechOid, clientCreds, GSSContext.DEFAULT_LIFETIME);
-
- // Mutual authentication not r
+ manager.createContext(serverName, mechOid, null, GSSContext.DEFAULT_LIFETIME);
gssContext.requestMutualAuth(false);
-
// Establish context
byte[] inToken = new byte[0];
-
byte[] outToken = gssContext.initSecContext(inToken, 0, inToken.length);
-
gssContext.dispose();
// Base64 encoded and stringified token for server
return new String(base64codec.encode(outToken));
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/operation/OperationLog.java Fri Nov 7 20:41:34 2014
@@ -128,6 +128,12 @@ public class OperationLog {
void remove() {
try {
+ if (in != null) {
+ in.close();
+ }
+ if (out != null) {
+ out.close();
+ }
FileUtils.forceDelete(file);
isRemoved = true;
} catch (Exception e) {
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Fri Nov 7 20:41:34 2014
@@ -18,6 +18,8 @@
package org.apache.hive.service.cli.thrift;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -56,6 +58,10 @@ public class ThriftBinaryCLIService exte
TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
TServerSocket serverSocket = null;
+ List<String> sslVersionBlacklist = new ArrayList<String>();
+ for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
+ sslVersionBlacklist.add(sslVersion);
+ }
if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum);
} else {
@@ -67,13 +73,16 @@ public class ThriftBinaryCLIService exte
String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, keyStorePath,
- keyStorePassword);
+ keyStorePassword, sslVersionBlacklist);
}
// Server args
+ int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
.processorFactory(processorFactory).transportFactory(transportFactory)
- .protocolFactory(new TBinaryProtocol.Factory()).executorService(executorService);
+ .protocolFactory(new TBinaryProtocol.Factory())
+ .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize))
+ .executorService(executorService);
// TCP Server
server = new TThreadPoolServer(sargs);
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Fri Nov 7 20:41:34 2014
@@ -19,7 +19,8 @@
package org.apache.hive.service.cli.thrift;
import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hive.service.AbstractService;
+import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.TSetIpAddressProcessor;
import org.apache.hive.service.cli.*;
@@ -53,7 +55,7 @@ public abstract class ThriftCLIService e
protected static HiveAuthFactory hiveAuthFactory;
protected int portNum;
- protected InetSocketAddress serverAddress;
+ protected InetAddress serverAddress;
protected String hiveHost;
protected TServer server;
protected org.eclipse.jetty.server.Server httpServer;
@@ -75,13 +77,21 @@ public abstract class ThriftCLIService e
@Override
public synchronized void init(HiveConf hiveConf) {
this.hiveConf = hiveConf;
-
// Initialize common server configs needed in both binary & http modes
String portString;
hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
if (hiveHost == null) {
hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
}
+ try {
+ if (hiveHost != null && !hiveHost.isEmpty()) {
+ serverAddress = InetAddress.getByName(hiveHost);
+ } else {
+ serverAddress = InetAddress.getLocalHost();
+ }
+ } catch (UnknownHostException e) {
+ throw new ServiceException(e);
+ }
// HTTP mode
if (HiveServer2.isHTTPTransportMode(hiveConf)) {
workerKeepAliveTime =
@@ -105,11 +115,6 @@ public abstract class ThriftCLIService e
portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
}
}
- if (hiveHost != null && !hiveHost.isEmpty()) {
- serverAddress = new InetSocketAddress(hiveHost, portNum);
- } else {
- serverAddress = new InetSocketAddress(portNum);
- }
minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
super.init(hiveConf);
@@ -148,7 +153,7 @@ public abstract class ThriftCLIService e
return portNum;
}
- public InetSocketAddress getServerAddress() {
+ public InetAddress getServerAddress() {
return serverAddress;
}
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Fri Nov 7 20:41:34 2014
@@ -18,6 +18,7 @@
package org.apache.hive.service.cli.thrift;
+import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -83,6 +84,11 @@ public class ThriftHttpCLIService extend
+ " Not configured for SSL connection");
}
SslContextFactory sslContextFactory = new SslContextFactory();
+ String[] excludedProtocols = hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",");
+ LOG.info("HTTP Server SSL: adding excluded protocols: " + Arrays.toString(excludedProtocols));
+ sslContextFactory.addExcludeProtocols(excludedProtocols);
+ LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = " +
+ Arrays.toString(sslContextFactory.getExcludeProtocols()));
sslContextFactory.setKeyStorePath(keyStorePath);
sslContextFactory.setKeyStorePassword(keyStorePassword);
connector = new SslSelectChannelConnector(sslContextFactory);
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java Fri Nov 7 20:41:34 2014
@@ -208,11 +208,9 @@ public class ThriftHttpServlet extends T
// Create a GSS context
gssContext = manager.createContext(serverCreds);
-
// Get service ticket from the authorization header
String serviceTicketBase64 = getAuthHeader(request, authType);
byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes());
-
gssContext.acceptSecContext(inToken, 0, inToken.length);
// Authenticate or deny based on its context completion
if (!gssContext.isEstablished()) {
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java Fri Nov 7 20:41:34 2014
@@ -32,6 +32,10 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -53,7 +57,6 @@ import org.apache.zookeeper.WatchedEvent
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
/**
@@ -66,7 +69,7 @@ public class HiveServer2 extends Composi
private CLIService cliService;
private ThriftCLIService thriftCLIService;
private String znodePath;
- private ZooKeeper zooKeeperClient;
+ private CuratorFramework zooKeeperClient;
private boolean registeredWithZooKeeper = false;
public HiveServer2() {
@@ -74,7 +77,6 @@ public class HiveServer2 extends Composi
HiveConf.setLoadHiveServer2Config(true);
}
-
@Override
public synchronized void init(HiveConf hiveConf) {
cliService = new CLIService(this);
@@ -109,36 +111,60 @@ public class HiveServer2 extends Composi
}
/**
+ * ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory
+ */
+ private final ACLProvider zooKeeperAclProvider = new ACLProvider() {
+ List<ACL> nodeAcls = new ArrayList<ACL>();
+
+ @Override
+ public List<ACL> getDefaultAcl() {
+ if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
+ // Read all to the world
+ nodeAcls.addAll(Ids.READ_ACL_UNSAFE);
+ // Create/Delete/Write/Admin to the authenticated user
+ nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS));
+ } else {
+ // ACLs for znodes on a non-kerberized cluster
+ // Create/Read/Delete/Write/Admin to the world
+ nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE);
+ }
+ return nodeAcls;
+ }
+
+ @Override
+ public List<ACL> getAclForPath(String path) {
+ return getDefaultAcl();
+ }
+ };
+
+ /**
* Adds a server instance to ZooKeeper as a znode.
*
* @param hiveConf
* @throws Exception
*/
private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
- int zooKeeperSessionTimeout =
- hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
String instanceURI = getServerInstanceURI(hiveConf);
byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
- // Znode ACLs
- List<ACL> nodeAcls = new ArrayList<ACL>();
- setUpAuthAndAcls(hiveConf, nodeAcls);
- // Create a ZooKeeper client
+ setUpZooKeeperAuth(hiveConf);
+ // Create a CuratorFramework instance to be used as the ZooKeeper client
+ // Use the zooKeeperAclProvider to create appropriate ACLs
zooKeeperClient =
- new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
- new ZooKeeperHiveHelper.DummyWatcher());
+ CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+ .aclProvider(zooKeeperAclProvider).retryPolicy(new ExponentialBackoffRetry(1000, 3))
+ .build();
+ zooKeeperClient.start();
// Create the parent znodes recursively; ignore if the parent already exists.
- // If pre-creating the parent on a kerberized cluster, ensure that you give ACLs,
- // as explained in {@link #setUpAuthAndAcls(HiveConf, List<ACL>) setUpAuthAndAcls}
try {
- ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace, nodeAcls,
- CreateMode.PERSISTENT);
+ zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+ .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
} catch (KeeperException e) {
if (e.code() != KeeperException.Code.NODEEXISTS) {
LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
- throw (e);
+ throw e;
}
}
// Create a znode under the rootNamespace parent for this instance of the server
@@ -149,56 +175,40 @@ public class HiveServer2 extends Composi
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
+ "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
znodePath =
- zooKeeperClient.create(pathPrefix, znodeDataUTF8, nodeAcls,
- CreateMode.EPHEMERAL_SEQUENTIAL);
+ zooKeeperClient.create().creatingParentsIfNeeded()
+ .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPrefix, znodeDataUTF8);
setRegisteredWithZooKeeper(true);
// Set a watch on the znode
- if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) {
+ if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
// No node exists, throw exception
throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
}
LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
} catch (KeeperException e) {
LOG.fatal("Unable to create a znode for this server instance", e);
- throw new Exception(e);
+ throw (e);
}
}
/**
- * Set up ACLs for znodes based on whether the cluster is secure or not.
- * On a kerberized cluster, ZooKeeper performs Kerberos-SASL authentication.
- * We give Read privilege to the world, but Create/Delete/Write/Admin to the authenticated user.
- * On a non-kerberized cluster, we give Create/Read/Delete/Write/Admin privileges to the world.
+ * For a kerberized cluster, we dynamically set up the client's JAAS conf.
*
- * For a kerberized cluster, we also dynamically set up the client's JAAS conf.
* @param hiveConf
- * @param nodeAcls
* @return
* @throws Exception
*/
- private void setUpAuthAndAcls(HiveConf hiveConf, List<ACL> nodeAcls) throws Exception {
+ private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception {
if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
if (principal.isEmpty()) {
- throw new IOException(
- "HiveServer2 Kerberos principal is empty");
+ throw new IOException("HiveServer2 Kerberos principal is empty");
}
String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
if (keyTabFile.isEmpty()) {
- throw new IOException(
- "HiveServer2 Kerberos keytab is empty");
+ throw new IOException("HiveServer2 Kerberos keytab is empty");
}
-
// Install the JAAS Configuration for the runtime
ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keyTabFile);
- // Read all to the world
- nodeAcls.addAll(Ids.READ_ACL_UNSAFE);
- // Create/Delete/Write/Admin to the authenticated user
- nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS));
- } else {
- // ACLs for znodes on a non-kerberized cluster
- // Create/Read/Delete/Write/Admin to the world
- nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE);
}
}
@@ -243,7 +253,7 @@ public class HiveServer2 extends Composi
if ((thriftCLIService == null) || (thriftCLIService.getServerAddress() == null)) {
throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
}
- return thriftCLIService.getServerAddress().getHostName() + ":"
+ return thriftCLIService.getServerAddress().getHostAddress() + ":"
+ thriftCLIService.getPortNumber();
}
@@ -344,24 +354,25 @@ public class HiveServer2 extends Composi
*/
static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exception {
HiveConf hiveConf = new HiveConf();
- int zooKeeperSessionTimeout =
- hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
- ZooKeeper zooKeeperClient =
- new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
- new ZooKeeperHiveHelper.DummyWatcher());
- // Get all znode paths
+ CuratorFramework zooKeeperClient =
+ CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ zooKeeperClient.start();
List<String> znodePaths =
- zooKeeperClient.getChildren(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace,
- false);
+ zooKeeperClient.getChildren().forPath(
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
// Now for each path that is for the given versionNumber, delete the znode from ZooKeeper
for (String znodePath : znodePaths) {
if (znodePath.contains("version=" + versionNumber + ";")) {
- zooKeeperClient.delete(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
- + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath, -1);
+ LOG.info("Removing the znode: " + znodePath + " from ZooKeeper");
+ zooKeeperClient.delete().forPath(
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+ + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath);
}
}
+ zooKeeperClient.close();
}
public static void main(String[] args) {
@@ -516,8 +527,8 @@ public class HiveServer2 extends Composi
}
/**
- * DeregisterOptionExecutor: executes the --deregister option by
- * deregistering all HiveServer2 instances from ZooKeeper of a specific version.
+ * DeregisterOptionExecutor: executes the --deregister option by deregistering all HiveServer2
+ * instances from ZooKeeper of a specific version.
*/
static class DeregisterOptionExecutor implements ServerOptionsExecutor {
private final String versionNumber;
@@ -539,4 +550,3 @@ public class HiveServer2 extends Composi
}
}
}
-
Modified: hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Fri Nov 7 20:41:34 2014
@@ -76,9 +76,6 @@ import org.apache.hadoop.security.authen
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
import org.apache.tez.test.MiniTezCluster;
import com.google.common.base.Joiner;
@@ -90,7 +87,6 @@ import com.google.common.collect.Iterabl
* Implemention of shims against Hadoop 0.23.0.
*/
public class Hadoop23Shims extends HadoopShimsSecure {
- private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
HadoopShims.MiniDFSShim cluster = null;
@@ -230,22 +226,13 @@ public class Hadoop23Shims extends Hadoo
*/
@Override
public void refreshDefaultQueue(Configuration conf, String userName) throws IOException {
- String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME;
if (StringUtils.isNotBlank(userName) && isFairScheduler(conf)) {
- AllocationConfiguration allocConf = new AllocationConfiguration(conf);
- QueuePlacementPolicy queuePolicy = allocConf.getPlacementPolicy();
- if (queuePolicy != null) {
- requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName);
- if (StringUtils.isNotBlank(requestedQueue)) {
- LOG.debug("Setting queue name to " + requestedQueue + " for user " + userName);
- conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue);
- }
- }
+ ShimLoader.getSchedulerShims().refreshDefaultQueue(conf, userName);
}
}
private boolean isFairScheduler (Configuration conf) {
- return FairScheduler.class.getName().
+ return "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler".
equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER));
}
Modified: hive/branches/spark/shims/aggregator/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/aggregator/pom.xml?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/aggregator/pom.xml (original)
+++ hive/branches/spark/shims/aggregator/pom.xml Fri Nov 7 20:41:34 2014
@@ -63,5 +63,11 @@
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive.shims</groupId>
+ <artifactId>hive-shims-scheduler</artifactId>
+ <version>${project.version}</version>
+ <scope>runtime</scope>
+ </dependency>
</dependencies>
</project>
Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java Fri Nov 7 20:41:34 2014
@@ -463,6 +463,16 @@ public abstract class HadoopShimsSecure
@Override
public UserGroupInformation getUGIForConf(Configuration conf) throws IOException {
+ String doAs = System.getenv("HADOOP_USER_NAME");
+ if(doAs != null && doAs.length() > 0) {
+ /*
+ * This allows doAs (proxy user) to be passed along across a process boundary where
+ * delegation tokens are not supported. For example, a DDL statement via WebHCat with
+ * a doAs parameter forks to 'hcat', which needs to start a Session that
+ * proxies the end user.
+ */
+ return UserGroupInformation.createProxyUser(doAs, UserGroupInformation.getLoginUser());
+ }
return UserGroupInformation.getCurrentUser();
}
Modified: hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (original)
+++ hive/branches/spark/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Fri Nov 7 20:41:34 2014
@@ -62,8 +62,7 @@ public class ZooKeeperTokenStore impleme
private String rootNode = "";
private volatile CuratorFramework zkSession;
private String zkConnectString;
- private final int zkSessionTimeout = 3000;
- private int connectTimeoutMillis = -1;
+ private int connectTimeoutMillis;
private List<ACL> newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS));
/**
@@ -101,10 +100,10 @@ public class ZooKeeperTokenStore impleme
if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
synchronized (this) {
if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
- zkSession = CuratorFrameworkFactory.builder().connectString(zkConnectString)
- .sessionTimeoutMs(zkSessionTimeout).connectionTimeoutMs(connectTimeoutMillis)
- .aclProvider(aclDefaultProvider)
- .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ zkSession =
+ CuratorFrameworkFactory.builder().connectString(zkConnectString)
+ .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
zkSession.start();
}
}
@@ -431,12 +430,14 @@ public class ZooKeeperTokenStore impleme
@Override
public void init(Object objectStore, ServerMode smode) {
this.serverMode = smode;
- zkConnectString = conf.get(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+ zkConnectString =
+ conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
// try alternate config param
- zkConnectString = conf.get(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, null);
+ zkConnectString =
+ conf.get(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE,
+ null);
if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
throw new IllegalArgumentException("Zookeeper connect string has to be specifed through "
+ "either " + HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR
@@ -445,14 +446,17 @@ public class ZooKeeperTokenStore impleme
+ WHEN_ZK_DSTORE_MSG);
}
}
- connectTimeoutMillis = conf.getInt(
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, -1);
+ connectTimeoutMillis =
+ conf.getInt(
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
+ CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
String aclStr = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ACL, null);
if (StringUtils.isNotBlank(aclStr)) {
this.newNodeAcl = parseACLs(aclStr);
}
- rootNode = conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
- HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
+ rootNode =
+ conf.get(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+ HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
try {
// Install the JAAS Configuration for the runtime
Modified: hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java (original)
+++ hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java Fri Nov 7 20:41:34 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.shims;
import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.AppenderSkeleton;
@@ -33,6 +34,7 @@ public abstract class ShimLoader {
private static JettyShims jettyShims;
private static AppenderSkeleton eventCounter;
private static HadoopThriftAuthBridge hadoopThriftAuthBridge;
+ private static SchedulerShim schedulerShim;
/**
* The names of the classes for shimming Hadoop for each major version.
@@ -87,6 +89,9 @@ public abstract class ShimLoader {
}
+ private static final String SCHEDULER_SHIM_CLASSE =
+ "org.apache.hadoop.hive.schshim.FairSchedulerShim"; // class name handed to createShim(...) by getSchedulerShims()
+
/**
* Factory method to get an instance of HadoopShims based on the
* version of Hadoop on the classpath.
@@ -124,6 +129,13 @@ public abstract class ShimLoader {
return hadoopThriftAuthBridge;
}
+ public static synchronized SchedulerShim getSchedulerShims() { // returns the SchedulerShim held in the static schedulerShim field
+ if (schedulerShim == null) { // nothing cached yet: create it once; later calls reuse the stored instance
+ schedulerShim = createShim(SCHEDULER_SHIM_CLASSE, SchedulerShim.class); // NOTE(review): createShim is not shown in this hunk; presumably instantiates the named class - confirm
+ }
+ return schedulerShim;
+ }
+
private static <T> T loadShims(Map<String, String> classMap, Class<T> xface) {
String vers = getMajorVersion();
String className = classMap.get(vers);
Modified: hive/branches/spark/shims/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/pom.xml?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/pom.xml (original)
+++ hive/branches/spark/shims/pom.xml Fri Nov 7 20:41:34 2014
@@ -37,6 +37,7 @@
<module>common-secure</module>
<module>0.20S</module>
<module>0.23</module>
+ <module>scheduler</module>
<module>aggregator</module>
</modules>
</project>
Modified: hive/branches/spark/shims/scheduler/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/scheduler/pom.xml?rev=1637444&r1=1636884&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/shims/scheduler/pom.xml (original)
+++ hive/branches/spark/shims/scheduler/pom.xml Fri Nov 7 20:41:34 2014
@@ -80,6 +80,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<version>${hadoop-23.version}</version>
+ <optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>