You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2014/03/17 14:04:29 UTC
svn commit: r1578350 [2/2] - in /hive/branches/branch-0.13: ./
itests/hive-unit/src/test/java/org/apache/hive/service/cli/thrift/
jdbc/src/java/org/apache/hive/jdbc/
ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/
service/sr...
Modified: hive/branches/branch-0.13/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java?rev=1578350&r1=1578349&r2=1578350&view=diff
==============================================================================
--- hive/branches/branch-0.13/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java (original)
+++ hive/branches/branch-0.13/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java Mon Mar 17 13:04:29 2014
@@ -19,6 +19,7 @@
package org.apache.hive.service.cli.thrift;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@@ -28,61 +29,252 @@ import org.apache.commons.codec.binary.B
import org.apache.commons.codec.binary.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.service.auth.AuthenticationProviderFactory;
+import org.apache.hive.service.auth.AuthenticationProviderFactory.AuthMethods;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.HttpAuthUtils;
+import org.apache.hive.service.auth.HttpAuthenticationException;
+import org.apache.hive.service.auth.PasswdAuthenticationProvider;
+import org.apache.hive.service.cli.session.SessionManager;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServlet;
-
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+
+/**
+ *
+ * ThriftHttpServlet
+ *
+ */
public class ThriftHttpServlet extends TServlet {
private static final long serialVersionUID = 1L;
public static final Log LOG = LogFactory.getLog(ThriftHttpServlet.class.getName());
+ private final String authType;
+ private final UserGroupInformation serviceUGI;
- public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory) {
+ public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory,
+ String authType, UserGroupInformation serviceUGI) {
super(processor, protocolFactory);
+ this.authType = authType;
+ this.serviceUGI = serviceUGI;
}
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
- logRequestHeader(request);
- super.doPost(request, response);
- }
+ String clientUserName;
+ try {
+ // For a kerberos setup
+ if(isKerberosAuthMode(authType)) {
+ clientUserName = doKerberosAuth(request, serviceUGI);
+ }
+ else {
+ clientUserName = doPasswdAuth(request, authType);
+ }
- protected void logRequestHeader(HttpServletRequest request) {
- String authHeaderBase64 = request.getHeader("Authorization");
- if(authHeaderBase64 == null) {
- LOG.warn("ThriftHttpServlet: no HTTP Authorization header");
+ LOG.info("Client username: " + clientUserName);
+
+ // Set the thread local username to be used for doAs if true
+ SessionManager.setUserName(clientUserName);
+ super.doPost(request, response);
}
- else {
- if(!authHeaderBase64.startsWith("Basic")) {
- LOG.warn("ThriftHttpServlet: HTTP Authorization header exists but is not Basic.");
+ catch (HttpAuthenticationException e) {
+ // Send a 403 to the client
+ LOG.error("Error: ", e);
+ response.setContentType("application/x-thrift");
+ response.setStatus(HttpServletResponse.SC_FORBIDDEN);
+ // Send the response back to the client
+ response.getWriter().println("Authentication Error: " + e.getMessage());
+ }
+ finally {
+ // Clear the thread local username since we set it in each http request
+ SessionManager.clearUserName();
+ }
+ }
+
+ /**
+ * Do the LDAP/PAM authentication
+ * @param request
+ * @param authType
+ * @throws HttpAuthenticationException
+ */
+ private String doPasswdAuth(HttpServletRequest request, String authType)
+ throws HttpAuthenticationException {
+ String userName = getUsername(request, authType);
+ // No-op when authType is NOSASL
+ if (!authType.equalsIgnoreCase(HiveAuthFactory.AuthTypes.NOSASL.toString())) {
+ try {
+ AuthMethods authMethod = AuthMethods.getValidAuthMethod(authType);
+ PasswdAuthenticationProvider provider =
+ AuthenticationProviderFactory.getAuthenticationProvider(authMethod);
+ provider.Authenticate(userName, getPassword(request, authType));
+
+ } catch (Exception e) {
+ throw new HttpAuthenticationException(e);
}
- else if(LOG.isDebugEnabled()) {
- String authHeaderBase64_Payload = authHeaderBase64.substring("Basic ".length());
- String authHeaderString = StringUtils.newStringUtf8(
- Base64.decodeBase64(authHeaderBase64_Payload.getBytes()));
- String[] creds = authHeaderString.split(":");
- String username = null;
- String password = null;
+ }
+ return userName;
+ }
- if(creds.length >= 1) {
- username = creds[0];
- }
- if(creds.length >= 2) {
- password = creds[1];
- }
- if(password == null || password.equals("null") || password.equals("")) {
- password = "<no password>";
+ /**
+  * Authenticates the caller through GSS-API Kerberos by running an
+  * HttpKerberosServerAction as the already logged-in service identity.
+  *
+  * @param request the HTTP request handed to the action, which reads the
+  *        service ticket from its Authorization header
+  * @param serviceUGI the service identity under which the action is executed
+  * @return the value produced by the action: the GSS context's source name
+  *         up to (not including) the first '@'
+  * @throws HttpAuthenticationException wrapping any exception raised while
+  *         running the action
+  */
+ private String doKerberosAuth(HttpServletRequest request,
+     UserGroupInformation serviceUGI) throws HttpAuthenticationException {
+   final HttpKerberosServerAction kerberosAction =
+       new HttpKerberosServerAction(request, serviceUGI);
+   try {
+     return serviceUGI.doAs(kerberosAction);
+   } catch (Exception e) {
+     throw new HttpAuthenticationException(e);
+   }
+ }
+
+ class HttpKerberosServerAction implements PrivilegedExceptionAction<String> {
+ HttpServletRequest request;
+ UserGroupInformation serviceUGI;
+
+ HttpKerberosServerAction(HttpServletRequest request,
+ UserGroupInformation serviceUGI) {
+ this.request = request;
+ this.serviceUGI = serviceUGI;
+ }
+
+ @Override
+ public String run() throws HttpAuthenticationException {
+ // Get own Kerberos credentials for accepting connection
+ GSSManager manager = GSSManager.getInstance();
+ GSSContext gssContext = null;
+ String serverPrincipal = getPrincipalWithoutRealm(
+ serviceUGI.getUserName());
+ try {
+ // This Oid for Kerberos GSS-API mechanism.
+ Oid mechOid = new Oid("1.2.840.113554.1.2.2");
+ // Oid for kerberos principal name
+ Oid krb5PrincipalOid = new Oid("1.2.840.113554.1.2.2.1");
+
+ // GSS name for server
+ GSSName serverName = manager.createName(serverPrincipal, krb5PrincipalOid);
+
+ // GSS credentials for server
+ GSSCredential serverCreds = manager.createCredential(serverName,
+ GSSCredential.DEFAULT_LIFETIME, mechOid, GSSCredential.ACCEPT_ONLY);
+
+ // Create a GSS context
+ gssContext = manager.createContext(serverCreds);
+
+ // Get service ticket from the authorization header
+ String serviceTicketBase64 = getAuthHeader(request, authType);
+ byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes());
+
+ gssContext.acceptSecContext(inToken, 0, inToken.length);
+ // Authenticate or deny based on its context completion
+ if (!gssContext.isEstablished()) {
+ throw new HttpAuthenticationException("Kerberos authentication failed: " +
+ "unable to establish context with the service ticket " +
+ "provided by the client.");
}
else {
- // don't log the actual password.
- password = "******";
+ return getPrincipalWithoutRealm(gssContext.getSrcName().toString());
+ }
+ }
+ catch (GSSException e) {
+ throw new HttpAuthenticationException("Kerberos authentication failed: ", e);
+ }
+ finally {
+ if (gssContext != null) {
+ try {
+ gssContext.dispose();
+ } catch (GSSException e) {
+ // No-op
+ }
}
- LOG.debug("HttpServlet: HTTP Authorization header:: username=" + username +
- " password=" + password);
}
}
+
+ private String getPrincipalWithoutRealm(String fullPrincipal) {
+ String names[] = fullPrincipal.split("[@]");
+ return names[0];
+ }
+ }
+
+ /**
+  * Extracts the user name, i.e. the first ':'-separated token of the decoded
+  * Authorization header payload.
+  *
+  * @param request the incoming HTTP request
+  * @param authType the configured authentication type, used to locate the payload
+  * @return the non-empty user name
+  * @throws HttpAuthenticationException if the header is unusable or carries
+  *         no user name
+  */
+ private String getUsername(HttpServletRequest request, String authType)
+     throws HttpAuthenticationException {
+   String creds[] = getAuthHeaderTokens(request, authType);
+   // Username must be present. The token array can be empty (for example when
+   // the decoded payload is just ":"), so check the length before indexing
+   // rather than letting ArrayIndexOutOfBoundsException escape.
+   if (creds.length == 0 || creds[0] == null || creds[0].isEmpty()) {
+     throw new HttpAuthenticationException("Authorization header received " +
+         "from the client does not contain username.");
+   }
+   return creds[0];
+ }
+
+ /**
+  * Extracts the password, i.e. the second ':'-separated token of the decoded
+  * Authorization header payload.
+  *
+  * @param request the incoming HTTP request
+  * @param authType the configured authentication type, used to locate the payload
+  * @return the non-empty password
+  * @throws HttpAuthenticationException if the header is unusable or carries
+  *         no password
+  */
+ private String getPassword(HttpServletRequest request, String authType)
+     throws HttpAuthenticationException {
+   String creds[] = getAuthHeaderTokens(request, authType);
+   // Password must be present. A payload without ':' yields a single token,
+   // so check the length before indexing the second element.
+   if (creds.length < 2 || creds[1] == null || creds[1].isEmpty()) {
+     throw new HttpAuthenticationException("Authorization header received " +
+         "from the client does not contain password.");
+   }
+   return creds[1];
+ }
+
+ /**
+  * Base64-decodes the Authorization header payload and splits it into
+  * credentials at the first ':'.
+  *
+  * @param request the incoming HTTP request
+  * @param authType the configured authentication type, used to locate the payload
+  * @return at most two tokens: the text before the first ':' and, when a ':'
+  *         is present, everything after it
+  * @throws HttpAuthenticationException if the Authorization header is missing
+  *         or has no payload
+  */
+ private String[] getAuthHeaderTokens(HttpServletRequest request,
+     String authType) throws HttpAuthenticationException {
+   String authHeaderBase64 = getAuthHeader(request, authType);
+   String authHeaderString = StringUtils.newStringUtf8(
+       Base64.decodeBase64(authHeaderBase64.getBytes()));
+   // Split on the first ':' only: a password may itself contain ':' and must
+   // reach the authentication provider intact.
+   String[] creds = authHeaderString.split(":", 2);
+   return creds;
}
+ /**
+  * Returns the base64 encoded auth header payload: the part of the
+  * Authorization header that follows the scheme name and one space. The
+  * prefix length is taken from HttpAuthUtils.NEGOTIATE in Kerberos mode and
+  * from HttpAuthUtils.BASIC otherwise.
+  *
+  * @param request the incoming HTTP request
+  * @param authType the configured authentication type
+  * @return the non-empty text after the scheme prefix
+  * @throws HttpAuthenticationException if the header is absent, empty, or has
+  *         nothing after the scheme prefix
+  */
+ private String getAuthHeader(HttpServletRequest request, String authType)
+     throws HttpAuthenticationException {
+   String authHeader = request.getHeader(HttpAuthUtils.AUTHORIZATION);
+   // Each http request must have an Authorization header
+   if (authHeader == null || authHeader.isEmpty()) {
+     throw new HttpAuthenticationException("Authorization header received " +
+         "from the client is empty.");
+   }
+
+   int beginIndex;
+   if (isKerberosAuthMode(authType)) {
+     beginIndex = (HttpAuthUtils.NEGOTIATE + " ").length();
+   }
+   else {
+     beginIndex = (HttpAuthUtils.BASIC + " ").length();
+   }
+   // Authorization header must have a payload. Comparing lengths first keeps a
+   // header shorter than the scheme prefix from raising an unchecked
+   // StringIndexOutOfBoundsException in substring().
+   if (authHeader.length() <= beginIndex) {
+     throw new HttpAuthenticationException("Authorization header received " +
+         "from the client does not contain any data.");
+   }
+   // NOTE(review): only the prefix length is used; the scheme name itself is
+   // not compared against the header. Confirm whether that is intended.
+   return authHeader.substring(beginIndex);
+ }
+
+ /**
+  * Tells whether the given authentication type names Kerberos.
+  *
+  * @param authType the configured authentication type
+  * @return true when authType equals HiveAuthFactory.AuthTypes.KERBEROS,
+  *         ignoring case
+  */
+ private boolean isKerberosAuthMode(String authType) {
+   final String kerberosTypeName = HiveAuthFactory.AuthTypes.KERBEROS.toString();
+   return authType.equalsIgnoreCase(kerberosTypeName);
+ }
}
+
Modified: hive/branches/branch-0.13/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1578350&r1=1578349&r2=1578350&view=diff
==============================================================================
--- hive/branches/branch-0.13/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/branches/branch-0.13/shims/common-secure/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Mon Mar 17 13:04:29 2014
@@ -66,99 +66,121 @@ import org.apache.thrift.transport.TTran
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
- /**
- * Functions that bridge Thrift's SASL transports to Hadoop's
- * SASL callback handlers and authentication classes.
- */
- public class HadoopThriftAuthBridge20S extends HadoopThriftAuthBridge {
- static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class);
-
- @Override
- public Client createClient() {
- return new Client();
- }
-
- @Override
- public Client createClientWithConf(String authType) {
- Configuration conf = new Configuration();
- conf.set(HADOOP_SECURITY_AUTHENTICATION, authType);
- UserGroupInformation.setConfiguration(conf);
- return new Client();
- }
-
- @Override
- public Server createServer(String keytabFile, String principalConf) throws TTransportException {
- return new Server(keytabFile, principalConf);
- }
-
- /**
- * Read and return Hadoop SASL configuration which can be configured using
- * "hadoop.rpc.protection"
- * @param conf
- * @return Hadoop SASL configuration
- */
- @Override
- public Map<String, String> getHadoopSaslProperties(Configuration conf) {
- // Initialize the SaslRpcServer to ensure QOP parameters are read from conf
- SaslRpcServer.init(conf);
- return SaslRpcServer.SASL_PROPS;
- }
-
- public static class Client extends HadoopThriftAuthBridge.Client {
- /**
- * Create a client-side SASL transport that wraps an underlying transport.
- *
- * @param method The authentication method to use. Currently only KERBEROS is
- * supported.
- * @param serverPrincipal The Kerberos principal of the target server.
- * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
- * @param saslProps the sasl properties to create the client with
- */
-
- @Override
- public TTransport createClientTransport(
- String principalConfig, String host,
- String methodStr, String tokenStrForm, TTransport underlyingTransport,
- Map<String, String> saslProps) throws IOException {
- AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
-
- TTransport saslTransport = null;
- switch (method) {
- case DIGEST:
- Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
- t.decodeFromUrlString(tokenStrForm);
- saslTransport = new TSaslClientTransport(
+/**
+ * Functions that bridge Thrift's SASL transports to Hadoop's
+ * SASL callback handlers and authentication classes.
+ */
+public class HadoopThriftAuthBridge20S extends HadoopThriftAuthBridge {
+ static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class);
+
+ @Override
+ public Client createClient() {
+ return new Client();
+ }
+
+ @Override
+ public Client createClientWithConf(String authType) {
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, authType);
+ UserGroupInformation.setConfiguration(conf);
+ return new Client();
+ }
+
+ @Override
+ public Server createServer(String keytabFile, String principalConf) throws TTransportException {
+ return new Server(keytabFile, principalConf);
+ }
+
+ /**
+  * Resolves the server principal for the given host via
+  * SecurityUtil.getServerPrincipal and checks that it splits into exactly
+  * three parts.
+  *
+  * @param principalConfig the configured principal, passed to SecurityUtil
+  * @param host the host name, passed to SecurityUtil
+  * @return the resolved principal
+  * @throws IOException if the resolved principal does not split into three parts
+  */
+ @Override
+ public String getServerPrincipal(String principalConfig, String host)
+     throws IOException {
+   final String resolvedPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+   final String[] principalParts = SaslRpcServer.splitKerberosName(resolvedPrincipal);
+   if (principalParts.length == 3) {
+     return resolvedPrincipal;
+   }
+   throw new IOException(
+       "Kerberos principal name does NOT have the expected hostname part: "
+       + resolvedPrincipal);
+ }
+
+ /**
+  * Builds a fresh Configuration with HADOOP_SECURITY_AUTHENTICATION set to
+  * the given type, installs it through UserGroupInformation.setConfiguration,
+  * and returns the current user.
+  * NOTE(review): setConfiguration is a static call, so this presumably changes
+  * UGI state for the whole process - confirm before calling from shared code.
+  *
+  * @param authType the value stored under HADOOP_SECURITY_AUTHENTICATION
+  * @return the result of UserGroupInformation.getCurrentUser()
+  * @throws IOException declared by the method; propagated from the
+  *         UserGroupInformation calls
+  */
+ @Override
+ public UserGroupInformation getCurrentUGIWithConf(String authType)
+     throws IOException {
+   final Configuration authConf = new Configuration();
+   authConf.set(HADOOP_SECURITY_AUTHENTICATION, authType);
+   UserGroupInformation.setConfiguration(authConf);
+   return UserGroupInformation.getCurrentUser();
+ }
+
+ /**
+ * Read and return Hadoop SASL configuration which can be configured using
+ * "hadoop.rpc.protection"
+ * @param conf
+ * @return Hadoop SASL configuration
+ */
+ @Override
+ public Map<String, String> getHadoopSaslProperties(Configuration conf) {
+ // Initialize the SaslRpcServer to ensure QOP parameters are read from conf
+ SaslRpcServer.init(conf);
+ return SaslRpcServer.SASL_PROPS;
+ }
+
+ public static class Client extends HadoopThriftAuthBridge.Client {
+ /**
+ * Create a client-side SASL transport that wraps an underlying transport.
+ *
+ * @param method The authentication method to use. Currently only KERBEROS is
+ * supported.
+ * @param serverPrincipal The Kerberos principal of the target server.
+ * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
+ * @param saslProps the sasl properties to create the client with
+ */
+
+ @Override
+ public TTransport createClientTransport(
+ String principalConfig, String host,
+ String methodStr, String tokenStrForm, TTransport underlyingTransport,
+ Map<String, String> saslProps) throws IOException {
+ AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
+
+ TTransport saslTransport = null;
+ switch (method) {
+ case DIGEST:
+ Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+ saslTransport = new TSaslClientTransport(
method.getMechanismName(),
null,
null, SaslRpcServer.SASL_DEFAULT_REALM,
saslProps, new SaslClientCallbackHandler(t),
underlyingTransport);
- return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+ return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+
+ case KERBEROS:
+ String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+ String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+ if (names.length != 3) {
+ throw new IOException(
+ "Kerberos principal name does NOT have the expected hostname part: "
+ + serverPrincipal);
+ }
+ try {
+ saslTransport = new TSaslClientTransport(
+ method.getMechanismName(),
+ null,
+ names[0], names[1],
+ saslProps, null,
+ underlyingTransport);
+ return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+ } catch (SaslException se) {
+ throw new IOException("Could not instantiate SASL transport", se);
+ }
- case KERBEROS:
- String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
- String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
- if (names.length != 3) {
- throw new IOException(
- "Kerberos principal name does NOT have the expected hostname part: "
- + serverPrincipal);
- }
- try {
- saslTransport = new TSaslClientTransport(
- method.getMechanismName(),
- null,
- names[0], names[1],
- saslProps, null,
- underlyingTransport);
- return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
- } catch (SaslException se) {
- throw new IOException("Could not instantiate SASL transport", se);
- }
-
- default:
- throw new IOException("Unsupported authentication method: " + method);
- }
- }
+ default:
+ throw new IOException("Unsupported authentication method: " + method);
+ }
+ }
private static class SaslClientCallbackHandler implements CallbackHandler {
private final String userName;
private final char[] userPassword;
@@ -168,8 +190,9 @@ import org.apache.thrift.transport.TTran
this.userPassword = encodePassword(token.getPassword());
}
+ @Override
public void handle(Callback[] callbacks)
- throws UnsupportedCallbackException {
+ throws UnsupportedCallbackException {
NameCallback nc = null;
PasswordCallback pc = null;
RealmCallback rc = null;
@@ -214,253 +237,254 @@ import org.apache.thrift.transport.TTran
static char[] encodePassword(byte[] password) {
return new String(Base64.encodeBase64(password)).toCharArray();
- }
- }
- }
-
- public static class Server extends HadoopThriftAuthBridge.Server {
- final UserGroupInformation realUgi;
- DelegationTokenSecretManager secretManager;
- private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
- //Delegation token related keys
- public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
- "hive.cluster.delegation.key.update-interval";
- public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
- 24*60*60*1000; // 1 day
- public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
- "hive.cluster.delegation.token.renew-interval";
- public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
- 24*60*60*1000; // 1 day
- public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
- "hive.cluster.delegation.token.max-lifetime";
- public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
- 7*24*60*60*1000; // 7 days
- public static final String DELEGATION_TOKEN_STORE_CLS =
- "hive.cluster.delegation.token.store.class";
- public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
- "hive.cluster.delegation.token.store.zookeeper.connectString";
- public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS =
- "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis";
- public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE =
- "hive.cluster.delegation.token.store.zookeeper.znode";
- public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
- "hive.cluster.delegation.token.store.zookeeper.acl";
- public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT =
- "/hive/cluster/delegation";
-
- public Server() throws TTransportException {
- try {
- realUgi = UserGroupInformation.getCurrentUser();
- } catch (IOException ioe) {
- throw new TTransportException(ioe);
- }
- }
- /**
- * Create a server with a kerberos keytab/principal.
- */
- protected Server(String keytabFile, String principalConf)
- throws TTransportException {
- if (keytabFile == null || keytabFile.isEmpty()) {
- throw new TTransportException("No keytab specified");
- }
- if (principalConf == null || principalConf.isEmpty()) {
- throw new TTransportException("No principal specified");
- }
-
- // Login from the keytab
- String kerberosName;
- try {
- kerberosName =
- SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0");
- UserGroupInformation.loginUserFromKeytab(
- kerberosName, keytabFile);
- realUgi = UserGroupInformation.getLoginUser();
- assert realUgi.isFromKeytab();
- } catch (IOException ioe) {
- throw new TTransportException(ioe);
- }
- }
-
- /**
- * Create a TTransportFactory that, upon connection of a client socket,
- * negotiates a Kerberized SASL transport. The resulting TTransportFactory
- * can be passed as both the input and output transport factory when
- * instantiating a TThreadPoolServer, for example.
- *
- * @param saslProps Map of SASL properties
- */
- @Override
- public TTransportFactory createTransportFactory(Map<String, String> saslProps)
- throws TTransportException {
- // Parse out the kerberos principal, host, realm.
- String kerberosName = realUgi.getUserName();
- final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
- if (names.length != 3) {
- throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
- }
-
- TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
- transFactory.addServerDefinition(
- AuthMethod.KERBEROS.getMechanismName(),
- names[0], names[1], // two parts of kerberos principal
- saslProps,
- new SaslRpcServer.SaslGssCallbackHandler());
- transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
+ }
+ }
+ }
+
+ public static class Server extends HadoopThriftAuthBridge.Server {
+ final UserGroupInformation realUgi;
+ DelegationTokenSecretManager secretManager;
+ private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
+ //Delegation token related keys
+ public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
+ "hive.cluster.delegation.key.update-interval";
+ public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
+ 24*60*60*1000; // 1 day
+ public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
+ "hive.cluster.delegation.token.renew-interval";
+ public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
+ 24*60*60*1000; // 1 day
+ public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
+ "hive.cluster.delegation.token.max-lifetime";
+ public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
+ 7*24*60*60*1000; // 7 days
+ public static final String DELEGATION_TOKEN_STORE_CLS =
+ "hive.cluster.delegation.token.store.class";
+ public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
+ "hive.cluster.delegation.token.store.zookeeper.connectString";
+ public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS =
+ "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE =
+ "hive.cluster.delegation.token.store.zookeeper.znode";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
+ "hive.cluster.delegation.token.store.zookeeper.acl";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT =
+ "/hive/cluster/delegation";
+
+ public Server() throws TTransportException {
+ try {
+ realUgi = UserGroupInformation.getCurrentUser();
+ } catch (IOException ioe) {
+ throw new TTransportException(ioe);
+ }
+ }
+ /**
+ * Create a server with a kerberos keytab/principal.
+ */
+ protected Server(String keytabFile, String principalConf)
+ throws TTransportException {
+ if (keytabFile == null || keytabFile.isEmpty()) {
+ throw new TTransportException("No keytab specified");
+ }
+ if (principalConf == null || principalConf.isEmpty()) {
+ throw new TTransportException("No principal specified");
+ }
+
+ // Login from the keytab
+ String kerberosName;
+ try {
+ kerberosName =
+ SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0");
+ UserGroupInformation.loginUserFromKeytab(
+ kerberosName, keytabFile);
+ realUgi = UserGroupInformation.getLoginUser();
+ assert realUgi.isFromKeytab();
+ } catch (IOException ioe) {
+ throw new TTransportException(ioe);
+ }
+ }
+
+ /**
+ * Create a TTransportFactory that, upon connection of a client socket,
+ * negotiates a Kerberized SASL transport. The resulting TTransportFactory
+ * can be passed as both the input and output transport factory when
+ * instantiating a TThreadPoolServer, for example.
+ *
+ * @param saslProps Map of SASL properties
+ */
+ @Override
+ public TTransportFactory createTransportFactory(Map<String, String> saslProps)
+ throws TTransportException {
+ // Parse out the kerberos principal, host, realm.
+ String kerberosName = realUgi.getUserName();
+ final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
+ if (names.length != 3) {
+ throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
+ }
+
+ TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
+ transFactory.addServerDefinition(
+ AuthMethod.KERBEROS.getMechanismName(),
+ names[0], names[1], // two parts of kerberos principal
+ saslProps,
+ new SaslRpcServer.SaslGssCallbackHandler());
+ transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
null, SaslRpcServer.SASL_DEFAULT_REALM,
saslProps, new SaslDigestCallbackHandler(secretManager));
- return new TUGIAssumingTransportFactory(transFactory, realUgi);
- }
+ return new TUGIAssumingTransportFactory(transFactory, realUgi);
+ }
+
+ /**
+ * Wrap a TProcessor in such a way that, before processing any RPC, it
+ * assumes the UserGroupInformation of the user authenticated by
+ * the SASL transport.
+ */
+ @Override
+ public TProcessor wrapProcessor(TProcessor processor) {
+ return new TUGIAssumingProcessor(processor, secretManager, true);
+ }
- /**
- * Wrap a TProcessor in such a way that, before processing any RPC, it
- * assumes the UserGroupInformation of the user authenticated by
- * the SASL transport.
- */
- @Override
- public TProcessor wrapProcessor(TProcessor processor) {
- return new TUGIAssumingProcessor(processor, secretManager, true);
- }
-
- /**
- * Wrap a TProcessor to capture the client information like connecting userid, ip etc
- */
- @Override
- public TProcessor wrapNonAssumingProcessor(TProcessor processor) {
+ /**
+ * Wrap a TProcessor to capture the client information like connecting userid, ip etc
+ */
+ @Override
+ public TProcessor wrapNonAssumingProcessor(TProcessor processor) {
return new TUGIAssumingProcessor(processor, secretManager, false);
- }
+ }
protected DelegationTokenStore getTokenStore(Configuration conf)
throws IOException {
- String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
- if (StringUtils.isBlank(tokenStoreClassName)) {
- return new MemoryTokenStore();
- }
- try {
+ String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
+ if (StringUtils.isBlank(tokenStoreClassName)) {
+ return new MemoryTokenStore();
+ }
+ try {
Class<? extends DelegationTokenStore> storeClass = Class
.forName(tokenStoreClassName).asSubclass(
DelegationTokenStore.class);
return ReflectionUtils.newInstance(storeClass, conf);
- } catch (ClassNotFoundException e) {
+ } catch (ClassNotFoundException e) {
throw new IOException("Error initializing delegation token store: " + tokenStoreClassName,
e);
- }
- }
+ }
+ }
+
+ @Override
+ public void startDelegationTokenSecretManager(Configuration conf, Object hms)
+ throws IOException{
+ long secretKeyInterval =
+ conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+ DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+ long tokenMaxLifetime =
+ conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+ DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+ long tokenRenewInterval =
+ conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+ DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+
+ DelegationTokenStore dts = getTokenStore(conf);
+ dts.setStore(hms);
+ secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval,
+ tokenMaxLifetime,
+ tokenRenewInterval,
+ DELEGATION_TOKEN_GC_INTERVAL, dts);
+ secretManager.startThreads();
+ }
+
+ @Override
+ public String getDelegationToken(final String owner, final String renewer)
+ throws IOException, InterruptedException {
+ if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
+ throw new AuthorizationException(
+ "Delegation Token can be issued only with kerberos authentication. " +
+ "Current AuthenticationMethod: " + authenticationMethod.get()
+ );
+ }
+ //if the user asking the token is same as the 'owner' then don't do
+ //any proxy authorization checks. For cases like oozie, where it gets
+ //a delegation token for another user, we need to make sure oozie is
+ //authorized to get a delegation token.
+ //Do all checks on short names
+ UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+ UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner);
+ if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) {
+ //in the case of proxy users, the getCurrentUser will return the
+ //real user (for e.g. oozie) due to the doAs that happened just before the
+ //server started executing the method getDelegationToken in the MetaStore
+ ownerUgi = UserGroupInformation.createProxyUser(owner,
+ UserGroupInformation.getCurrentUser());
+ InetAddress remoteAddr = getRemoteAddress();
+ ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null);
+ }
+ return ownerUgi.doAs(new PrivilegedExceptionAction<String>() {
+ @Override
+ public String run() throws IOException {
+ return secretManager.getDelegationToken(renewer);
+ }
+ });
+ }
+
+ @Override
+ public String getDelegationTokenWithService(String owner, String renewer, String service)
+ throws IOException, InterruptedException {
+ String token = getDelegationToken(owner, renewer);
+ return ShimLoader.getHadoopShims().addServiceToToken(token, service);
+ }
+
+ @Override
+ public long renewDelegationToken(String tokenStrForm) throws IOException {
+ if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
+ throw new AuthorizationException(
+ "Delegation Token can be issued only with kerberos authentication. " +
+ "Current AuthenticationMethod: " + authenticationMethod.get()
+ );
+ }
+ return secretManager.renewDelegationToken(tokenStrForm);
+ }
+
+ @Override
+ public String getUserFromToken(String tokenStr) throws IOException {
+ return secretManager.getUserFromToken(tokenStr);
+ }
+
+ @Override
+ public void cancelDelegationToken(String tokenStrForm) throws IOException {
+ secretManager.cancelDelegationToken(tokenStrForm);
+ }
+
+ final static ThreadLocal<InetAddress> remoteAddress =
+ new ThreadLocal<InetAddress>() {
+ @Override
+ protected synchronized InetAddress initialValue() {
+ return null;
+ }
+ };
+
+ @Override
+ public InetAddress getRemoteAddress() {
+ return remoteAddress.get();
+ }
+
+ final static ThreadLocal<AuthenticationMethod> authenticationMethod =
+ new ThreadLocal<AuthenticationMethod>() {
+ @Override
+ protected synchronized AuthenticationMethod initialValue() {
+ return AuthenticationMethod.TOKEN;
+ }
+ };
+
+ private static ThreadLocal<String> remoteUser = new ThreadLocal<String> () {
+ @Override
+ protected synchronized String initialValue() {
+ return null;
+ }
+ };
- @Override
- public void startDelegationTokenSecretManager(Configuration conf, Object hms)
- throws IOException{
- long secretKeyInterval =
- conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
- DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
- long tokenMaxLifetime =
- conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY,
- DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
- long tokenRenewInterval =
- conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
- DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
-
- DelegationTokenStore dts = getTokenStore(conf);
- dts.setStore(hms);
- secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval,
- tokenMaxLifetime,
- tokenRenewInterval,
- DELEGATION_TOKEN_GC_INTERVAL, dts);
- secretManager.startThreads();
- }
-
- @Override
- public String getDelegationToken(final String owner, final String renewer)
- throws IOException, InterruptedException {
- if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
- throw new AuthorizationException(
- "Delegation Token can be issued only with kerberos authentication. " +
- "Current AuthenticationMethod: " + authenticationMethod.get()
- );
- }
- //if the user asking the token is same as the 'owner' then don't do
- //any proxy authorization checks. For cases like oozie, where it gets
- //a delegation token for another user, we need to make sure oozie is
- //authorized to get a delegation token.
- //Do all checks on short names
- UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
- UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner);
- if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) {
- //in the case of proxy users, the getCurrentUser will return the
- //real user (for e.g. oozie) due to the doAs that happened just before the
- //server started executing the method getDelegationToken in the MetaStore
- ownerUgi = UserGroupInformation.createProxyUser(owner,
- UserGroupInformation.getCurrentUser());
- InetAddress remoteAddr = getRemoteAddress();
- ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null);
- }
- return ownerUgi.doAs(new PrivilegedExceptionAction<String>() {
- public String run() throws IOException {
- return secretManager.getDelegationToken(renewer);
- }
- });
- }
-
- @Override
- public String getDelegationTokenWithService(String owner, String renewer, String service)
- throws IOException, InterruptedException {
- String token = getDelegationToken(owner, renewer);
- return ShimLoader.getHadoopShims().addServiceToToken(token, service);
- }
-
- @Override
- public long renewDelegationToken(String tokenStrForm) throws IOException {
- if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
- throw new AuthorizationException(
- "Delegation Token can be issued only with kerberos authentication. " +
- "Current AuthenticationMethod: " + authenticationMethod.get()
- );
- }
- return secretManager.renewDelegationToken(tokenStrForm);
- }
-
- @Override
- public String getUserFromToken(String tokenStr) throws IOException {
- return secretManager.getUserFromToken(tokenStr);
- }
-
- @Override
- public void cancelDelegationToken(String tokenStrForm) throws IOException {
- secretManager.cancelDelegationToken(tokenStrForm);
- }
-
- final static ThreadLocal<InetAddress> remoteAddress =
- new ThreadLocal<InetAddress>() {
- @Override
- protected synchronized InetAddress initialValue() {
- return null;
- }
- };
-
- @Override
- public InetAddress getRemoteAddress() {
- return remoteAddress.get();
- }
-
- final static ThreadLocal<AuthenticationMethod> authenticationMethod =
- new ThreadLocal<AuthenticationMethod>() {
- @Override
- protected synchronized AuthenticationMethod initialValue() {
- return AuthenticationMethod.TOKEN;
- }
- };
-
- private static ThreadLocal<String> remoteUser = new ThreadLocal<String> () {
- @Override
- protected synchronized String initialValue() {
- return null;
- }
- };
-
- @Override
- public String getRemoteUser() {
- return remoteUser.get();
- }
+ @Override
+ public String getRemoteUser() {
+ return remoteUser.get();
+ }
/** CallbackHandler for SASL DIGEST-MD5 mechanism */
// This code is pretty much completely based on Hadoop's
@@ -501,12 +525,12 @@ import org.apache.thrift.transport.TTran
continue; // realm is ignored
} else {
throw new UnsupportedCallbackException(callback,
- "Unrecognized SASL DIGEST-MD5 Callback");
+ "Unrecognized SASL DIGEST-MD5 Callback");
}
}
if (pc != null) {
DelegationTokenIdentifier tokenIdentifier = SaslRpcServer.
- getIdentifier(nc.getDefaultName(), secretManager);
+ getIdentifier(nc.getDefaultName(), secretManager);
char[] password = getPassword(tokenIdentifier);
if (LOG.isDebugEnabled()) {
@@ -526,7 +550,7 @@ import org.apache.thrift.transport.TTran
if (ac.isAuthorized()) {
if (LOG.isDebugEnabled()) {
String username =
- SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
+ SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
LOG.debug("SASL server DIGEST-MD5 callback: setting "
+ "canonicalized client ID: " + username);
}
@@ -534,117 +558,120 @@ import org.apache.thrift.transport.TTran
}
}
}
- }
+ }
- /**
- * Processor that pulls the SaslServer object out of the transport, and
- * assumes the remote user's UGI before calling through to the original
- * processor.
- *
- * This is used on the server side to set the UGI for each specific call.
- */
- protected class TUGIAssumingProcessor implements TProcessor {
- final TProcessor wrapped;
- DelegationTokenSecretManager secretManager;
- boolean useProxy;
- TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager,
- boolean useProxy) {
- this.wrapped = wrapped;
- this.secretManager = secretManager;
- this.useProxy = useProxy;
- }
-
- public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
- TTransport trans = inProt.getTransport();
- if (!(trans instanceof TSaslServerTransport)) {
- throw new TException("Unexpected non-SASL transport " + trans.getClass());
- }
- TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
- SaslServer saslServer = saslTrans.getSaslServer();
- String authId = saslServer.getAuthorizationID();
- authenticationMethod.set(AuthenticationMethod.KERBEROS);
- LOG.debug("AUTH ID ======>" + authId);
- String endUser = authId;
-
- if(saslServer.getMechanismName().equals("DIGEST-MD5")) {
- try {
- TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
- secretManager);
- endUser = tokenId.getUser().getUserName();
- authenticationMethod.set(AuthenticationMethod.TOKEN);
- } catch (InvalidToken e) {
- throw new TException(e.getMessage());
- }
- }
- Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
- remoteAddress.set(socket.getInetAddress());
- UserGroupInformation clientUgi = null;
- try {
- if (useProxy) {
- clientUgi = UserGroupInformation.createProxyUser(
- endUser, UserGroupInformation.getLoginUser());
- remoteUser.set(clientUgi.getShortUserName());
- return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
- public Boolean run() {
- try {
- return wrapped.process(inProt, outProt);
- } catch (TException te) {
- throw new RuntimeException(te);
- }
- }
- });
- } else {
- remoteUser.set(endUser);
- return wrapped.process(inProt, outProt);
- }
- } catch (RuntimeException rte) {
- if (rte.getCause() instanceof TException) {
- throw (TException)rte.getCause();
- }
- throw rte;
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie); // unexpected!
- } catch (IOException ioe) {
- throw new RuntimeException(ioe); // unexpected!
- }
- finally {
- if (clientUgi != null) {
- try { FileSystem.closeAllForUGI(clientUgi); }
- catch(IOException exception) {
- LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
+ /**
+ * Processor that pulls the SaslServer object out of the transport, and
+ * assumes the remote user's UGI before calling through to the original
+ * processor.
+ *
+ * This is used on the server side to set the UGI for each specific call.
+ */
+ protected class TUGIAssumingProcessor implements TProcessor {
+ final TProcessor wrapped;
+ DelegationTokenSecretManager secretManager;
+ boolean useProxy;
+ TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager,
+ boolean useProxy) {
+ this.wrapped = wrapped;
+ this.secretManager = secretManager;
+ this.useProxy = useProxy;
+ }
+
+ @Override
+ public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+ TTransport trans = inProt.getTransport();
+ if (!(trans instanceof TSaslServerTransport)) {
+ throw new TException("Unexpected non-SASL transport " + trans.getClass());
+ }
+ TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+ SaslServer saslServer = saslTrans.getSaslServer();
+ String authId = saslServer.getAuthorizationID();
+ authenticationMethod.set(AuthenticationMethod.KERBEROS);
+ LOG.debug("AUTH ID ======>" + authId);
+ String endUser = authId;
+
+ if(saslServer.getMechanismName().equals("DIGEST-MD5")) {
+ try {
+ TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+ secretManager);
+ endUser = tokenId.getUser().getUserName();
+ authenticationMethod.set(AuthenticationMethod.TOKEN);
+ } catch (InvalidToken e) {
+ throw new TException(e.getMessage());
+ }
+ }
+ Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
+ remoteAddress.set(socket.getInetAddress());
+ UserGroupInformation clientUgi = null;
+ try {
+ if (useProxy) {
+ clientUgi = UserGroupInformation.createProxyUser(
+ endUser, UserGroupInformation.getLoginUser());
+ remoteUser.set(clientUgi.getShortUserName());
+ return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
+ @Override
+ public Boolean run() {
+ try {
+ return wrapped.process(inProt, outProt);
+ } catch (TException te) {
+ throw new RuntimeException(te);
+ }
}
+ });
+ } else {
+ remoteUser.set(endUser);
+ return wrapped.process(inProt, outProt);
+ }
+ } catch (RuntimeException rte) {
+ if (rte.getCause() instanceof TException) {
+ throw (TException)rte.getCause();
+ }
+ throw rte;
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie); // unexpected!
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe); // unexpected!
+ }
+ finally {
+ if (clientUgi != null) {
+ try { FileSystem.closeAllForUGI(clientUgi); }
+ catch(IOException exception) {
+ LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
+ }
}
- }
- }
- }
+ }
+ }
+ }
/**
- * A TransportFactory that wraps another one, but assumes a specified UGI
- * before calling through.
- *
- * This is used on the server side to assume the server's Principal when accepting
- * clients.
- */
- static class TUGIAssumingTransportFactory extends TTransportFactory {
- private final UserGroupInformation ugi;
- private final TTransportFactory wrapped;
-
- public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
- assert wrapped != null;
- assert ugi != null;
-
- this.wrapped = wrapped;
- this.ugi = ugi;
- }
-
- @Override
- public TTransport getTransport(final TTransport trans) {
- return ugi.doAs(new PrivilegedAction<TTransport>() {
- public TTransport run() {
- return wrapped.getTransport(trans);
- }
- });
- }
- }
- }
- }
+ * A TransportFactory that wraps another one, but assumes a specified UGI
+ * before calling through.
+ *
+ * This is used on the server side to assume the server's Principal when accepting
+ * clients.
+ */
+ static class TUGIAssumingTransportFactory extends TTransportFactory {
+ private final UserGroupInformation ugi;
+ private final TTransportFactory wrapped;
+
+ public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+ assert wrapped != null;
+ assert ugi != null;
+
+ this.wrapped = wrapped;
+ this.ugi = ugi;
+ }
+
+ @Override
+ public TTransport getTransport(final TTransport trans) {
+ return ugi.doAs(new PrivilegedAction<TTransport>() {
+ @Override
+ public TTransport run() {
+ return wrapped.getTransport(trans);
+ }
+ });
+ }
+ }
+ }
+}
Modified: hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1578350&r1=1578349&r2=1578350&view=diff
==============================================================================
--- hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Mon Mar 17 13:04:29 2014
@@ -43,7 +43,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -80,15 +79,15 @@ public interface HadoopShims {
* @return TaskAttempt Log Url
*/
String getTaskAttemptLogUrl(JobConf conf,
- String taskTrackerHttpAddress,
- String taskAttemptId)
- throws MalformedURLException;
+ String taskTrackerHttpAddress,
+ String taskAttemptId)
+ throws MalformedURLException;
/**
* Returns a shim to wrap MiniMrCluster
*/
public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
- String nameNode, int numDir) throws IOException;
+ String nameNode, int numDir) throws IOException;
/**
* Shim for MiniMrCluster
@@ -125,7 +124,7 @@ public interface HadoopShims {
String archiveName) throws Exception;
public URI getHarUri(URI original, URI base, URI originalBase)
- throws URISyntaxException;
+ throws URISyntaxException;
/**
* Hive uses side effect files exclusively for it's output. It also manages
* the setup/cleanup/commit of output from the hive client. As a result it does
@@ -165,7 +164,7 @@ public interface HadoopShims {
* @throws InterruptedException
*/
public <T> T doAs(UserGroupInformation ugi, PrivilegedExceptionAction<T> pvea) throws
- IOException, InterruptedException;
+ IOException, InterruptedException;
/**
* Once a delegation token is stored in a file, the location is specified
@@ -188,13 +187,13 @@ public interface HadoopShims {
/**
- * Used by metastore server to creates UGI object for a remote user.
+ * Used to creates UGI object for a remote user.
* @param userName remote User Name
* @param groupNames group names associated with remote user name
* @return UGI created for the remote user.
*/
-
public UserGroupInformation createRemoteUser(String userName, List<String> groupNames);
+
/**
* Get the short name corresponding to the subject in the passed UGI
*
@@ -240,7 +239,7 @@ public interface HadoopShims {
* @throws IOException
*/
public void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService)
- throws IOException;
+ throws IOException;
/**
* Add given service to the string format token
@@ -250,7 +249,7 @@ public interface HadoopShims {
* @throws IOException
*/
public String addServiceToToken(String tokenStr, String tokenService)
- throws IOException;
+ throws IOException;
enum JobTrackerState { INITIALIZING, RUNNING };
@@ -331,7 +330,7 @@ public interface HadoopShims {
* @throws IOException
*/
public boolean moveToAppropriateTrash(FileSystem fs, Path path, Configuration conf)
- throws IOException;
+ throws IOException;
/**
* Get the default block size for the path. FileSystem alone is not sufficient to
@@ -382,6 +381,7 @@ public interface HadoopShims {
public interface InputSplitShim extends InputSplit {
JobConf getJob();
+ @Override
long getLength();
/** Returns an array containing the startoffsets of the files in the split. */
@@ -406,14 +406,18 @@ public interface HadoopShims {
Path[] getPaths();
/** Returns all the Paths where this input-split resides. */
+ @Override
String[] getLocations() throws IOException;
void shrinkSplit(long length);
+ @Override
String toString();
+ @Override
void readFields(DataInput in) throws IOException;
+ @Override
void write(DataOutput out) throws IOException;
}
@@ -445,7 +449,7 @@ public interface HadoopShims {
* @throws IOException
*/
Iterator<FileStatus> listLocatedStatus(FileSystem fs, Path path,
- PathFilter filter) throws IOException;
+ PathFilter filter) throws IOException;
/**
* For file status returned by listLocatedStatus, convert them into a list
@@ -456,7 +460,7 @@ public interface HadoopShims {
* @throws IOException
*/
BlockLocation[] getLocations(FileSystem fs,
- FileStatus status) throws IOException;
+ FileStatus status) throws IOException;
public HCatHadoopShims getHCatShim();
public interface HCatHadoopShims {
@@ -468,10 +472,10 @@ public interface HadoopShims {
public TaskAttemptID createTaskAttemptID();
public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf,
- TaskAttemptID taskId);
+ TaskAttemptID taskId);
public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(JobConf conf,
- org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable);
+ org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable);
public JobContext createJobContext(Configuration conf, JobID jobId);
@@ -603,7 +607,7 @@ public interface HadoopShims {
}
public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec);
-
+
/**
* Get configuration from JobContext
*/
Modified: hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java?rev=1578350&r1=1578349&r2=1578350&view=diff
==============================================================================
--- hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java (original)
+++ hive/branches/branch-0.13/shims/common/src/main/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java Mon Mar 17 13:04:29 2014
@@ -16,39 +16,53 @@
* limitations under the License.
*/
- package org.apache.hadoop.hive.thrift;
+package org.apache.hadoop.hive.thrift;
- import java.io.IOException;
+import java.io.IOException;
import java.net.InetAddress;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TProcessor;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
- /**
- * This class is only overridden by the secure hadoop shim. It allows
- * the Thrift SASL support to bridge to Hadoop's UserGroupInformation
- * & DelegationToken infrastructure.
- */
- public class HadoopThriftAuthBridge {
- public Client createClient() {
- throw new UnsupportedOperationException(
- "The current version of Hadoop does not support Authentication");
- }
-
- public Client createClientWithConf(String authType) {
- throw new UnsupportedOperationException(
- "The current version of Hadoop does not support Authentication");
- }
-
- public Server createServer(String keytabFile, String principalConf)
- throws TTransportException {
- throw new UnsupportedOperationException(
- "The current version of Hadoop does not support Authentication");
- }
+/**
+ * This class is only overridden by the secure hadoop shim. It allows
+ * the Thrift SASL support to bridge to Hadoop's UserGroupInformation
+ * & DelegationToken infrastructure.
+ */
+public class HadoopThriftAuthBridge {
+ public Client createClient() {
+ throw new UnsupportedOperationException(
+ "The current version of Hadoop does not support Authentication");
+ }
+
+ public Client createClientWithConf(String authType) {
+ throw new UnsupportedOperationException(
+ "The current version of Hadoop does not support Authentication");
+ }
+
+ public UserGroupInformation getCurrentUGIWithConf(String authType)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "The current version of Hadoop does not support Authentication");
+ }
+
+
+ public String getServerPrincipal(String principalConfig, String host)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "The current version of Hadoop does not support Authentication");
+ }
+
+ public Server createServer(String keytabFile, String principalConf)
+ throws TTransportException {
+ throw new UnsupportedOperationException(
+ "The current version of Hadoop does not support Authentication");
+ }
/**
@@ -58,47 +72,47 @@ import org.apache.thrift.transport.TTran
* @param conf
* @return Hadoop SASL configuration
*/
- public Map<String, String> getHadoopSaslProperties(Configuration conf) {
- throw new UnsupportedOperationException(
- "The current version of Hadoop does not support Authentication");
- }
-
- public static abstract class Client {
- /**
- *
- * @param principalConfig In the case of Kerberos authentication this will
- * be the kerberos principal name, for DIGEST-MD5 (delegation token) based
- * authentication this will be null
- * @param host The metastore server host name
- * @param methodStr "KERBEROS" or "DIGEST"
- * @param tokenStrForm This is url encoded string form of
- * org.apache.hadoop.security.token.
- * @param underlyingTransport the underlying transport
- * @return the transport
- * @throws IOException
- */
- public abstract TTransport createClientTransport(
- String principalConfig, String host,
- String methodStr, String tokenStrForm, TTransport underlyingTransport,
- Map<String, String> saslProps)
- throws IOException;
- }
-
- public static abstract class Server {
- public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps) throws TTransportException;
- public abstract TProcessor wrapProcessor(TProcessor processor);
- public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor);
- public abstract InetAddress getRemoteAddress();
- public abstract void startDelegationTokenSecretManager(Configuration conf,
- Object hmsHandler) throws IOException;
- public abstract String getDelegationToken(String owner, String renewer)
- throws IOException, InterruptedException;
- public abstract String getDelegationTokenWithService(String owner, String renewer, String service)
- throws IOException, InterruptedException;
- public abstract String getRemoteUser();
- public abstract long renewDelegationToken(String tokenStrForm) throws IOException;
- public abstract void cancelDelegationToken(String tokenStrForm) throws IOException;
- public abstract String getUserFromToken(String tokenStr) throws IOException;
- }
- }
+ public Map<String, String> getHadoopSaslProperties(Configuration conf) {
+ throw new UnsupportedOperationException(
+ "The current version of Hadoop does not support Authentication");
+ }
+
+ public static abstract class Client {
+ /**
+ *
+ * @param principalConfig In the case of Kerberos authentication this will
+ * be the kerberos principal name, for DIGEST-MD5 (delegation token) based
+ * authentication this will be null
+ * @param host The metastore server host name
+ * @param methodStr "KERBEROS" or "DIGEST"
+ * @param tokenStrForm This is url encoded string form of
+ * org.apache.hadoop.security.token.
+ * @param underlyingTransport the underlying transport
+ * @return the transport
+ * @throws IOException
+ */
+ public abstract TTransport createClientTransport(
+ String principalConfig, String host,
+ String methodStr, String tokenStrForm, TTransport underlyingTransport,
+ Map<String, String> saslProps)
+ throws IOException;
+ }
+
+ public static abstract class Server {
+ public abstract TTransportFactory createTransportFactory(Map<String, String> saslProps) throws TTransportException;
+ public abstract TProcessor wrapProcessor(TProcessor processor);
+ public abstract TProcessor wrapNonAssumingProcessor(TProcessor processor);
+ public abstract InetAddress getRemoteAddress();
+ public abstract void startDelegationTokenSecretManager(Configuration conf,
+ Object hmsHandler) throws IOException;
+ public abstract String getDelegationToken(String owner, String renewer)
+ throws IOException, InterruptedException;
+ public abstract String getDelegationTokenWithService(String owner, String renewer, String service)
+ throws IOException, InterruptedException;
+ public abstract String getRemoteUser();
+ public abstract long renewDelegationToken(String tokenStrForm) throws IOException;
+ public abstract void cancelDelegationToken(String tokenStrForm) throws IOException;
+ public abstract String getUserFromToken(String tokenStr) throws IOException;
+ }
+}