Posted to commits@hive.apache.org by na...@apache.org on 2011/01/19 18:28:30 UTC
svn commit: r1060876 [4/4] - in /hive/trunk: ./ metastore/if/
metastore/src/gen/thrift/gen-cpp/
metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/
metastore/src/gen/thrift/gen-php/hive_metastore/
metastore/src/gen/thrift/gen-py...
Added: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java?rev=1060876&view=auto
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java (added)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java Wed Jan 19 17:28:28 2011
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+
+/**
+ * A Hive-specific delegation token secret manager.
+ * The secret manager is responsible for generating and accepting the password
+ * for each token.
+ */
+public class DelegationTokenSecretManager
+ extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+ /**
+ * Create a secret manager.
+ * @param delegationKeyUpdateInterval the number of milliseconds between
+ * rolling new secret keys
+ * @param delegationTokenMaxLifetime the maximum lifetime of the delegation
+ * tokens, in milliseconds
+ * @param delegationTokenRenewInterval how often the tokens must be renewed, in milliseconds
+ * @param delegationTokenRemoverScanInterval how often the tokens are scanned
+ * for expired tokens, in milliseconds
+ */
+ public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+ long delegationTokenMaxLifetime,
+ long delegationTokenRenewInterval,
+ long delegationTokenRemoverScanInterval) {
+ super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+ delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+ }
+
+ @Override
+ public DelegationTokenIdentifier createIdentifier() {
+ return new DelegationTokenIdentifier();
+ }
+
+ public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException {
+ Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+ String user = UserGroupInformation.getCurrentUser().getUserName();
+ cancelToken(t, user);
+ }
+
+ public synchronized long renewDelegationToken(String tokenStrForm) throws IOException {
+ Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+ String user = UserGroupInformation.getCurrentUser().getUserName();
+ return renewToken(t, user);
+ }
+
+ public synchronized String getDelegationToken(String renewer, String tokenSignature) throws IOException {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ Text owner = new Text(ugi.getUserName());
+ Text realUser = null;
+ if (ugi.getRealUser() != null) {
+ realUser = new Text(ugi.getRealUser().getUserName());
+ }
+ DelegationTokenIdentifier ident =
+ new DelegationTokenIdentifier(owner, new Text(renewer), realUser);
+ Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>(
+ ident, this);
+ if (tokenSignature != null) {
+ t.setService(new Text(tokenSignature));
+ }
+ return t.encodeToUrlString();
+ }
+
+ public synchronized String getDelegationToken(String renewer) throws IOException {
+ return getDelegationToken(renewer, null);
+ }
+}
+
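
For context, a minimal usage sketch of the token lifecycle against the secret manager above (not part of this commit; interval values are illustrative, and the class is assumed to live in the same package):

    // Hypothetical sketch: exercise the issue/renew/cancel lifecycle of the
    // secret manager above. Assumed to live in org.apache.hadoop.hive.thrift.
    package org.apache.hadoop.hive.thrift;

    import java.io.IOException;

    import org.apache.hadoop.security.UserGroupInformation;

    public class DelegationTokenLifecycleSketch {
      public static void main(String[] args) throws IOException {
        DelegationTokenSecretManager mgr = new DelegationTokenSecretManager(
            24 * 60 * 60 * 1000L,     // key update interval: 1 day, in ms
            7 * 24 * 60 * 60 * 1000L, // max token lifetime: 7 days, in ms
            24 * 60 * 60 * 1000L,     // renew interval: 1 day, in ms
            60 * 60 * 1000L);         // expired-token scan interval: 1 hour, in ms
        mgr.startThreads(); // inherited from AbstractDelegationTokenSecretManager
        try {
          String renewer = UserGroupInformation.getCurrentUser().getUserName();
          String tokenStrForm = mgr.getDelegationToken(renewer);
          // Renewal succeeds because the current user matches the renewer above.
          long nextExpiry = mgr.renewDelegationToken(tokenStrForm);
          System.out.println("token renewed until " + nextExpiry);
          mgr.cancelDelegationToken(tokenStrForm);
        } finally {
          mgr.stopThreads();
        }
      }
    }
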
Added: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java?rev=1060876&view=auto
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java (added)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java Wed Jan 19 17:28:28 2011
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+
+/**
+ * A delegation token selector that is specialized for Hive.
+ */
+
+public class DelegationTokenSelector
+ extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
+
+ public DelegationTokenSelector() {
+ super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND);
+ }
+}
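
For context, a minimal sketch of how a client might stash a metastore-issued token in its UGI under a service name and let this selector find it again (hypothetical helper, assumed to live in the same package; the signature string is whatever the client sets as the token's service):

    // Hypothetical sketch: stash a metastore-issued token in the UGI under a
    // service name, then recover it with the selector.
    package org.apache.hadoop.hive.thrift;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;

    public class TokenStashSketch {
      public static void stashAndLookup(String tokenStrForm, String signature)
          throws IOException {
        // Decode the URL-encoded token string issued by the metastore.
        Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
        t.decodeFromUrlString(tokenStrForm);
        t.setService(new Text(signature)); // the service field is the lookup key
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        ugi.addToken(t);
        // Later, the selector recovers the token by the same service name.
        Token<DelegationTokenIdentifier> found =
            new DelegationTokenSelector().selectToken(new Text(signature),
                ugi.getTokens());
        assert found != null;
      }
    }
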
Modified: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1060876&r1=1060875&r2=1060876&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Wed Jan 19 17:28:28 2011
@@ -15,21 +15,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hive.thrift;
- import java.io.IOException;
+import java.io.IOException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
@@ -66,14 +79,27 @@ import org.apache.thrift.transport.TTran
* @param serverPrincipal The Kerberos principal of the target server.
* @param underlyingTransport The underlying transport mechanism, usually a TSocket.
*/
+
@Override
public TTransport createClientTransport(
String principalConfig, String host,
- String methodStr, TTransport underlyingTransport)
+ String methodStr, String tokenStrForm, TTransport underlyingTransport)
throws IOException {
AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
+ TTransport saslTransport = null;
switch (method) {
+ case DIGEST:
+ Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+ saslTransport = new TSaslClientTransport(
+ method.getMechanismName(),
+ null,
+ null, SaslRpcServer.SASL_DEFAULT_REALM,
+ SaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(t),
+ underlyingTransport);
+ return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+
case KERBEROS:
String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
@@ -83,7 +109,7 @@ import org.apache.thrift.transport.TTran
+ serverPrincipal);
}
try {
- TTransport saslTransport = new TSaslClientTransport(
+ saslTransport = new TSaslClientTransport(
method.getMechanismName(),
null,
names[0], names[1],
@@ -95,16 +121,219 @@ import org.apache.thrift.transport.TTran
}
default:
- throw new IOException("Unsupported authentication method: " +method);
+ throw new IOException("Unsupported authentication method: " + method);
+ }
+ }
+ private static class SaslClientCallbackHandler implements CallbackHandler {
+ private final String userName;
+ private final char[] userPassword;
+
+ public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+ this.userName = encodeIdentifier(token.getIdentifier());
+ this.userPassword = encodePassword(token.getPassword());
+ }
+
+ public void handle(Callback[] callbacks)
+ throws UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ RealmCallback rc = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof RealmChoiceCallback) {
+ continue;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ rc = (RealmCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL client callback");
+ }
+ }
+ if (nc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client callback: setting username: " + userName);
+ }
+ nc.setName(userName);
+ }
+ if (pc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client callback: setting userPassword");
+ }
+ pc.setPassword(userPassword);
+ }
+ if (rc != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client callback: setting realm: "
+ + rc.getDefaultText());
+ }
+ rc.setText(rc.getDefaultText());
+ }
+ }
+
+ static String encodeIdentifier(byte[] identifier) {
+ return new String(Base64.encodeBase64(identifier));
+ }
+
+ static char[] encodePassword(byte[] password) {
+ return new String(Base64.encodeBase64(password)).toCharArray();
+ }
+ }
+ /**
+ * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient
+ * inside open(). So, we need to assume the correct UGI when the transport is opened
+ * so that the SASL mechanisms have access to the right principal. This transport
+ * wraps the Sasl transports to set up the right UGI context for open().
+ *
+ * This is used on the client side, where the API explicitly opens a transport to
+ * the server.
+ */
+ private static class TUGIAssumingTransport extends TFilterTransport {
+ private final UserGroupInformation ugi;
+
+ public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
+ super(wrapped);
+ this.ugi = ugi;
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ try {
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() {
+ try {
+ wrapped.open();
+ } catch (TTransportException tte) {
+ // Wrap the transport exception in an RTE, since UGI.doAs() then goes
+ // and unwraps this for us out of the doAs block. We then unwrap one
+ // more time in our catch clause to get back the TTE. (ugh)
+ throw new RuntimeException(tte);
+ }
+ return null;
+ }
+ });
+ } catch (IOException ioe) {
+ assert false : "Never thrown!";
+ throw new RuntimeException("Received an ioe we never threw!", ioe);
+ } catch (InterruptedException ie) {
+ assert false : "We never expect to see an InterruptedException thrown in this block";
+ throw new RuntimeException("Received an ie we never threw!", ie);
+ } catch (RuntimeException rte) {
+ if (rte.getCause() instanceof TTransportException) {
+ throw (TTransportException)rte.getCause();
+ } else {
+ throw rte;
+ }
+ }
+ }
+ }
+ /**
+ * Transport that simply wraps another transport.
+ * This is the equivalent of FilterInputStream for Thrift transports.
+ */
+ private static class TFilterTransport extends TTransport {
+ protected final TTransport wrapped;
+
+ public TFilterTransport(TTransport wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ wrapped.open();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return wrapped.isOpen();
+ }
+
+ @Override
+ public boolean peek() {
+ return wrapped.peek();
+ }
+
+ @Override
+ public void close() {
+ wrapped.close();
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ return wrapped.read(buf, off, len);
+ }
+
+ @Override
+ public int readAll(byte[] buf, int off, int len) throws TTransportException {
+ return wrapped.readAll(buf, off, len);
+ }
+
+ @Override
+ public void write(byte[] buf) throws TTransportException {
+ wrapped.write(buf);
+ }
+
+ @Override
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ wrapped.write(buf, off, len);
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ wrapped.flush();
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return wrapped.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return wrapped.getBufferPosition();
+ }
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ return wrapped.getBytesRemainingInBuffer();
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ wrapped.consumeBuffer(len);
}
}
}
public static class Server extends HadoopThriftAuthBridge.Server {
- private final UserGroupInformation realUgi;
+ final UserGroupInformation realUgi;
+ DelegationTokenSecretManager secretManager;
+ private static final long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
+ // Delegation token related keys
+ public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
+ "hive.cluster.delegation.key.update-interval";
+ public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
+ 24*60*60*1000; // 1 day
+ public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
+ "hive.cluster.delegation.token.renew-interval";
+ public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
+ 24*60*60*1000; // 1 day
+ public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
+ "hive.cluster.delegation.token.max-lifetime";
+ public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
+ 7*24*60*60*1000; // 7 days
+ public Server() throws TTransportException {
+ try {
+ realUgi = UserGroupInformation.getCurrentUser();
+ } catch (IOException ioe) {
+ throw new TTransportException(ioe);
+ }
+ }
/**
- * TODO: javadoc
+ * Create a server with a Kerberos keytab/principal.
*/
private Server(String keytabFile, String principalConf)
throws TTransportException {
@@ -143,7 +372,7 @@ import org.apache.thrift.transport.TTran
String kerberosName = realUgi.getUserName();
final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
if (names.length != 3) {
- throw new TTransportException("Kerberos principal should have 3 parts: " +kerberosName);
+ throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
}
TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
@@ -152,6 +381,9 @@ import org.apache.thrift.transport.TTran
names[0], names[1], // two parts of kerberos principal
SaslRpcServer.SASL_PROPS,
new SaslRpcServer.SaslGssCallbackHandler());
+ transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
+ null, SaslRpcServer.SASL_DEFAULT_REALM,
+ SaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(secretManager));
return new TUGIAssumingTransportFactory(transFactory, realUgi);
}
@@ -163,7 +395,123 @@ import org.apache.thrift.transport.TTran
*/
@Override
public TProcessor wrapProcessor(TProcessor processor) {
- return new TUGIAssumingProcessor(processor);
+ return new TUGIAssumingProcessor(processor, secretManager);
+ }
+
+ @Override
+ public void startDelegationTokenSecretManager(Configuration conf)
+ throws IOException{
+ long secretKeyInterval =
+ conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+ DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+ long tokenMaxLifetime =
+ conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+ DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+ long tokenRenewInterval =
+ conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+ DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+ secretManager =
+ new DelegationTokenSecretManager(secretKeyInterval,
+ tokenMaxLifetime,
+ tokenRenewInterval,
+ DELEGATION_TOKEN_GC_INTERVAL);
+ secretManager.startThreads();
+ }
+
+ @Override
+ public String getDelegationToken(String renewer) throws IOException {
+ return secretManager.getDelegationToken(renewer);
+ }
+
+ @Override
+ public long renewDelegationToken(String tokenStrForm) throws IOException {
+ return secretManager.renewDelegationToken(tokenStrForm);
+ }
+
+ @Override
+ public String getDelegationToken(String renewer, String token_signature)
+ throws IOException {
+ return secretManager.getDelegationToken(renewer, token_signature);
+ }
+
+ @Override
+ public void cancelDelegationToken(String tokenStrForm) throws IOException {
+ secretManager.cancelDelegationToken(tokenStrForm);
+ }
+
+
+ /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+ // This code is based almost entirely on Hadoop's
+ // SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not
+ // use that Hadoop class as-is is that it needs a Server.Connection object,
+ // which is relevant in Hadoop RPC but not here in the metastore - so the
+ // code below does not deal with the Server.Connection object.
+ static class SaslDigestCallbackHandler implements CallbackHandler {
+ private final DelegationTokenSecretManager secretManager;
+
+ public SaslDigestCallbackHandler(
+ DelegationTokenSecretManager secretManager) {
+ this.secretManager = secretManager;
+ }
+
+ private char[] getPassword(DelegationTokenIdentifier tokenid) throws InvalidToken {
+ return encodePassword(secretManager.retrievePassword(tokenid));
+ }
+
+ private char[] encodePassword(byte[] password) {
+ return new String(Base64.encodeBase64(password)).toCharArray();
+ }
+ /** {@inheritDoc} */
+ @Override
+ public void handle(Callback[] callbacks) throws InvalidToken,
+ UnsupportedCallbackException {
+ NameCallback nc = null;
+ PasswordCallback pc = null;
+ AuthorizeCallback ac = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else if (callback instanceof NameCallback) {
+ nc = (NameCallback) callback;
+ } else if (callback instanceof PasswordCallback) {
+ pc = (PasswordCallback) callback;
+ } else if (callback instanceof RealmCallback) {
+ continue; // realm is ignored
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL DIGEST-MD5 Callback");
+ }
+ }
+ if (pc != null) {
+ DelegationTokenIdentifier tokenIdentifier = SaslRpcServer.
+ getIdentifier(nc.getDefaultName(), secretManager);
+ char[] password = getPassword(tokenIdentifier);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+ + "for client: " + tokenIdentifier.getUser());
+ }
+ pc.setPassword(password);
+ }
+ if (ac != null) {
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ if (authid.equals(authzid)) {
+ ac.setAuthorized(true);
+ } else {
+ ac.setAuthorized(false);
+ }
+ if (ac.isAuthorized()) {
+ if (LOG.isDebugEnabled()) {
+ String username =
+ SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
+ LOG.debug("SASL server DIGEST-MD5 callback: setting "
+ + "canonicalized client ID: " + username);
+ }
+ ac.setAuthorizedID(authzid);
+ }
+ }
+ }
}
/**
@@ -175,22 +523,35 @@ import org.apache.thrift.transport.TTran
*/
private class TUGIAssumingProcessor implements TProcessor {
final TProcessor wrapped;
-
- TUGIAssumingProcessor(TProcessor wrapped) {
+ DelegationTokenSecretManager secretManager;
+ TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager) {
this.wrapped = wrapped;
+ this.secretManager = secretManager;
}
public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
TTransport trans = inProt.getTransport();
if (!(trans instanceof TSaslServerTransport)) {
- throw new TException("Unexpected non-SASL transport " +trans.getClass());
+ throw new TException("Unexpected non-SASL transport " + trans.getClass());
}
TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
- String authId = saslTrans.getSaslServer().getAuthorizationID();
+ SaslServer saslServer = saslTrans.getSaslServer();
+ String authId = saslServer.getAuthorizationID();
+ LOG.debug("AUTH ID ======> " + authId);
+ String endUser = authId;
+ if (saslServer.getMechanismName().equals("DIGEST-MD5")) {
+ try {
+ TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+ secretManager);
+ endUser = tokenId.getUser().getUserName();
+ } catch (InvalidToken e) {
+ throw new TException(e.getMessage());
+ }
+ }
try {
UserGroupInformation clientUgi = UserGroupInformation.createProxyUser(
- authId, UserGroupInformation.getLoginUser());
+ endUser, UserGroupInformation.getLoginUser());
return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() {
try {
@@ -213,160 +574,33 @@ import org.apache.thrift.transport.TTran
}
}
- }
-
- /**
- * A TransportFactory that wraps another one, but assumes a specified UGI
- * before calling through.
- *
- * This is used on the server side to assume the server's Principal when accepting
- * clients.
- */
- private static class TUGIAssumingTransportFactory extends TTransportFactory {
- private final UserGroupInformation ugi;
- private final TTransportFactory wrapped;
-
- public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
- assert wrapped != null;
- assert ugi != null;
-
- this.wrapped = wrapped;
- this.ugi = ugi;
- }
-
- @Override
- public TTransport getTransport(final TTransport trans) {
- return ugi.doAs(new PrivilegedAction<TTransport>() {
- public TTransport run() {
- return wrapped.getTransport(trans);
- }
- });
- }
- }
+ /**
+ * A TransportFactory that wraps another one, but assumes a specified UGI
+ * before calling through.
+ *
+ * This is used on the server side to assume the server's Principal when accepting
+ * clients.
+ */
+ static class TUGIAssumingTransportFactory extends TTransportFactory {
+ private final UserGroupInformation ugi;
+ private final TTransportFactory wrapped;
+
+ public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+ assert wrapped != null;
+ assert ugi != null;
- /**
- * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient
- * inside open(). So, we need to assume the correct UGI when the transport is opened
- * so that the SASL mechanisms have access to the right principal. This transport
- * wraps the Sasl transports to set up the right UGI context for open().
- *
- * This is used on the client side, where the API explicitly opens a transport to
- * the server.
- */
- private static class TUGIAssumingTransport extends TFilterTransport {
- private final UserGroupInformation ugi;
-
- public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
- super(wrapped);
- this.ugi = ugi;
- }
+ this.wrapped = wrapped;
+ this.ugi = ugi;
+ }
- @Override
- public void open() throws TTransportException {
- try {
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() {
- try {
- wrapped.open();
- } catch (TTransportException tte) {
- // Wrap the transport exception in an RTE, since UGI.doAs() then goes
- // and unwraps this for us out of the doAs block. We then unwrap one
- // more time in our catch clause to get back the TTE. (ugh)
- throw new RuntimeException(tte);
- }
- return null;
+ @Override
+ public TTransport getTransport(final TTransport trans) {
+ return ugi.doAs(new PrivilegedAction<TTransport>() {
+ public TTransport run() {
+ return wrapped.getTransport(trans);
}
});
- } catch (IOException ioe) {
- assert false : "Never thrown!";
- throw new RuntimeException("Received an ioe we never threw!", ioe);
- } catch (InterruptedException ie) {
- assert false : "We never expect to see an InterruptedException thrown in this block";
- throw new RuntimeException("Received an ie we never threw!", ie);
- } catch (RuntimeException rte) {
- if (rte.getCause() instanceof TTransportException) {
- throw (TTransportException)rte.getCause();
- } else {
- throw rte;
- }
}
}
}
-
- /**
- * Transport that simply wraps another transport.
- * This is the equivalent of FilterInputStream for Thrift transports.
- */
- private static class TFilterTransport extends TTransport {
- protected final TTransport wrapped;
-
- public TFilterTransport(TTransport wrapped) {
- this.wrapped = wrapped;
- }
-
- @Override
- public void open() throws TTransportException {
- wrapped.open();
- }
-
- @Override
- public boolean isOpen() {
- return wrapped.isOpen();
- }
-
- @Override
- public boolean peek() {
- return wrapped.peek();
- }
-
- @Override
- public void close() {
- wrapped.close();
- }
-
- @Override
- public int read(byte[] buf, int off, int len) throws TTransportException {
- return wrapped.read(buf, off, len);
- }
-
- @Override
- public int readAll(byte[] buf, int off, int len) throws TTransportException {
- return wrapped.readAll(buf, off, len);
- }
-
- @Override
- public void write(byte[] buf) throws TTransportException {
- wrapped.write(buf);
- }
-
- @Override
- public void write(byte[] buf, int off, int len) throws TTransportException {
- wrapped.write(buf, off, len);
- }
-
- @Override
- public void flush() throws TTransportException {
- wrapped.flush();
- }
-
- @Override
- public byte[] getBuffer() {
- return wrapped.getBuffer();
- }
-
- @Override
- public int getBufferPosition() {
- return wrapped.getBufferPosition();
- }
-
- @Override
- public int getBytesRemainingInBuffer() {
- return wrapped.getBytesRemainingInBuffer();
- }
-
- @Override
- public void consumeBuffer(int len) {
- wrapped.consumeBuffer(len);
- }
- }
}
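
For context, a minimal client-side sketch of the DIGEST path through the bridge above (hypothetical helper; host name and port are placeholders):

    // Hypothetical sketch: open a DIGEST (delegation token) authenticated
    // connection through the bridge. Host and port are placeholders.
    import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
    import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge20S;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class DigestClientSketch {
      public static TTransport open(String tokenStrForm) throws Exception {
        HadoopThriftAuthBridge.Client client =
            new HadoopThriftAuthBridge20S().createClient();
        TTransport transport = client.createClientTransport(
            null,              // Kerberos principal not needed for DIGEST
            "metastore-host",  // placeholder server host
            "DIGEST",          // AuthMethod name
            tokenStrForm,      // URL-encoded delegation token string
            new TSocket("metastore-host", 10000)); // placeholder endpoint
        transport.open(); // SASL handshake runs here, under the current UGI
        return transport;
      }
    }
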
Modified: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1060876&r1=1060875&r2=1060876&view=diff
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java Wed Jan 19 17:28:28 2011
@@ -21,6 +21,10 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -35,10 +39,6 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.security.UserGroupInformation;
-import javax.security.auth.login.LoginException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* In order to be compatible with multiple versions of Hadoop, all parts
@@ -158,6 +158,24 @@ public interface HadoopShims {
*/
public UserGroupInformation getUGIForConf(Configuration conf) throws LoginException, IOException;
+
+ /**
+ * Get the string form of the token given a token signature.
+ * The signature is used as the value of the "service" field in the token for lookup.
+ * Ref: AbstractDelegationTokenSelector in Hadoop. If such a token exists
+ * in the token cache (credential store) of the job, the lookup returns it.
+ * This is relevant only when running against a "secure" Hadoop release.
+ * The method gets hold of the tokens if they are set up by Hadoop - this should
+ * happen on the map/reduce tasks if the client added the tokens into Hadoop's
+ * credential store in the front end during job submission. The method then
+ * selects the Hive delegation token among the set of tokens and returns its
+ * string form.
+ * @param tokenSignature the signature to use for the token lookup
+ * @return the string form of the token found
+ * @throws IOException
+ */
+ String getTokenStrForm(String tokenSignature) throws IOException;
+
/**
* InputSplitShim.
*
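
For context, a hypothetical sketch of how a secure shim might implement getTokenStrForm, pairing the Hive token selector with the current UGI's token cache (a static helper standing in for the shim method; not part of this commit):

    // Hypothetical helper equivalent of getTokenStrForm; a secure shim's
    // implementation could delegate to something like this.
    package org.apache.hadoop.hive.thrift;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;
    import org.apache.hadoop.security.token.TokenSelector;

    public final class TokenStrFormSketch {
      public static String getTokenStrForm(String tokenSignature)
          throws IOException {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        TokenSelector<? extends TokenIdentifier> tokenSelector =
            new DelegationTokenSelector();
        // A null signature selects the token whose service field was never set.
        Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
            tokenSignature == null ? new Text() : new Text(tokenSignature),
            ugi.getTokens());
        return token != null ? token.encodeToUrlString() : null;
      }
    }
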
Modified: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java?rev=1060876&r1=1060875&r2=1060876&view=diff
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java (original)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge.java Wed Jan 19 17:28:28 2011
@@ -20,15 +20,15 @@
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TProcessor;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
-
/**
* This class is only overridden by the secure hadoop shim. It allows
* the Thrift SASL support to bridge to Hadoop's UserGroupInformation
- * infrastructure.
+ * and DelegationToken infrastructure.
*/
public class HadoopThriftAuthBridge {
public Client createClient() {
@@ -44,15 +44,33 @@ import org.apache.thrift.transport.TTran
public static abstract class Client {
+ /**
+ * Create a client-side SASL transport that wraps an underlying transport.
+ * @param principalConfig In the case of Kerberos authentication this will
+ * be the Kerberos principal name; for DIGEST-MD5 (delegation token) based
+ * authentication this will be null
+ * @param host The metastore server host name
+ * @param methodStr "KERBEROS" or "DIGEST"
+ * @param tokenStrForm the URL-encoded string form of an
+ * org.apache.hadoop.security.token.Token (null unless using DIGEST)
+ * @param underlyingTransport the underlying transport
+ * @return the transport
+ * @throws IOException
+ */
public abstract TTransport createClientTransport(
String principalConfig, String host,
- String methodStr, TTransport underlyingTransport)
+ String methodStr, String tokenStrForm, TTransport underlyingTransport)
throws IOException;
}
public static abstract class Server {
public abstract TTransportFactory createTransportFactory() throws TTransportException;
public abstract TProcessor wrapProcessor(TProcessor processor);
+ public abstract void startDelegationTokenSecretManager(Configuration conf) throws IOException;
+ public abstract String getDelegationToken(String renewer) throws IOException;
+ public abstract long renewDelegationToken(String tokenStrForm) throws IOException;
+ public abstract String getDelegationToken(String renewer, String token_signature) throws IOException;
+ public abstract void cancelDelegationToken(String tokenStrForm) throws IOException;
}
}
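
For context, a minimal sketch of the server-side wiring these abstract methods imply (hypothetical; keytab path and principal are placeholders):

    // Hypothetical sketch of the server-side wiring; keytab path and
    // principal are placeholders.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
    import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge20S;
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.transport.TTransportFactory;

    public class SecureServerWiringSketch {
      public static void wire(Configuration conf, TProcessor rawProcessor)
          throws Exception {
        HadoopThriftAuthBridge.Server server = new HadoopThriftAuthBridge20S()
            .createServer("/path/to/hive.keytab", "hive/_HOST@EXAMPLE.COM");
        server.startDelegationTokenSecretManager(conf); // enables the DIGEST path
        TTransportFactory transFactory = server.createTransportFactory();
        TProcessor processor = server.wrapProcessor(rawProcessor);
        // transFactory and processor are then handed to the Thrift server loop
        // (e.g. a TThreadPoolServer in the metastore).
      }
    }
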
Added: hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java?rev=1060876&view=auto
==============================================================================
--- hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (added)
+++ hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java Wed Jan 19 17:28:28 2011
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.security.PrivilegedExceptionAction;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+public class TestHadoop20SAuthBridge extends TestCase {
+
+ private static class MyHadoopThriftAuthBridge20S extends HadoopThriftAuthBridge20S {
+ @Override
+ public Server createServer(String keytabFile, String principalConf)
+ throws TTransportException {
+ // Create a Server that doesn't interpret any Kerberos stuff
+ return new Server();
+ }
+
+ static class Server extends HadoopThriftAuthBridge20S.Server {
+ public Server() throws TTransportException {
+ super();
+ }
+ @Override
+ public TTransportFactory createTransportFactory()
+ throws TTransportException {
+ TSaslServerTransport.Factory transFactory =
+ new TSaslServerTransport.Factory();
+ transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
+ null, SaslRpcServer.SASL_DEFAULT_REALM,
+ SaslRpcServer.SASL_PROPS,
+ new SaslDigestCallbackHandler(secretManager));
+
+ return new TUGIAssumingTransportFactory(transFactory, realUgi);
+ }
+ }
+ }
+ private static final int port = 10000;
+
+ private final HiveConf conf;
+
+ public TestHadoop20SAuthBridge(String name) {
+ super(name);
+ System.setProperty(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname,
+ "true");
+ System.setProperty(HiveConf.ConfVars.METASTOREURIS.varname,
+ "thrift://localhost:" + port);
+ System.setProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(
+ System.getProperty("test.build.data", "/tmp")).toString());
+ conf = new HiveConf(TestHadoop20SAuthBridge.class);
+ conf.setBoolean("hive.metastore.local", false);
+ }
+
+ public void testSaslWithHiveMetaStore() throws Exception {
+
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ try {
+ HiveMetaStore.startMetaStore(port, new MyHadoopThriftAuthBridge20S());
+ } catch (Throwable e) {
+ System.exit(1);
+ }
+ }
+ });
+ thread.setDaemon(true);
+ thread.start();
+ loopUntilHMSReady();
+ UserGroupInformation clientUgi = UserGroupInformation.getCurrentUser();
+ obtainTokenAndAddIntoUGI(clientUgi, null);
+ obtainTokenAndAddIntoUGI(clientUgi, "tokenForFooTablePartition");
+ }
+
+ private void obtainTokenAndAddIntoUGI(UserGroupInformation clientUgi,
+ String tokenSig) throws Exception {
+ //obtain a token by directly invoking the metastore operation (without going
+ //through the thrift interface). Obtaining a token makes the secret manager
+ //aware of the user and of the fact that it gave the token to the user
+ String tokenStrForm;
+ if (tokenSig == null) {
+ tokenStrForm =
+ HiveMetaStore.getDelegationToken(clientUgi.getShortUserName());
+ } else {
+ tokenStrForm =
+ HiveMetaStore.getDelegationToken(clientUgi.getShortUserName(),
+ tokenSig);
+ conf.set("hive.metastore.token.signature", tokenSig);
+ }
+
+ Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+ //add the token to the clientUgi for securely talking to the metastore
+ clientUgi.addToken(t);
+ //Create the metastore client as the clientUgi. Doing so this
+ //way will give the client access to the token that was added earlier
+ //in the clientUgi
+ HiveMetaStoreClient hiveClient =
+ clientUgi.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
+ public HiveMetaStoreClient run() throws Exception {
+ HiveMetaStoreClient hiveClient =
+ new HiveMetaStoreClient(conf);
+ return hiveClient;
+ }
+ });
+
+ assertTrue("Couldn't connect to metastore", hiveClient != null);
+
+ //try out some metastore operations
+ createDBAndVerifyExistence(hiveClient);
+ hiveClient.close();
+
+ //Now cancel the delegation token
+ HiveMetaStore.cancelDelegationToken(tokenStrForm);
+
+ //now metastore connection should fail
+ hiveClient =
+ clientUgi.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
+ public HiveMetaStoreClient run() {
+ try {
+ HiveMetaStoreClient hiveClient =
+ new HiveMetaStoreClient(conf);
+ return hiveClient;
+ } catch (MetaException e) {
+ return null;
+ }
+ }
+ });
+ assertTrue("Expected metastore operations to fail", hiveClient == null);
+ }
+
+ /**
+ * A simple connect test to make sure that the metastore is up
+ * @throws Exception
+ */
+ private void loopUntilHMSReady() throws Exception {
+ int retries = 0;
+ Exception exc = null;
+ while (true) {
+ try {
+ Socket socket = new Socket();
+ socket.connect(new InetSocketAddress(port), 5000);
+ socket.close();
+ return;
+ } catch (Exception e) {
+ if (retries++ > 6) { //give up
+ exc = e;
+ break;
+ }
+ Thread.sleep(10000);
+ }
+ }
+ throw exc;
+ }
+
+ private void createDBAndVerifyExistence(HiveMetaStoreClient client)
+ throws Exception {
+ String dbName = "simpdb";
+ Database db = new Database();
+ db.setName(dbName);
+ client.createDatabase(db);
+ Database db1 = client.getDatabase(dbName);
+ client.dropDatabase(dbName);
+ assertTrue("Databases do not match", db1.getName().equals(db.getName()));
+ }
+}