You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by am...@apache.org on 2012/01/10 12:15:03 UTC
svn commit: r1229510 [2/2] - in /hive/trunk: ./ shims/
shims/src/0.20S/java/org/apache/hadoop/hive/shims/
shims/src/0.20S/java/org/apache/hadoop/hive/thrift/
shims/src/0.20S/java/org/apache/hadoop/hive/thrift/client/
shims/src/0.20S/java/org/apache/had...
Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1229510&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Tue Jan 10 11:15:02 2012
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+
+ /**
+ * Functions that bridge Thrift's SASL transports to Hadoop's
+ * SASL callback handlers and authentication classes.
+ */
+ public class HadoopThriftAuthBridge20S extends HadoopThriftAuthBridge {
+ static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class);
+
+ /**
+  * Builds the client half of this bridge.
+  *
+  * @return a newly constructed {@link Client}
+  */
+ @Override
+ public Client createClient() {
+   final Client bridgeClient = new Client();
+   return bridgeClient;
+ }
+
+ /**
+  * Builds the server half of this bridge from a keytab and a principal setting.
+  *
+  * @param keytabFile path of the keytab handed to the {@link Server} constructor
+  * @param principalConf principal setting handed to the {@link Server} constructor
+  * @return a newly constructed {@link Server}
+  * @throws TTransportException if the {@link Server} constructor rejects the arguments or
+  *           the login fails
+  */
+ @Override
+ public Server createServer(String keytabFile, String principalConf) throws TTransportException {
+   final Server bridgeServer = new Server(keytabFile, principalConf);
+   return bridgeServer;
+ }
+
+ public static class Client extends HadoopThriftAuthBridge.Client {
+ /**
+ * Create a client-side SASL transport that wraps an underlying transport.
+ *
+ * @param method The authentication method to use. Currently only KERBEROS is
+ * supported.
+ * @param serverPrincipal The Kerberos principal of the target server.
+ * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
+ */
+
+ @Override
+ public TTransport createClientTransport(
+ String principalConfig, String host,
+ String methodStr, String tokenStrForm, TTransport underlyingTransport)
+ throws IOException {
+ AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
+
+ TTransport saslTransport = null;
+ switch (method) {
+ case DIGEST:
+ Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+ saslTransport = new TSaslClientTransport(
+ method.getMechanismName(),
+ null,
+ null, SaslRpcServer.SASL_DEFAULT_REALM,
+ SaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(t),
+ underlyingTransport);
+ return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+
+ case KERBEROS:
+ String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+ String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+ if (names.length != 3) {
+ throw new IOException(
+ "Kerberos principal name does NOT have the expected hostname part: "
+ + serverPrincipal);
+ }
+ try {
+ saslTransport = new TSaslClientTransport(
+ method.getMechanismName(),
+ null,
+ names[0], names[1],
+ SaslRpcServer.SASL_PROPS, null,
+ underlyingTransport);
+ return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+ } catch (SaslException se) {
+ throw new IOException("Could not instantiate SASL transport", se);
+ }
+
+ default:
+ throw new IOException("Unsupported authentication method: " + method);
+ }
+ }
+ /**
+  * Client-side callback handler that answers SASL name and password callbacks with the
+  * Base64-encoded identifier and password of a token.
+  */
+ private static class SaslClientCallbackHandler implements CallbackHandler {
+   private final String userName;
+   private final char[] userPassword;
+
+   /**
+    * @param token the token whose identifier becomes the user name and whose password
+    *          becomes the user password, both Base64 encoded
+    */
+   public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+     this.userName = encodeIdentifier(token.getIdentifier());
+     this.userPassword = encodePassword(token.getPassword());
+   }
+
+   /**
+    * Sorts the callbacks by type first and only then answers them, so an unrecognized
+    * callback is rejected before any value has been set.
+    *
+    * @param callbacks the callbacks presented by the SASL client
+    * @throws UnsupportedCallbackException if a callback of an unrecognized type is present
+    */
+   public void handle(Callback[] callbacks)
+       throws UnsupportedCallbackException {
+     NameCallback nameCb = null;
+     PasswordCallback passwordCb = null;
+     RealmCallback realmCb = null;
+     for (int i = 0; i < callbacks.length; i++) {
+       final Callback current = callbacks[i];
+       if (current instanceof RealmChoiceCallback) {
+         // realm choice is ignored
+         continue;
+       }
+       if (current instanceof NameCallback) {
+         nameCb = (NameCallback) current;
+       } else if (current instanceof PasswordCallback) {
+         passwordCb = (PasswordCallback) current;
+       } else if (current instanceof RealmCallback) {
+         realmCb = (RealmCallback) current;
+       } else {
+         throw new UnsupportedCallbackException(current,
+             "Unrecognized SASL client callback");
+       }
+     }
+
+     if (nameCb != null) {
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("SASL client callback: setting username: " + userName);
+       }
+       nameCb.setName(userName);
+     }
+     if (passwordCb != null) {
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("SASL client callback: setting userPassword");
+       }
+       passwordCb.setPassword(userPassword);
+     }
+     if (realmCb != null) {
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("SASL client callback: setting realm: "
+             + realmCb.getDefaultText());
+       }
+       // The realm offered as default by the callback is accepted as-is.
+       realmCb.setText(realmCb.getDefaultText());
+     }
+   }
+
+   /** Base64-encodes the token identifier into a string. */
+   static String encodeIdentifier(byte[] identifier) {
+     final byte[] encoded = Base64.encodeBase64(identifier);
+     return new String(encoded);
+   }
+
+   /** Base64-encodes the token password into a character array. */
+   static char[] encodePassword(byte[] password) {
+     final byte[] encoded = Base64.encodeBase64(password);
+     return new String(encoded).toCharArray();
+   }
+ }
+ }
+
+ public static class Server extends HadoopThriftAuthBridge.Server {
+ final UserGroupInformation realUgi;
+ DelegationTokenSecretManager secretManager;
+ private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
+ //Delegation token related keys
+ public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
+ "hive.cluster.delegation.key.update-interval";
+ public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
+ 24*60*60*1000; // 1 day
+ public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
+ "hive.cluster.delegation.token.renew-interval";
+ public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
+ 24*60*60*1000; // 1 day
+ public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
+ "hive.cluster.delegation.token.max-lifetime";
+ public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
+ 7*24*60*60*1000; // 7 days
+ public static final String DELEGATION_TOKEN_STORE_CLS =
+ "hive.cluster.delegation.token.store.class";
+ public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
+ "hive.cluster.delegation.token.store.zookeeper.connectString";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE =
+ "hive.cluster.delegation.token.store.zookeeper.rootNode";
+ public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT =
+ "/hive/cluster/delegation";
+
+ public Server() throws TTransportException {
+ try {
+ realUgi = UserGroupInformation.getCurrentUser();
+ } catch (IOException ioe) {
+ throw new TTransportException(ioe);
+ }
+ }
+ /**
+ * Create a server with a kerberos keytab/principal.
+ */
+ private Server(String keytabFile, String principalConf)
+ throws TTransportException {
+ if (keytabFile == null || keytabFile.isEmpty()) {
+ throw new TTransportException("No keytab specified");
+ }
+ if (principalConf == null || principalConf.isEmpty()) {
+ throw new TTransportException("No principal specified");
+ }
+
+ // Login from the keytab
+ String kerberosName;
+ try {
+ kerberosName =
+ SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0");
+ UserGroupInformation.loginUserFromKeytab(
+ kerberosName, keytabFile);
+ realUgi = UserGroupInformation.getLoginUser();
+ assert realUgi.isFromKeytab();
+ } catch (IOException ioe) {
+ throw new TTransportException(ioe);
+ }
+ }
+
+ /**
+  * Create a TTransportFactory that, upon connection of a client socket,
+  * negotiates a SASL transport. Both the KERBEROS and the DIGEST mechanisms are
+  * registered. The resulting TTransportFactory can be passed as both the input and
+  * output transport factory when instantiating a TThreadPoolServer, for example.
+  *
+  * The DIGEST callback handler receives the value {@code secretManager} holds at the time
+  * of this call.
+  *
+  * @return a factory that creates the SASL transports under the server's UGI
+  * @throws TTransportException if the server's user name does not split into 3 parts
+  */
+ @Override
+ public TTransportFactory createTransportFactory() throws TTransportException
+ {
+   // Parse out the kerberos principal, host, realm.
+   final String kerberosName = realUgi.getUserName();
+   final String[] principalParts = SaslRpcServer.splitKerberosName(kerberosName);
+   if (principalParts.length != 3) {
+     throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
+   }
+
+   final TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
+
+   // Kerberos: the first two parts of the principal are handed to the SASL definition.
+   saslFactory.addServerDefinition(
+       AuthMethod.KERBEROS.getMechanismName(),
+       principalParts[0], principalParts[1],
+       SaslRpcServer.SASL_PROPS,
+       new SaslRpcServer.SaslGssCallbackHandler());
+
+   // Digest: passwords are looked up through the delegation token secret manager.
+   saslFactory.addServerDefinition(
+       AuthMethod.DIGEST.getMechanismName(),
+       null, SaslRpcServer.SASL_DEFAULT_REALM,
+       SaslRpcServer.SASL_PROPS,
+       new SaslDigestCallbackHandler(secretManager));
+
+   return new TUGIAssumingTransportFactory(saslFactory, realUgi);
+ }
+
+ /**
+  * Wrap a TProcessor in such a way that, before processing any RPC, it
+  * assumes the UserGroupInformation of the user authenticated by
+  * the SASL transport.
+  *
+  * @param processor the processor to delegate to
+  * @return a TUGIAssumingProcessor around {@code processor}, constructed with the current
+  *         value of {@code secretManager}
+  */
+ @Override
+ public TProcessor wrapProcessor(TProcessor processor) {
+   final TProcessor ugiAssuming = new TUGIAssumingProcessor(processor, secretManager);
+   return ugiAssuming;
+ }
+
+ protected TokenStoreDelegationTokenSecretManager.TokenStore getTokenStore(Configuration conf)
+ throws IOException {
+ String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
+ if (StringUtils.isBlank(tokenStoreClassName)) {
+ return new MemoryTokenStore();
+ }
+ try {
+ Class<? extends TokenStoreDelegationTokenSecretManager.TokenStore> storeClass = Class
+ .forName(tokenStoreClassName).asSubclass(
+ TokenStoreDelegationTokenSecretManager.TokenStore.class);
+ return ReflectionUtils.newInstance(storeClass, conf);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Error initializing delegation token store: " + tokenStoreClassName,
+ e);
+ }
+ }
+
+ /**
+  * Create the delegation token secret manager from the configured intervals and token
+  * store, assign it to {@code secretManager} and start its threads.
+  *
+  * @param conf configuration supplying the key update interval, the token max lifetime,
+  *          the token renew interval and the token store class; the defaults declared
+  *          on this class apply to the three intervals when unset
+  * @throws IOException if the token store cannot be created or the threads fail to start
+  */
+ @Override
+ public void startDelegationTokenSecretManager(Configuration conf)
+     throws IOException{
+   final long secretKeyInterval = conf.getLong(
+       DELEGATION_KEY_UPDATE_INTERVAL_KEY, DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+   final long tokenMaxLifetime = conf.getLong(
+       DELEGATION_TOKEN_MAX_LIFETIME_KEY, DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+   final long tokenRenewInterval = conf.getLong(
+       DELEGATION_TOKEN_RENEW_INTERVAL_KEY, DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+   final TokenStoreDelegationTokenSecretManager.TokenStore tokenStore = getTokenStore(conf);
+
+   secretManager = new TokenStoreDelegationTokenSecretManager(
+       secretKeyInterval, tokenMaxLifetime, tokenRenewInterval,
+       DELEGATION_TOKEN_GC_INTERVAL, tokenStore);
+   secretManager.startThreads();
+ }
+
+ /**
+  * Issue a delegation token for {@code owner}. Only permitted when the calling thread
+  * authenticated with Kerberos.
+  *
+  * @param owner the user the token is issued for
+  * @param renewer the renewer passed on to the secret manager
+  * @return the delegation token as returned by the secret manager
+  * @throws AuthorizationException if the current authentication method is not KERBEROS
+  * @throws IOException if looking up the current user, the proxy authorization or the
+  *           token creation fails
+  * @throws InterruptedException if the doAs call is interrupted
+  */
+ @Override
+ public String getDelegationToken(final String owner, final String renewer)
+     throws IOException, InterruptedException {
+   final boolean viaKerberos =
+       authenticationMethod.get().equals(AuthenticationMethod.KERBEROS);
+   if (!viaKerberos) {
+     throw new AuthorizationException(
+         "Delegation Token can be issued only with kerberos authentication");
+   }
+   //if the user asking the token is same as the 'owner' then don't do
+   //any proxy authorization checks. For cases like oozie, where it gets
+   //a delegation token for another user, we need to make sure oozie is
+   //authorized to get a delegation token.
+   //Do all checks on short names
+   UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+   UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner);
+   final boolean requestedForSelf =
+       ownerUgi.getShortUserName().equals(currUser.getShortUserName());
+   if (!requestedForSelf) {
+     //in the case of proxy users, the getCurrentUser will return the
+     //real user (for e.g. oozie) due to the doAs that happened just before the
+     //server started executing the method getDelegationToken in the MetaStore
+     ownerUgi = UserGroupInformation.createProxyUser(owner,
+         UserGroupInformation.getCurrentUser());
+     InetAddress remoteAddr = getRemoteAddress();
+     ProxyUsers.authorize(ownerUgi, remoteAddr.getHostAddress(), null);
+   }
+
+   // The token is created while running as the owner.
+   final PrivilegedExceptionAction<String> issueToken =
+       new PrivilegedExceptionAction<String>() {
+         public String run() throws IOException {
+           return secretManager.getDelegationToken(renewer);
+         }
+       };
+   return ownerUgi.doAs(issueToken);
+ }
+
+ /**
+  * Renew a delegation token. Only permitted when the calling thread authenticated with
+  * Kerberos.
+  *
+  * @param tokenStrForm the delegation token in string form
+  * @return the value returned by the secret manager's renewDelegationToken
+  * @throws AuthorizationException if the current authentication method is not KERBEROS
+  * @throws IOException if the secret manager fails to renew the token
+  */
+ @Override
+ public long renewDelegationToken(String tokenStrForm) throws IOException {
+   if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
+     // This is the renew path, so the message must not talk about issuing a token.
+     throw new AuthorizationException(
+         "Delegation Token can be renewed only with kerberos authentication");
+   }
+   return secretManager.renewDelegationToken(tokenStrForm);
+ }
+
+ /**
+  * Cancel a delegation token by handing it to the secret manager. Unlike issuing and
+  * renewing, no check of the authentication method is made here.
+  *
+  * @param tokenStrForm the delegation token in string form
+  * @throws IOException if the secret manager fails to cancel the token
+  */
+ @Override
+ public void cancelDelegationToken(String tokenStrForm) throws IOException {
+   secretManager.cancelDelegationToken(tokenStrForm);
+ }
+
+ /**
+  * Per-thread address of the remote client. It starts out as null on every thread, which
+  * is the default initial value of a ThreadLocal, so no initialValue override is needed.
+  */
+ final static ThreadLocal<InetAddress> remoteAddress = new ThreadLocal<InetAddress>();
+
+ /**
+  * @return the remote address recorded for the calling thread, or null if none has been
+  *         recorded on this thread
+  */
+ @Override
+ public InetAddress getRemoteAddress() {
+   final InetAddress callerAddress = remoteAddress.get();
+   return callerAddress;
+ }
+
+ /**
+  * Per-thread authentication method of the call being processed. Threads that have not
+  * set a value see TOKEN.
+  */
+ final static ThreadLocal<AuthenticationMethod> authenticationMethod =
+     new ThreadLocal<AuthenticationMethod>() {
+       // initialValue is invoked by the thread reading the value and returns a constant,
+       // so it needs no synchronization.
+       @Override
+       protected AuthenticationMethod initialValue() {
+         return AuthenticationMethod.TOKEN;
+       }
+     };
+
+ /**
+  * CallbackHandler for SASL DIGEST-MD5 mechanism.
+  *
+  * This code is pretty much completely based on Hadoop's
+  * SaslRpcServer.SaslDigestCallbackHandler - the only reason we could not
+  * use that Hadoop class as-is was because it needs a Server.Connection object
+  * which is relevant in hadoop rpc but not here in the metastore - so the
+  * code below does not deal with the Server.Connection object.
+  */
+ static class SaslDigestCallbackHandler implements CallbackHandler {
+   private final DelegationTokenSecretManager secretManager;
+
+   /**
+    * @param secretManager the secret manager used to resolve token identifiers and
+    *          retrieve their passwords
+    */
+   public SaslDigestCallbackHandler(
+       DelegationTokenSecretManager secretManager) {
+     this.secretManager = secretManager;
+   }
+
+   /** Base64-encodes a raw token password into a character array. */
+   private char[] encodePassword(byte[] password) {
+     final byte[] encoded = Base64.encodeBase64(password);
+     return new String(encoded).toCharArray();
+   }
+
+   /**
+    * Sorts the callbacks by type, then supplies the token password and decides the
+    * authorization. The authorization is granted only when the authentication ID equals
+    * the authorization ID.
+    *
+    * @param callbacks the callbacks presented by the SASL server
+    * @throws InvalidToken if the token identifier cannot be resolved or has no password
+    * @throws UnsupportedCallbackException if a callback of an unrecognized type is present
+    */
+   @Override
+   public void handle(Callback[] callbacks) throws InvalidToken,
+       UnsupportedCallbackException {
+     NameCallback nameCb = null;
+     PasswordCallback passwordCb = null;
+     AuthorizeCallback authorizeCb = null;
+     for (int i = 0; i < callbacks.length; i++) {
+       final Callback current = callbacks[i];
+       if (current instanceof AuthorizeCallback) {
+         authorizeCb = (AuthorizeCallback) current;
+       } else if (current instanceof NameCallback) {
+         nameCb = (NameCallback) current;
+       } else if (current instanceof PasswordCallback) {
+         passwordCb = (PasswordCallback) current;
+       } else if (current instanceof RealmCallback) {
+         // realm is ignored
+       } else {
+         throw new UnsupportedCallbackException(current,
+             "Unrecognized SASL DIGEST-MD5 Callback");
+       }
+     }
+
+     if (passwordCb != null) {
+       // The default name of the name callback carries the encoded token identifier.
+       DelegationTokenIdentifier tokenIdentifier =
+           SaslRpcServer.getIdentifier(nameCb.getDefaultName(), secretManager);
+       char[] password = encodePassword(secretManager.retrievePassword(tokenIdentifier));
+
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+             + "for client: " + tokenIdentifier.getUser());
+       }
+       passwordCb.setPassword(password);
+     }
+
+     if (authorizeCb != null) {
+       final String authid = authorizeCb.getAuthenticationID();
+       final String authzid = authorizeCb.getAuthorizationID();
+       authorizeCb.setAuthorized(authid.equals(authzid));
+       if (authorizeCb.isAuthorized()) {
+         if (LOG.isDebugEnabled()) {
+           String username =
+               SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
+           LOG.debug("SASL server DIGEST-MD5 callback: setting "
+               + "canonicalized client ID: " + username);
+         }
+         authorizeCb.setAuthorizedID(authzid);
+       }
+     }
+   }
+ }
+
+ /**
+  * Processor that pulls the SaslServer object out of the transport, and
+  * assumes the remote user's UGI before calling through to the original
+  * processor.
+  *
+  * This is used on the server side to set the UGI for each specific call.
+  */
+ private class TUGIAssumingProcessor implements TProcessor {
+   final TProcessor wrapped;
+   DelegationTokenSecretManager secretManager;
+
+   /**
+    * @param wrapped the processor to call through to
+    * @param secretManager the secret manager used to resolve the token identifier of
+    *          DIGEST-MD5 authenticated clients
+    */
+   TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager) {
+     this.wrapped = wrapped;
+     this.secretManager = secretManager;
+   }
+
+   /**
+    * Records the authentication method and the remote address of the call in the
+    * thread-locals of the enclosing server, then runs the wrapped processor as a proxy
+    * user for the authenticated end user on top of the login user.
+    *
+    * @param inProt the input protocol; its transport must be a TSaslServerTransport
+    * @param outProt the output protocol
+    * @return the result of the wrapped processor
+    * @throws TException if the transport is not a SASL transport, the token of a
+    *           DIGEST-MD5 client is invalid, or the wrapped processor fails
+    */
+   @Override
+   public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+     TTransport trans = inProt.getTransport();
+     if (!(trans instanceof TSaslServerTransport)) {
+       throw new TException("Unexpected non-SASL transport " + trans.getClass());
+     }
+     TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+     SaslServer saslServer = saslTrans.getSaslServer();
+     String authId = saslServer.getAuthorizationID();
+     // Kerberos is assumed unless the DIGEST-MD5 mechanism was negotiated (see below).
+     authenticationMethod.set(AuthenticationMethod.KERBEROS);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("AUTH ID ======>" + authId);
+     }
+     String endUser = authId;
+
+     if(saslServer.getMechanismName().equals("DIGEST-MD5")) {
+       try {
+         // For token authentication the authorization ID is the encoded token identifier;
+         // the end user is the user recorded in that identifier.
+         TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+             secretManager);
+         endUser = tokenId.getUser().getUserName();
+         authenticationMethod.set(AuthenticationMethod.TOKEN);
+       } catch (InvalidToken e) {
+         // Keep the original exception as the cause so the stack trace is not lost.
+         throw new TException(e.getMessage(), e);
+       }
+     }
+     Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
+     remoteAddress.set(socket.getInetAddress());
+     try {
+       UserGroupInformation clientUgi = UserGroupInformation.createProxyUser(
+           endUser, UserGroupInformation.getLoginUser());
+       return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
+         public Boolean run() {
+           try {
+             return wrapped.process(inProt, outProt);
+           } catch (TException te) {
+             // Tunnel the checked exception out of doAs; it is unwrapped below.
+             throw new RuntimeException(te);
+           }
+         }
+       });
+     } catch (RuntimeException rte) {
+       if (rte.getCause() instanceof TException) {
+         throw (TException)rte.getCause();
+       }
+       throw rte;
+     } catch (InterruptedException ie) {
+       // Restore the interrupt status so code further up the stack can still observe it.
+       Thread.currentThread().interrupt();
+       throw new RuntimeException(ie); // unexpected!
+     } catch (IOException ioe) {
+       throw new RuntimeException(ioe); // unexpected!
+     }
+   }
+ }
+
+ /**
+  * A TransportFactory that wraps another one, but assumes a specified UGI
+  * before calling through.
+  *
+  * This is used on the server side to assume the server's Principal when accepting
+  * clients.
+  */
+ static class TUGIAssumingTransportFactory extends TTransportFactory {
+   private final UserGroupInformation ugi;
+   private final TTransportFactory wrapped;
+
+   /**
+    * @param wrapped the factory to delegate to; asserted to be non-null
+    * @param ugi the UGI to run the delegate under; asserted to be non-null
+    */
+   public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+     assert wrapped != null;
+     assert ugi != null;
+
+     this.wrapped = wrapped;
+     this.ugi = ugi;
+   }
+
+   /**
+    * @param trans the transport to wrap
+    * @return the transport produced by the wrapped factory while running as the UGI
+    */
+   @Override
+   public TTransport getTransport(final TTransport trans) {
+     final PrivilegedAction<TTransport> delegateCall = new PrivilegedAction<TTransport>() {
+       public TTransport run() {
+         return wrapped.getTransport(trans);
+       }
+     };
+     return ugi.doAs(delegateCall);
+   }
+ }
+ }
+ }
Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java?rev=1229510&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java Tue Jan 10 11:15:02 2012
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Default in-memory token store implementation. Master keys and tokens are kept in
+ * concurrent maps owned by this instance and are not persisted anywhere.
+ */
+public class MemoryTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore {
+
+  private final java.util.concurrent.ConcurrentHashMap<Integer, String> masterKeys
+      = new java.util.concurrent.ConcurrentHashMap<Integer, String>();
+
+  private final java.util.concurrent.ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
+      = new java.util.concurrent.ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>();
+
+  private final AtomicInteger masterKeySeq = new AtomicInteger();
+
+  // Kept only so that getConf() returns what setConf() received; the store itself does
+  // not read any setting from it.
+  private Configuration conf;
+
+  /**
+   * Remembers the configuration so that {@link #getConf()} can return it.
+   *
+   * @param conf the configuration to remember; may be null
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * @return the configuration last passed to {@link #setConf(Configuration)}, or null if
+   *         none was set
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Stores a master key under the next sequence number.
+   *
+   * @param s the master key to store
+   * @return the sequence number assigned to the key
+   */
+  @Override
+  public int addMasterKey(String s) {
+    int keySeq = masterKeySeq.getAndIncrement();
+    masterKeys.putIfAbsent(keySeq, s);
+    return keySeq;
+  }
+
+  /**
+   * Stores a master key under the given sequence number, replacing any previous one.
+   *
+   * @param keySeq the sequence number of the key
+   * @param s the master key to store
+   */
+  @Override
+  public void updateMasterKey(int keySeq, String s) {
+    masterKeys.put(keySeq, s);
+  }
+
+  /**
+   * @param keySeq the sequence number of the key to remove
+   * @return true if a key was stored under {@code keySeq}
+   */
+  @Override
+  public boolean removeMasterKey(int keySeq) {
+    return masterKeys.remove(keySeq) != null;
+  }
+
+  /**
+   * @return the stored master keys
+   */
+  @Override
+  public String[] getMasterKeys() {
+    return masterKeys.values().toArray(new String[0]);
+  }
+
+  /**
+   * Stores a token unless one is already present for the identifier.
+   *
+   * @param tokenIdentifier the identifier to store the token under
+   * @param token the token information to store
+   * @return true if the token was added, false if the identifier was already present
+   */
+  @Override
+  public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+      DelegationTokenInformation token) {
+    DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token);
+    return (tokenInfo == null);
+  }
+
+  /**
+   * @param tokenIdentifier the identifier of the token to remove
+   * @return true if a token was stored under {@code tokenIdentifier}
+   */
+  @Override
+  public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+    DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier);
+    return tokenInfo != null;
+  }
+
+  /**
+   * @param tokenIdentifier the identifier to look up
+   * @return the token information stored under {@code tokenIdentifier}, or null if absent
+   */
+  @Override
+  public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+    return tokens.get(tokenIdentifier);
+  }
+
+  /**
+   * @return a new list holding the identifiers of the stored tokens; changes to the list do
+   *         not affect the store
+   */
+  @Override
+  public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+    return new java.util.ArrayList<DelegationTokenIdentifier>(tokens.keySet());
+  }
+
+}
Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java?rev=1229510&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java Tue Jan 10 11:15:02 2012
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extension of {@link DelegationTokenSecretManager} to support alternative to default in-memory
+ * token management for fail-over and clustering through plug-able token store (ZooKeeper etc.).
+ * Delegation tokens will be retrieved from the store on-demand and (unlike base class behavior) not
+ * cached in memory. This avoids complexities related to token expiration. The security token is
+ * needed only at the time the transport is opened (as opposed to per interface operation). The
+ * assumption therefore is low cost of interprocess token retrieval (for random read efficient store
+ * such as ZooKeeper) compared to overhead of synchronizing per-process in-memory token caches.
+ * The wrapper incorporates the token store abstraction within the limitations of current
+ * Hive/Hadoop dependency (.20S) with minimum code duplication.
+ * Eventually this should be supported by Hadoop security directly.
+ */
+public class TokenStoreDelegationTokenSecretManager extends DelegationTokenSecretManager {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName());
+
+ /**
+  * Unchecked exception signalling an internal token store failure that callers are
+  * typically unable to handle.
+  */
+ public static class TokenStoreError extends RuntimeException {
+   private static final long serialVersionUID = -8693819817623074083L;
+
+   /**
+    * @param message description of the failure
+    * @param cause underlying store specific exception
+    */
+   public TokenStoreError(String message, Throwable cause) {
+     super(message, cause);
+   }
+
+   /**
+    * @param cause underlying store specific exception
+    */
+   public TokenStoreError(Throwable cause) {
+     super(cause);
+   }
+ }
+
+ /**
+ * Interface for pluggable token store that can be implemented as shared store with external
+ * storage (for example with ZooKeeper for HA).
+ * Internal, store specific errors are translated into {@link TokenStoreError}, which is
+ * unchecked and may therefore also surface from methods that do not declare it.
+ */
+ public static interface TokenStore extends Configurable {
+ /**
+ * Add new master key. The token store assigns and returns the sequence number.
+ * Caller needs to use the identifier to update the key (since it is embedded in the key).
+ *
+ * @param s encoded master key
+ * @return sequence number for new key
+ * @throws TokenStoreError on store specific errors
+ */
+ int addMasterKey(String s) throws TokenStoreError;
+
+ /**
+ * Update the master key stored under the given sequence number.
+ *
+ * @param keySeq sequence number as returned by {@link #addMasterKey(String)}
+ * @param s encoded master key
+ * @throws TokenStoreError on store specific errors
+ */
+ void updateMasterKey(int keySeq, String s) throws TokenStoreError;
+
+ /**
+ * Remove key for given id.
+ * @param keySeq sequence number of the key
+ * @return false if key no longer present, true otherwise.
+ */
+ boolean removeMasterKey(int keySeq);
+
+ /**
+ * Get all master keys held by the store.
+ *
+ * @return encoded master keys
+ * @throws TokenStoreError on store specific errors
+ */
+ String[] getMasterKeys() throws TokenStoreError;
+
+ /**
+ * Add token. If identifier is already present, token won't be added.
+ * @param tokenIdentifier identifier of the token
+ * @param token token information to store for the identifier
+ * @return true if token was added, false for existing identifier
+ * @throws TokenStoreError on store specific errors
+ */
+ boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+ DelegationTokenInformation token) throws TokenStoreError;
+
+ /**
+ * Get token. Returns null if the token does not exist.
+ * @param tokenIdentifier identifier of the token
+ * @return token information for the identifier or null
+ * @throws TokenStoreError on store specific errors
+ */
+ DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
+ throws TokenStoreError;
+
+ /**
+ * Remove token. It is not an error if the token does not exist.
+ * @param tokenIdentifier identifier of the token
+ * @return true if a token was removed, false if no token existed for the identifier
+ * @throws TokenStoreError on store specific errors
+ */
+ boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreError;
+
+ /**
+ * List of all token identifiers in the store. This is used to remove expired tokens
+ * and a potential scalability improvement would be to partition by master key id
+ * @return identifiers of all tokens in the store
+ */
+ List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers();
+
+ }
+
+ final private long keyUpdateInterval;
+ final private long tokenRemoverScanInterval;
+ private Thread tokenRemoverThread;
+
+ final private TokenStore tokenStore;
+
+ /**
+  * Creates a secret manager that keeps master keys and tokens in the given store.
+  * All interval arguments are handed to the base class unchanged.
+  *
+  * @param delegationKeyUpdateInterval interval (ms) between master key rolls
+  * @param delegationTokenMaxLifetime passed to the base class
+  * @param delegationTokenRenewInterval passed to the base class
+  * @param delegationTokenRemoverScanInterval interval (ms) between expired token scans
+  * @param sharedStore store used for master keys and tokens
+  */
+ public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval,
+     long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+     long delegationTokenRemoverScanInterval, TokenStore sharedStore) {
+   super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval,
+       delegationTokenRemoverScanInterval);
+   this.tokenStore = sharedStore;
+   this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
+   this.keyUpdateInterval = delegationKeyUpdateInterval;
+ }
+
+ protected DelegationTokenIdentifier getTokenIdentifier(Token<DelegationTokenIdentifier> token)
+ throws IOException {
+ // turn bytes back into identifier for cache lookup
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ DelegationTokenIdentifier id = createIdentifier();
+ id.readFields(in);
+ return id;
+ }
+
+ protected Map<Integer, DelegationKey> reloadKeys() {
+ // read keys from token store
+ String[] allKeys = tokenStore.getMasterKeys();
+ Map<Integer, DelegationKey> keys
+ = new java.util.HashMap<Integer, DelegationKey>(allKeys.length);
+ for (String keyStr : allKeys) {
+ DelegationKey key = new DelegationKey();
+ try {
+ decodeWritable(key, keyStr);
+ keys.put(key.getKeyId(), key);
+ } catch (IOException ex) {
+ LOGGER.error("Failed to load master key.", ex);
+ }
+ }
+ synchronized (this) {
+ super.allKeys.clear();
+ super.allKeys.putAll(keys);
+ }
+ return keys;
+ }
+
+ /**
+ * Looks up the token in the token store and delegates password retrieval to the base class.
+ * The token information is placed into the base class token map only for the duration of
+ * the delegated call and is removed again afterwards.
+ *
+ * @param identifier identifier of the token
+ * @return result of the base class {@code retrievePassword} for the identifier
+ * @throws org.apache.hadoop.security.token.SecretManager.InvalidToken if the token store
+ * has no entry for the identifier
+ */
+ @Override
+ public byte[] retrievePassword(DelegationTokenIdentifier identifier)
+ throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
+ DelegationTokenInformation info = this.tokenStore.getToken(identifier);
+ if (info == null) {
+ throw new InvalidToken("token expired or does not exist: " + identifier);
+ }
+ // must reuse super as info.getPassword is not accessible
+ synchronized (this) {
+ try {
+ // expose the token to the base class lookup for this call only
+ super.currentTokens.put(identifier, info);
+ return super.retrievePassword(identifier);
+ } finally {
+ // never keep the token in memory, the store stays the single source
+ super.currentTokens.remove(identifier);
+ }
+ }
+ }
+
+ /**
+  * Cancels a token by removing its identifier from the token store.
+  * NOTE(review): {@code canceller} is not consulted here and the result of the removal is
+  * ignored - confirm that authorization is enforced by the caller.
+  *
+  * @param token token to cancel
+  * @param canceller name of the user requesting the cancellation (unused)
+  * @return identifier decoded from the token
+  * @throws IOException if the identifier cannot be decoded from the token
+  */
+ @Override
+ public DelegationTokenIdentifier cancelToken(Token<DelegationTokenIdentifier> token,
+     String canceller) throws IOException {
+   final DelegationTokenIdentifier cancelledId = getTokenIdentifier(token);
+   LOGGER.info("Token cancelation requested for identifier: " + cancelledId);
+   this.tokenStore.removeToken(cancelledId);
+   return cancelledId;
+ }
+
+ /**
+ * Create the password and add it to shared store.
+ * The base class creates the password and registers the token information in its in-memory
+ * map; that entry is taken out again right away and handed to the token store instead.
+ *
+ * @param id identifier of the new token
+ * @return password created by the base class
+ * @throws IllegalStateException if the base class did not register token information
+ */
+ @Override
+ protected byte[] createPassword(DelegationTokenIdentifier id) {
+ byte[] password;
+ DelegationTokenInformation info;
+ synchronized (this) {
+ password = super.createPassword(id);
+ // add new token to shared store
+ // need to persist expiration along with password
+ info = super.currentTokens.remove(id);
+ if (info == null) {
+ throw new IllegalStateException("Failed to retrieve token after creation");
+ }
+ }
+ // store access happens outside the lock; the return value of addToken is not checked
+ this.tokenStore.addToken(id, info);
+ return password;
+ }
+
+ /**
+ * Renews a token using the renewal logic of the base class. The token information is read
+ * from the token store and made visible to the base class only for the duration of the
+ * delegated call. Master keys are reloaded from the store first when the key referenced by
+ * the token is not known locally.
+ *
+ * @param token token to renew
+ * @param renewer name of the user requesting the renewal
+ * @return result of the base class {@code renewToken}
+ * @throws InvalidToken if the token store has no entry for the token
+ * @throws IOException if the identifier cannot be decoded or the base class renewal fails
+ */
+ @Override
+ public long renewToken(Token<DelegationTokenIdentifier> token,
+ String renewer) throws InvalidToken, IOException {
+ // since renewal is KERBEROS authenticated token may not be cached
+ final DelegationTokenIdentifier id = getTokenIdentifier(token);
+ DelegationTokenInformation tokenInfo = this.tokenStore.getToken(id);
+ if (tokenInfo == null) {
+ throw new InvalidToken("token does not exist: " + id); // no token found
+ }
+ // ensure associated master key is available
+ if (!super.allKeys.containsKey(id.getMasterKeyId())) {
+ LOGGER.info("Unknown master key (id={}), (re)loading keys from token store.",
+ id.getMasterKeyId());
+ reloadKeys();
+ }
+ // reuse super renewal logic
+ synchronized (this) {
+ // expose the token to the base class for this call only
+ super.currentTokens.put(id, tokenInfo);
+ try {
+ return super.renewToken(token, renewer);
+ } finally {
+ super.currentTokens.remove(id);
+ }
+ }
+ }
+
+ /**
+  * Serializes the writable and returns the bytes as URL safe Base64 text.
+  *
+  * @param key writable to serialize
+  * @return URL safe Base64 representation of the serialized bytes
+  * @throws IOException if the writable fails to serialize
+  */
+ public static String encodeWritable(Writable key) throws IOException {
+   final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+   final DataOutputStream out = new DataOutputStream(buffer);
+   key.write(out);
+   out.flush();
+   final byte[] raw = buffer.toByteArray();
+   return Base64.encodeBase64URLSafeString(raw);
+ }
+
+ /**
+  * Populates the writable from Base64 text.
+  *
+  * @param w writable to populate
+  * @param idStr Base64 text holding the serialized form
+  * @throws IOException if the decoded bytes cannot be read into the writable
+  */
+ public static void decodeWritable(Writable w, String idStr) throws IOException {
+   final byte[] raw = Base64.decodeBase64(idStr);
+   final DataInputStream source = new DataInputStream(new ByteArrayInputStream(raw));
+   w.readFields(source);
+ }
+
+ /**
+ * Synchronize master key updates / sequence generation for multiple nodes.
+ * NOTE: {@link AbstractDelegationTokenSecretManager} keeps currentKey private, so we need
+ * to utilize this "hook" to manipulate the key through the object reference.
+ * This .20S workaround should cease to exist when Hadoop supports token store.
+ *
+ * @param key new master key; its fields are overwritten in place so that it carries the
+ * sequence number assigned by the token store
+ * @throws IOException if the key cannot be encoded or decoded
+ */
+ @Override
+ protected void logUpdateMasterKey(DelegationKey key) throws IOException {
+ // first write obtains the sequence number assigned by the store
+ int keySeq = this.tokenStore.addMasterKey(encodeWritable(key));
+ // update key with assigned identifier
+ DelegationKey keyWithSeq = new DelegationKey(keySeq, key.getExpiryDate(), key.getKey());
+ String keyStr = encodeWritable(keyWithSeq);
+ // second write stores the key with the assigned sequence number embedded
+ this.tokenStore.updateMasterKey(keySeq, keyStr);
+ // overwrite the fields of the caller's key object with the re-encoded key
+ decodeWritable(key, keyStr);
+ LOGGER.info("New master key with key id={}", key.getKeyId());
+ super.logUpdateMasterKey(key);
+ }
+
+ /**
+  * Initializes the master key and starts the expired token remover as a daemon thread.
+  *
+  * @throws IOException if the master key cannot be initialized
+  */
+ @Override
+ public synchronized void startThreads() throws IOException {
+   try {
+     // updateCurrentKey needs to be called to initialize the master key
+     // (there should be a null check added in the future in rollMasterKey);
+     // it is invoked reflectively in place of a direct updateCurrentKey() call
+     final Method keyInitializer =
+         AbstractDelegationTokenSecretManager.class.getDeclaredMethod("updateCurrentKey");
+     keyInitializer.setAccessible(true);
+     keyInitializer.invoke(this);
+   } catch (Exception e) {
+     throw new IOException("Failed to initialize master key", e);
+   }
+   running = true;
+   final Thread remover = new Daemon(new ExpiredTokenRemover());
+   tokenRemoverThread = remover;
+   remover.start();
+ }
+
+ /**
+  * Clears the running flag and interrupts the remover thread if one was started.
+  */
+ @Override
+ public synchronized void stopThreads() {
+   // constant message, so no level guard is needed
+   LOGGER.debug("Stopping expired delegation token remover thread");
+   running = false;
+   final Thread remover = tokenRemoverThread;
+   if (remover != null) {
+     remover.interrupt();
+   }
+ }
+
+ /**
+ * Remove expired tokens. Replaces logic in {@link AbstractDelegationTokenSecretManager}
+ * that cannot be reused due to private method access. Logic here can more efficiently
+ * deal with external token store by only loading into memory the minimum data needed.
+ */
+ protected void removeExpiredTokens() {
+ long now = System.currentTimeMillis();
+ Iterator<DelegationTokenIdentifier> i = tokenStore.getAllDelegationTokenIdentifiers()
+ .iterator();
+ while (i.hasNext()) {
+ DelegationTokenIdentifier id = i.next();
+ if (now > id.getMaxDate()) {
+ this.tokenStore.removeToken(id); // no need to look at token info
+ } else {
+ // get token info to check renew date
+ DelegationTokenInformation tokenInfo = tokenStore.getToken(id);
+ if (tokenInfo != null) {
+ if (now > tokenInfo.getRenewDate()) {
+ this.tokenStore.removeToken(id);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Extension of rollMasterKey to remove expired keys from store.
+ * @throws IOException
+ */
+ protected void rollMasterKeyExt() throws IOException {
+ Map<Integer, DelegationKey> keys = reloadKeys();
+ int currentKeyId = super.currentId;
+ HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this);
+ List<DelegationKey> keysAfterRoll = Arrays.asList(getAllKeys());
+ for (DelegationKey key : keysAfterRoll) {
+ keys.remove(key.getKeyId());
+ if (key.getKeyId() == currentKeyId) {
+ tokenStore.updateMasterKey(currentKeyId, encodeWritable(key));
+ }
+ }
+ for (DelegationKey expiredKey : keys.values()) {
+ LOGGER.info("Removing expired key id={}", expiredKey.getKeyId());
+ tokenStore.removeMasterKey(expiredKey.getKeyId());
+ }
+ }
+
+
+ /**
+ * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access
+ * restriction (there would not be an need to clone the remove thread if the remove logic was
+ * protected/extensible).
+ */
+ protected class ExpiredTokenRemover extends Thread {
+ private long lastMasterKeyUpdate;
+ private long lastTokenCacheCleanup;
+
+ @Override
+ public void run() {
+ LOGGER.info("Starting expired delegation token remover thread, "
+ + "tokenRemoverScanInterval=" + tokenRemoverScanInterval
+ / (60 * 1000) + " min(s)");
+ try {
+ while (running) {
+ long now = System.currentTimeMillis();
+ if (lastMasterKeyUpdate + keyUpdateInterval < now) {
+ try {
+ rollMasterKeyExt();
+ lastMasterKeyUpdate = now;
+ } catch (IOException e) {
+ LOGGER.error("Master key updating failed. "
+ + StringUtils.stringifyException(e));
+ }
+ }
+ if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) {
+ removeExpiredTokens();
+ lastTokenCacheCleanup = now;
+ }
+ try {
+ Thread.sleep(5000); // 5 seconds
+ } catch (InterruptedException ie) {
+ LOGGER
+ .error("InterruptedExcpetion recieved for ExpiredTokenRemover thread "
+ + ie);
+ }
+ }
+ } catch (Throwable t) {
+ LOGGER.error("ExpiredTokenRemover thread received unexpected exception. "
+ + t, t);
+ Runtime.getRuntime().exit(-1);
+ }
+ }
+ }
+
+}
Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1229510&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Tue Jan 10 11:15:02 2012
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStoreError;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper token store implementation.
+ */
+public class ZooKeeperTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName());
+
+ private static final String ZK_SEQ_FORMAT = "%010d";
+ private static final String NODE_KEYS = "/keys";
+ private static final String NODE_TOKENS = "/tokens";
+
+ private String rootNode = "";
+ private volatile ZooKeeper zkSession;
+ private String zkConnectString;
+ private final int zkSessionTimeout = 3000;
+
+ /**
+  * Watcher that logs every event and closes the current session once it is reported as
+  * expired.
+  */
+ private class ZooKeeperWatcher implements Watcher {
+   public void process(org.apache.zookeeper.WatchedEvent event) {
+     LOGGER.info(event.toString());
+     if (event.getState() != Watcher.Event.KeeperState.Expired) {
+       return;
+     }
+     LOGGER.warn("ZooKeeper session expired, discarding connection");
+     try {
+       zkSession.close();
+     } catch (Throwable e) {
+       LOGGER.warn("Failed to close connection on expired session", e);
+     }
+   }
+ }
+
+ /**
+  * Default constructor for dynamic instantiation w/ Configurable
+  * (ReflectionUtils does not support Configuration constructor injection).
+  * Performs no initialization; {@link #setConf(Configuration)} does that.
+  */
+ protected ZooKeeperTokenStore() {
+ }
+
+ /**
+  * Creates a store for the given connect string and initializes it immediately.
+  *
+  * @param hostPort ZooKeeper connect string
+  */
+ public ZooKeeperTokenStore(String hostPort) {
+   zkConnectString = hostPort;
+   init();
+ }
+
+ /**
+ * Returns the current ZooKeeper session, creating a new one when there is none yet or the
+ * existing one is closed. Creation uses a double-checked test on the volatile session field
+ * so that the lock is only taken when a new session may be needed.
+ *
+ * @return the session stored in the session field
+ * @throws TokenStoreError if the ZooKeeper client cannot be created
+ */
+ private ZooKeeper getSession() {
+ if (zkSession == null || zkSession.getState() == States.CLOSED) {
+ synchronized (this) {
+ // re-check under the lock, another thread may have created the session meanwhile
+ if (zkSession == null || zkSession.getState() == States.CLOSED) {
+ try {
+ zkSession = new ZooKeeper(this.zkConnectString, this.zkSessionTimeout,
+ new ZooKeeperWatcher());
+ } catch (IOException ex) {
+ throw new TokenStoreError("Token store error.", ex);
+ }
+ }
+ }
+ }
+ return zkSession;
+ }
+
+ private static String ensurePath(ZooKeeper zk, String path) throws KeeperException,
+ InterruptedException {
+ String[] pathComps = StringUtils.splitByWholeSeparator(path, "/");
+ String currentPath = "";
+ for (String pathComp : pathComps) {
+ currentPath += "/" + pathComp;
+ try {
+ String node = zk.create(currentPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ LOGGER.info("Created path: " + node);
+ } catch (KeeperException.NodeExistsException e) {
+ }
+ }
+ return currentPath;
+ }
+
+ private void init() {
+ if (this.zkConnectString == null) {
+ throw new IllegalStateException("Not initialized");
+ }
+
+ if (this.zkSession != null) {
+ try {
+ this.zkSession.close();
+ } catch (InterruptedException ex) {
+ LOGGER.warn("Failed to close existing session.", ex);
+ }
+ }
+
+ ZooKeeper zk = getSession();
+ try {
+ ensurePath(zk, rootNode + NODE_KEYS);
+ ensurePath(zk, rootNode + NODE_TOKENS);
+ } catch (Exception e) {
+ throw new TokenStoreError("Failed to validate token path.", e);
+ }
+ }
+
+ /**
+  * Reads the connect string and root node from the configuration (when one is given) and
+  * (re)initializes the store.
+  *
+  * @param conf configuration to read from; may be null, in which case the current settings
+  *          are kept
+  */
+ @Override
+ public void setConf(Configuration conf) {
+   if (conf != null) {
+     this.rootNode = conf.get(
+         HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE,
+         HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT);
+     this.zkConnectString = conf.get(
+         HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+   }
+   init();
+ }
+
+ /**
+  * The configuration is not retained by this store.
+  *
+  * @return always null
+  */
+ @Override
+ public Configuration getConf() {
+   return null; // not required
+ }
+
+ /**
+  * Reads the data of all nodes below the keys node. A node that disappears between listing
+  * the children and reading its data (for example because another process removed the key)
+  * is skipped instead of failing the whole read.
+  *
+  * @return node data indexed by the sequence number parsed from the node name
+  * @throws KeeperException on ZooKeeper errors other than a vanished key node
+  * @throws InterruptedException if the calling thread is interrupted
+  */
+ private Map<Integer, byte[]> getAllKeys() throws KeeperException,
+     InterruptedException {
+
+   String masterKeyNode = rootNode + NODE_KEYS;
+   ZooKeeper zk = getSession();
+   List<String> nodes = zk.getChildren(masterKeyNode, false);
+   Map<Integer, byte[]> result = new HashMap<Integer, byte[]>();
+   for (String node : nodes) {
+     byte[] data;
+     try {
+       data = zk.getData(masterKeyNode + "/" + node, false, null);
+     } catch (KeeperException.NoNodeException ex) {
+       // removed concurrently after the children were listed
+       LOGGER.debug("Master key node {} no longer exists, skipping", node);
+       continue;
+     }
+     if (data != null) {
+       result.put(getSeq(node), data);
+     }
+   }
+   return result;
+ }
+
+ private int getSeq(String path) {
+ String[] pathComps = path.split("/");
+ return Integer.parseInt(pathComps[pathComps.length-1]);
+ }
+
+ /**
+  * Creates a persistent sequential node holding the key below the keys node.
+  *
+  * @param s encoded master key
+  * @return sequence number parsed from the name of the created node
+  * @throws TokenStoreError if the ZooKeeper operation fails or the thread is interrupted
+  */
+ @Override
+ public int addMasterKey(String s) {
+   try {
+     ZooKeeper zk = getSession();
+     String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), Ids.OPEN_ACL_UNSAFE,
+         CreateMode.PERSISTENT_SEQUENTIAL);
+     LOGGER.info("Added key {}", newNode);
+     return getSeq(newNode);
+   } catch (KeeperException ex) {
+     throw new TokenStoreError(ex);
+   } catch (InterruptedException ex) {
+     // restore the interrupt status so code further up the stack can still observe it
+     Thread.currentThread().interrupt();
+     throw new TokenStoreError(ex);
+   }
+ }
+
+ /**
+  * Overwrites the data of the key node for the given sequence number, regardless of the
+  * node version.
+  *
+  * @param keySeq sequence number of the key node
+  * @param s encoded master key
+  * @throws TokenStoreError if the ZooKeeper operation fails or the thread is interrupted
+  */
+ @Override
+ public void updateMasterKey(int keySeq, String s) {
+   try {
+     ZooKeeper zk = getSession();
+     zk.setData(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), s.getBytes(),
+         -1);
+   } catch (KeeperException ex) {
+     throw new TokenStoreError(ex);
+   } catch (InterruptedException ex) {
+     // restore the interrupt status so code further up the stack can still observe it
+     Thread.currentThread().interrupt();
+     throw new TokenStoreError(ex);
+   }
+ }
+
+ /**
+  * Deletes the key node for the given sequence number, regardless of the node version.
+  *
+  * @param keySeq sequence number of the key node
+  * @return true if the node was deleted, false if it did not exist
+  * @throws TokenStoreError if the ZooKeeper operation fails or the thread is interrupted
+  */
+ @Override
+ public boolean removeMasterKey(int keySeq) {
+   try {
+     ZooKeeper zk = getSession();
+     zk.delete(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), -1);
+     return true;
+   } catch (KeeperException.NoNodeException ex) {
+     return false;
+   } catch (KeeperException ex) {
+     throw new TokenStoreError(ex);
+   } catch (InterruptedException ex) {
+     // restore the interrupt status so code further up the stack can still observe it
+     Thread.currentThread().interrupt();
+     throw new TokenStoreError(ex);
+   }
+ }
+
+ /**
+  * Returns the data of all key nodes converted to strings.
+  *
+  * @return encoded master keys, one per key node
+  * @throws TokenStoreError if the ZooKeeper operation fails or the thread is interrupted
+  */
+ @Override
+ public String[] getMasterKeys() {
+   try {
+     Map<Integer, byte[]> allKeys = getAllKeys();
+     String[] result = new String[allKeys.size()];
+     int resultIdx = 0;
+     for (byte[] keyBytes : allKeys.values()) {
+       result[resultIdx++] = new String(keyBytes);
+     }
+     return result;
+   } catch (KeeperException ex) {
+     throw new TokenStoreError(ex);
+   } catch (InterruptedException ex) {
+     // restore the interrupt status so code further up the stack can still observe it
+     Thread.currentThread().interrupt();
+     throw new TokenStoreError(ex);
+   }
+ }
+
+
+ private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) {
+ try {
+ return rootNode + NODE_TOKENS + "/"
+ + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier);
+ } catch (IOException ex) {
+ throw new TokenStoreError("Failed to encode token identifier", ex);
+ }
+ }
+
+ /**
+  * Creates a persistent node for the token holding the encoded token information.
+  *
+  * @param tokenIdentifier identifier of the token, determines the node path
+  * @param token token information to store
+  * @return true if the node was created, false if a node for the identifier already exists
+  * @throws TokenStoreError if the ZooKeeper operation fails or the thread is interrupted
+  */
+ @Override
+ public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+     DelegationTokenInformation token) {
+   try {
+     ZooKeeper zk = getSession();
+     byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
+     String newNode = zk.create(getTokenPath(tokenIdentifier),
+         tokenBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+     LOGGER.info("Added token: {}", newNode);
+     return true;
+   } catch (KeeperException.NodeExistsException ex) {
+     return false;
+   } catch (KeeperException ex) {
+     throw new TokenStoreError(ex);
+   } catch (InterruptedException ex) {
+     // restore the interrupt status so code further up the stack can still observe it
+     Thread.currentThread().interrupt();
+     throw new TokenStoreError(ex);
+   }
+ }
+
+ /**
+  * Deletes the node of the token, regardless of the node version.
+  *
+  * @param tokenIdentifier identifier of the token
+  * @return true if the node was deleted, false if it did not exist
+  * @throws TokenStoreError if the ZooKeeper operation fails or the thread is interrupted
+  */
+ @Override
+ public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+   try {
+     ZooKeeper zk = getSession();
+     zk.delete(getTokenPath(tokenIdentifier), -1);
+     return true;
+   } catch (KeeperException.NoNodeException ex) {
+     return false;
+   } catch (KeeperException ex) {
+     throw new TokenStoreError(ex);
+   } catch (InterruptedException ex) {
+     // restore the interrupt status so code further up the stack can still observe it
+     Thread.currentThread().interrupt();
+     throw new TokenStoreError(ex);
+   }
+ }
+
+ /**
+  * Reads and decodes the token information stored in the node of the token.
+  *
+  * @param tokenIdentifier identifier of the token
+  * @return decoded token information, or null if no node exists for the identifier
+  * @throws TokenStoreError if the node data cannot be decoded, the ZooKeeper operation fails
+  *           or the thread is interrupted
+  */
+ @Override
+ public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+   try {
+     ZooKeeper zk = getSession();
+     byte[] tokenBytes = zk.getData(getTokenPath(tokenIdentifier), false, null);
+     try {
+       return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
+     } catch (Exception ex) {
+       throw new TokenStoreError("Failed to decode token", ex);
+     }
+   } catch (KeeperException.NoNodeException ex) {
+     return null;
+   } catch (KeeperException ex) {
+     throw new TokenStoreError(ex);
+   } catch (InterruptedException ex) {
+     // restore the interrupt status so code further up the stack can still observe it
+     Thread.currentThread().interrupt();
+     throw new TokenStoreError(ex);
+   }
+ }
+
+ /**
+  * Lists the children of the tokens node and decodes each node name into an identifier.
+  * Names that fail to decode are logged and left out.
+  *
+  * @return identifiers decoded from the token node names
+  * @throws TokenStoreError if the ZooKeeper operation fails or the thread is interrupted
+  */
+ @Override
+ public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+   String containerNode = rootNode + NODE_TOKENS;
+   final List<String> nodes;
+   try {
+     nodes = getSession().getChildren(containerNode, false);
+   } catch (KeeperException ex) {
+     throw new TokenStoreError(ex);
+   } catch (InterruptedException ex) {
+     // restore the interrupt status so code further up the stack can still observe it
+     Thread.currentThread().interrupt();
+     throw new TokenStoreError(ex);
+   }
+   List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
+       nodes.size());
+   for (String node : nodes) {
+     DelegationTokenIdentifier id = new DelegationTokenIdentifier();
+     try {
+       TokenStoreDelegationTokenSecretManager.decodeWritable(id, node);
+       result.add(id);
+     } catch (Exception e) {
+       LOGGER.warn("Failed to decode token '{}'", node);
+     }
+   }
+   return result;
+ }
+
+}
Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java?rev=1229510&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java Tue Jan 10 11:15:02 2012
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift.client;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.hive.thrift.TFilterTransport;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient
+ * inside open(). So, we need to assume the correct UGI when the transport is opened
+ * so that the SASL mechanisms have access to the right principal. This transport
+ * wraps the Sasl transports to set up the right UGI context for open().
+ *
+ * This is used on the client side, where the API explicitly opens a transport to
+ * the server.
+ */
+ public class TUGIAssumingTransport extends TFilterTransport {
+ protected UserGroupInformation ugi;
+
+ public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
+ super(wrapped);
+ this.ugi = ugi;
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ try {
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() {
+ try {
+ wrapped.open();
+ } catch (TTransportException tte) {
+ // Wrap the transport exception in an RTE, since UGI.doAs() then goes
+ // and unwraps this for us out of the doAs block. We then unwrap one
+ // more time in our catch clause to get back the TTE. (ugh)
+ throw new RuntimeException(tte);
+ }
+ return null;
+ }
+ });
+ } catch (IOException ioe) {
+ throw new RuntimeException("Received an ioe we never threw!", ioe);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("Received an ie we never threw!", ie);
+ } catch (RuntimeException rte) {
+ if (rte.getCause() instanceof TTransportException) {
+ throw (TTransportException)rte.getCause();
+ } else {
+ throw rte;
+ }
+ }
+ }
+ }
Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java?rev=1229510&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java Tue Jan 10 11:15:02 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.token.delegation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Workaround for serialization of {@link DelegationTokenInformation} through package access.
+ * Future version of Hadoop should add this to DelegationTokenInformation itself.
+ */
+public final class HiveDelegationTokenSupport {
+
+ private HiveDelegationTokenSupport() {}
+
+ public static byte[] encodeDelegationTokenInformation(DelegationTokenInformation token) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(bos);
+ WritableUtils.writeVInt(out, token.password.length);
+ out.write(token.password);
+ out.writeLong(token.renewDate);
+ out.flush();
+ return bos.toByteArray();
+ } catch (IOException ex) {
+ throw new RuntimeException("Failed to encode token.", ex);
+ }
+ }
+
+ public static DelegationTokenInformation decodeDelegationTokenInformation(byte[] tokenBytes)
+ throws IOException {
+ DataInputStream in = new DataInputStream(new ByteArrayInputStream(tokenBytes));
+ DelegationTokenInformation token = new DelegationTokenInformation(0, null);
+ int len = WritableUtils.readVInt(in);
+ token.password = new byte[len];
+ in.readFully(token.password);
+ token.renewDate = in.readLong();
+ return token;
+ }
+
+ public static void rollMasterKey(
+ AbstractDelegationTokenSecretManager<? extends AbstractDelegationTokenIdentifier> mgr)
+ throws IOException {
+ mgr.rollMasterKey();
+ }
+
+}
Modified: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java?rev=1229510&r1=1229509&r2=1229510&view=diff
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java (original)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java Tue Jan 10 11:15:02 2012
@@ -79,7 +79,7 @@ public abstract class ShimLoader {
}
public static synchronized HadoopThriftAuthBridge getHadoopThriftAuthBridge() {
- if ("0.20S".equals(getMajorVersion())) {
+ if (getHadoopShims().isSecureShimImpl()) {
return createShim("org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge20S",
HadoopThriftAuthBridge.class);
} else {
@@ -87,7 +87,6 @@ public abstract class ShimLoader {
}
}
- @SuppressWarnings("unchecked")
private static <T> T loadShims(Map<String, String> classMap, Class<T> xface) {
String vers = getMajorVersion();
String className = classMap.get(vers);
@@ -96,7 +95,7 @@ public abstract class ShimLoader {
private static <T> T createShim(String className, Class<T> xface) {
try {
- Class clazz = Class.forName(className);
+ Class<?> clazz = Class.forName(className);
return xface.cast(clazz.newInstance());
} catch (Exception e) {
throw new RuntimeException("Could not load shims in class " +