Posted to commits@hive.apache.org by am...@apache.org on 2012/01/10 12:15:03 UTC

svn commit: r1229510 [2/2] - in /hive/trunk: ./ shims/ shims/src/0.20S/java/org/apache/hadoop/hive/shims/ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ shims/src/0.20S/java/org/apache/hadoop/hive/thrift/client/ shims/src/0.20S/java/org/apache/had...

Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1229510&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Tue Jan 10 11:15:02 2012
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+
+ /**
+  * Functions that bridge Thrift's SASL transports to Hadoop's
+  * SASL callback handlers and authentication classes.
+  */
+ public class HadoopThriftAuthBridge20S extends HadoopThriftAuthBridge {
+   static final Log LOG = LogFactory.getLog(HadoopThriftAuthBridge.class);
+
+   @Override
+   public Client createClient() {
+     return new Client();
+   }
+
+   @Override
+   public Server createServer(String keytabFile, String principalConf) throws TTransportException {
+     return new Server(keytabFile, principalConf);
+   }
+
+   public static class Client extends HadoopThriftAuthBridge.Client {
+     /**
+      * Create a client-side SASL transport that wraps an underlying transport.
+      *
+      * @param principalConfig The Kerberos principal of the target server; may contain the
+      *               _HOST pattern, which is resolved against the host argument.
+      * @param host The server host name, used to resolve _HOST in the principal.
+      * @param methodStr The authentication method to use; KERBEROS and DIGEST are supported.
+      * @param tokenStrForm The delegation token in encoded string form; required for DIGEST,
+      *               ignored for KERBEROS.
+      * @param underlyingTransport The underlying transport mechanism, usually a TSocket.
+      */
+
+     @Override
+     public TTransport createClientTransport(
+       String principalConfig, String host,
+        String methodStr, String tokenStrForm, TTransport underlyingTransport)
+       throws IOException {
+       AuthMethod method = AuthMethod.valueOf(AuthMethod.class, methodStr);
+
+       TTransport saslTransport = null;
+       switch (method) {
+         case DIGEST:
+           Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+           t.decodeFromUrlString(tokenStrForm);
+           saslTransport = new TSaslClientTransport(
+            method.getMechanismName(),
+            null,
+            null, SaslRpcServer.SASL_DEFAULT_REALM,
+            SaslRpcServer.SASL_PROPS, new SaslClientCallbackHandler(t),
+            underlyingTransport);
+           return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+
+         case KERBEROS:
+           String serverPrincipal = SecurityUtil.getServerPrincipal(principalConfig, host);
+           String names[] = SaslRpcServer.splitKerberosName(serverPrincipal);
+           if (names.length != 3) {
+             throw new IOException(
+               "Kerberos principal name does NOT have the expected hostname part: "
+                 + serverPrincipal);
+           }
+           try {
+             saslTransport = new TSaslClientTransport(
+               method.getMechanismName(),
+               null,
+               names[0], names[1],
+               SaslRpcServer.SASL_PROPS, null,
+               underlyingTransport);
+             return new TUGIAssumingTransport(saslTransport, UserGroupInformation.getCurrentUser());
+           } catch (SaslException se) {
+             throw new IOException("Could not instantiate SASL transport", se);
+           }
+
+         default:
+           throw new IOException("Unsupported authentication method: " + method);
+       }
+     }
+    private static class SaslClientCallbackHandler implements CallbackHandler {
+      private final String userName;
+      private final char[] userPassword;
+
+      public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
+        this.userName = encodeIdentifier(token.getIdentifier());
+        this.userPassword = encodePassword(token.getPassword());
+      }
+
+      public void handle(Callback[] callbacks)
+      throws UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        RealmCallback rc = null;
+        for (Callback callback : callbacks) {
+          if (callback instanceof RealmChoiceCallback) {
+            continue;
+          } else if (callback instanceof NameCallback) {
+            nc = (NameCallback) callback;
+          } else if (callback instanceof PasswordCallback) {
+            pc = (PasswordCallback) callback;
+          } else if (callback instanceof RealmCallback) {
+            rc = (RealmCallback) callback;
+          } else {
+            throw new UnsupportedCallbackException(callback,
+                "Unrecognized SASL client callback");
+          }
+        }
+        if (nc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting username: " + userName);
+          }
+          nc.setName(userName);
+        }
+        if (pc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting userPassword");
+          }
+          pc.setPassword(userPassword);
+        }
+        if (rc != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL client callback: setting realm: "
+                + rc.getDefaultText());
+          }
+          rc.setText(rc.getDefaultText());
+        }
+      }
+
+      static String encodeIdentifier(byte[] identifier) {
+        return new String(Base64.encodeBase64(identifier));
+      }
+
+      static char[] encodePassword(byte[] password) {
+        return new String(Base64.encodeBase64(password)).toCharArray();
+      }
+    }
+     }
+
+   public static class Server extends HadoopThriftAuthBridge.Server {
+     final UserGroupInformation realUgi;
+     DelegationTokenSecretManager secretManager;
+     private final static long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
+     //Delegation token related keys
+     public static final String  DELEGATION_KEY_UPDATE_INTERVAL_KEY =
+       "hive.cluster.delegation.key.update-interval";
+     public static final long    DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
+       24*60*60*1000; // 1 day
+     public static final String  DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
+       "hive.cluster.delegation.token.renew-interval";
+     public static final long    DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
+       24*60*60*1000;  // 1 day
+     public static final String  DELEGATION_TOKEN_MAX_LIFETIME_KEY =
+       "hive.cluster.delegation.token.max-lifetime";
+     public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
+       7*24*60*60*1000; // 7 days
+     public static final String DELEGATION_TOKEN_STORE_CLS =
+       "hive.cluster.delegation.token.store.class";
+     public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
+         "hive.cluster.delegation.token.store.zookeeper.connectString";
+     public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE =
+         "hive.cluster.delegation.token.store.zookeeper.rootNode";
+     public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT =
+         "/hive/cluster/delegation";
+
+     public Server() throws TTransportException {
+       try {
+         realUgi = UserGroupInformation.getCurrentUser();
+       } catch (IOException ioe) {
+         throw new TTransportException(ioe);
+       }
+     }
+     /**
+      * Create a server with a kerberos keytab/principal.
+      */
+     private Server(String keytabFile, String principalConf)
+       throws TTransportException {
+       if (keytabFile == null || keytabFile.isEmpty()) {
+         throw new TTransportException("No keytab specified");
+       }
+       if (principalConf == null || principalConf.isEmpty()) {
+         throw new TTransportException("No principal specified");
+       }
+
+       // Login from the keytab
+       String kerberosName;
+       try {
+         kerberosName =
+           SecurityUtil.getServerPrincipal(principalConf, "0.0.0.0");
+         UserGroupInformation.loginUserFromKeytab(
+             kerberosName, keytabFile);
+         realUgi = UserGroupInformation.getLoginUser();
+         assert realUgi.isFromKeytab();
+       } catch (IOException ioe) {
+         throw new TTransportException(ioe);
+       }
+     }
+
+     /**
+      * Create a TTransportFactory that, upon connection of a client socket,
+      * negotiates a SASL transport using either Kerberos (GSSAPI) or DIGEST-MD5
+      * (delegation tokens). The resulting TTransportFactory can be passed as both
+      * the input and output transport factory when instantiating a
+      * TThreadPoolServer, for example.
+      */
+     @Override
+     public TTransportFactory createTransportFactory() throws TTransportException
+     {
+       // Parse out the kerberos principal, host, realm.
+       String kerberosName = realUgi.getUserName();
+       final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
+       if (names.length != 3) {
+         throw new TTransportException("Kerberos principal should have 3 parts: " + kerberosName);
+       }
+
+       TSaslServerTransport.Factory transFactory = new TSaslServerTransport.Factory();
+       transFactory.addServerDefinition(
+         AuthMethod.KERBEROS.getMechanismName(),
+         names[0], names[1],  // two parts of kerberos principal
+         SaslRpcServer.SASL_PROPS,
+         new SaslRpcServer.SaslGssCallbackHandler());
+       transFactory.addServerDefinition(AuthMethod.DIGEST.getMechanismName(),
+          null, SaslRpcServer.SASL_DEFAULT_REALM,
+          SaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(secretManager));
+
+       return new TUGIAssumingTransportFactory(transFactory, realUgi);
+     }
+
+     /**
+      * Wrap a TProcessor in such a way that, before processing any RPC, it
+      * assumes the UserGroupInformation of the user authenticated by
+      * the SASL transport.
+      */
+     @Override
+     public TProcessor wrapProcessor(TProcessor processor) {
+      return new TUGIAssumingProcessor(processor, secretManager);
+     }
+
+    protected TokenStoreDelegationTokenSecretManager.TokenStore getTokenStore(Configuration conf)
+        throws IOException {
+       String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
+       if (StringUtils.isBlank(tokenStoreClassName)) {
+         return new MemoryTokenStore();
+       }
+       try {
+        Class<? extends TokenStoreDelegationTokenSecretManager.TokenStore> storeClass = Class
+            .forName(tokenStoreClassName).asSubclass(
+                TokenStoreDelegationTokenSecretManager.TokenStore.class);
+        return ReflectionUtils.newInstance(storeClass, conf);
+       } catch (ClassNotFoundException e) {
+        throw new IOException("Error initializing delegation token store: " + tokenStoreClassName,
+            e);
+       }
+     }
+
+     @Override
+     public void startDelegationTokenSecretManager(Configuration conf)
+     throws IOException{
+       long secretKeyInterval =
+         conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+                        DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+       long tokenMaxLifetime =
+           conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+                        DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+       long tokenRenewInterval =
+           conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+                        DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+
+       secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval,
+             tokenMaxLifetime,
+             tokenRenewInterval,
+             DELEGATION_TOKEN_GC_INTERVAL, getTokenStore(conf));
+       secretManager.startThreads();
+     }
+
+     @Override
+     public String getDelegationToken(final String owner, final String renewer)
+     throws IOException, InterruptedException {
+       if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
+         throw new AuthorizationException(
+         "Delegation Token can be issued only with kerberos authentication");
+       }
+       // If the user asking for the token is the same as the 'owner', don't do
+       // any proxy authorization checks. For cases like Oozie, where it gets
+       // a delegation token for another user, we need to make sure Oozie is
+       // authorized to get a delegation token.
+       // Do all checks on short names.
+       UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+       UserGroupInformation ownerUgi = UserGroupInformation.createRemoteUser(owner);
+       if (!ownerUgi.getShortUserName().equals(currUser.getShortUserName())) {
+         // In the case of proxy users, getCurrentUser returns the real user
+         // (e.g. Oozie) because of the doAs that happened just before the
+         // server started executing getDelegationToken in the MetaStore.
+         ownerUgi = UserGroupInformation.createProxyUser(owner,
+           UserGroupInformation.getCurrentUser());
+         InetAddress remoteAddr = getRemoteAddress();
+         ProxyUsers.authorize(ownerUgi,remoteAddr.getHostAddress(), null);
+       }
+       return ownerUgi.doAs(new PrivilegedExceptionAction<String>() {
+         public String run() throws IOException {
+           return secretManager.getDelegationToken(renewer);
+         }
+       });
+     }
+
+     @Override
+     public long renewDelegationToken(String tokenStrForm) throws IOException {
+       if (!authenticationMethod.get().equals(AuthenticationMethod.KERBEROS)) {
+         throw new AuthorizationException(
+         "Delegation Token can be issued only with kerberos authentication");
+       }
+       return secretManager.renewDelegationToken(tokenStrForm);
+     }
+
+     @Override
+     public void cancelDelegationToken(String tokenStrForm) throws IOException {
+       secretManager.cancelDelegationToken(tokenStrForm);
+     }
+
+     final static ThreadLocal<InetAddress> remoteAddress =
+       new ThreadLocal<InetAddress>() {
+       @Override
+       protected synchronized InetAddress initialValue() {
+         return null;
+       }
+     };
+
+     @Override
+     public InetAddress getRemoteAddress() {
+       return remoteAddress.get();
+     }
+
+     final static ThreadLocal<AuthenticationMethod> authenticationMethod =
+       new ThreadLocal<AuthenticationMethod>() {
+       @Override
+       protected synchronized AuthenticationMethod initialValue() {
+         return AuthenticationMethod.TOKEN;
+       }
+     };
+
+    /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+    // This code is based almost entirely on Hadoop's
+    // SaslRpcServer.SaslDigestCallbackHandler. The only reason we could not
+    // use that Hadoop class as-is is that it needs a Server.Connection object,
+    // which is relevant in Hadoop RPC but not here in the metastore, so the
+    // code below does not deal with Server.Connection.
+    static class SaslDigestCallbackHandler implements CallbackHandler {
+      private final DelegationTokenSecretManager secretManager;
+
+      public SaslDigestCallbackHandler(
+          DelegationTokenSecretManager secretManager) {
+        this.secretManager = secretManager;
+      }
+
+      private char[] getPassword(DelegationTokenIdentifier tokenid) throws InvalidToken {
+        return encodePassword(secretManager.retrievePassword(tokenid));
+      }
+
+      private char[] encodePassword(byte[] password) {
+        return new String(Base64.encodeBase64(password)).toCharArray();
+      }
+      /** {@inheritDoc} */
+      @Override
+      public void handle(Callback[] callbacks) throws InvalidToken,
+      UnsupportedCallbackException {
+        NameCallback nc = null;
+        PasswordCallback pc = null;
+        AuthorizeCallback ac = null;
+        for (Callback callback : callbacks) {
+          if (callback instanceof AuthorizeCallback) {
+            ac = (AuthorizeCallback) callback;
+          } else if (callback instanceof NameCallback) {
+            nc = (NameCallback) callback;
+          } else if (callback instanceof PasswordCallback) {
+            pc = (PasswordCallback) callback;
+          } else if (callback instanceof RealmCallback) {
+            continue; // realm is ignored
+          } else {
+            throw new UnsupportedCallbackException(callback,
+            "Unrecognized SASL DIGEST-MD5 Callback");
+          }
+        }
+        if (pc != null) {
+          DelegationTokenIdentifier tokenIdentifier = SaslRpcServer.
+          getIdentifier(nc.getDefaultName(), secretManager);
+          char[] password = getPassword(tokenIdentifier);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("SASL server DIGEST-MD5 callback: setting password "
+                + "for client: " + tokenIdentifier.getUser());
+          }
+          pc.setPassword(password);
+        }
+        if (ac != null) {
+          String authid = ac.getAuthenticationID();
+          String authzid = ac.getAuthorizationID();
+          if (authid.equals(authzid)) {
+            ac.setAuthorized(true);
+          } else {
+            ac.setAuthorized(false);
+          }
+          if (ac.isAuthorized()) {
+            if (LOG.isDebugEnabled()) {
+              String username =
+                SaslRpcServer.getIdentifier(authzid, secretManager).getUser().getUserName();
+              LOG.debug("SASL server DIGEST-MD5 callback: setting "
+                  + "canonicalized client ID: " + username);
+            }
+            ac.setAuthorizedID(authzid);
+          }
+        }
+      }
+     }
+
+     /**
+      * Processor that pulls the SaslServer object out of the transport, and
+      * assumes the remote user's UGI before calling through to the original
+      * processor.
+      *
+      * This is used on the server side to set the UGI for each specific call.
+      */
+     private class TUGIAssumingProcessor implements TProcessor {
+       final TProcessor wrapped;
+       DelegationTokenSecretManager secretManager;
+       TUGIAssumingProcessor(TProcessor wrapped, DelegationTokenSecretManager secretManager) {
+         this.wrapped = wrapped;
+         this.secretManager = secretManager;
+       }
+
+       public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
+         TTransport trans = inProt.getTransport();
+         if (!(trans instanceof TSaslServerTransport)) {
+           throw new TException("Unexpected non-SASL transport " + trans.getClass());
+         }
+         TSaslServerTransport saslTrans = (TSaslServerTransport)trans;
+         SaslServer saslServer = saslTrans.getSaslServer();
+         String authId = saslServer.getAuthorizationID();
+         authenticationMethod.set(AuthenticationMethod.KERBEROS);
+         LOG.debug("AUTH ID ======>" + authId);
+         String endUser = authId;
+
+         if(saslServer.getMechanismName().equals("DIGEST-MD5")) {
+           try {
+             TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authId,
+                 secretManager);
+             endUser = tokenId.getUser().getUserName();
+             authenticationMethod.set(AuthenticationMethod.TOKEN);
+           } catch (InvalidToken e) {
+             throw new TException(e.getMessage());
+           }
+         }
+         Socket socket = ((TSocket)(saslTrans.getUnderlyingTransport())).getSocket();
+         remoteAddress.set(socket.getInetAddress());
+         try {
+           UserGroupInformation clientUgi = UserGroupInformation.createProxyUser(
+              endUser, UserGroupInformation.getLoginUser());
+           return clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
+               public Boolean run() {
+                 try {
+                   return wrapped.process(inProt, outProt);
+                 } catch (TException te) {
+                   throw new RuntimeException(te);
+                 }
+               }
+             });
+         } catch (RuntimeException rte) {
+           if (rte.getCause() instanceof TException) {
+             throw (TException)rte.getCause();
+           }
+           throw rte;
+         } catch (InterruptedException ie) {
+           throw new RuntimeException(ie); // unexpected!
+         } catch (IOException ioe) {
+           throw new RuntimeException(ioe); // unexpected!
+         }
+       }
+     }
+
+    /**
+      * A TransportFactory that wraps another one, but assumes a specified UGI
+      * before calling through.
+      *
+      * This is used on the server side to assume the server's Principal when accepting
+      * clients.
+      */
+     static class TUGIAssumingTransportFactory extends TTransportFactory {
+       private final UserGroupInformation ugi;
+       private final TTransportFactory wrapped;
+
+       public TUGIAssumingTransportFactory(TTransportFactory wrapped, UserGroupInformation ugi) {
+         assert wrapped != null;
+         assert ugi != null;
+
+         this.wrapped = wrapped;
+         this.ugi = ugi;
+       }
+
+       @Override
+       public TTransport getTransport(final TTransport trans) {
+         return ugi.doAs(new PrivilegedAction<TTransport>() {
+           public TTransport run() {
+             return wrapped.getTransport(trans);
+           }
+         });
+       }
+     }
+   }
+ }
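
For context, a minimal usage sketch of the client side of this bridge; the metastore host,
port, and Kerberos principal below are placeholder assumptions, and the call mirrors the
KERBEROS branch of createClientTransport():

    import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge20S;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class SecureMetastoreClientSketch {
      public static void main(String[] args) throws Exception {
        // Placeholder host/port and principal; _HOST is resolved against the host argument.
        TTransport socket = new TSocket("metastore.example.com", 9083);
        TTransport transport = new HadoopThriftAuthBridge20S().createClient()
            .createClientTransport(
                "hive/_HOST@EXAMPLE.COM",   // principalConfig
                "metastore.example.com",    // host used to substitute _HOST
                "KERBEROS",                 // methodStr; DIGEST would need a token string
                null,                       // tokenStrForm, only used for DIGEST
                socket);
        transport.open();  // SASL negotiation runs under the caller's current UGI
      }
    }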

Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java?rev=1229510&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java Tue Jan 10 11:15:02 2012
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Default in-memory token store implementation.
+ */
+public class MemoryTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore {
+
+  private final java.util.concurrent.ConcurrentHashMap<Integer, String> masterKeys
+      = new java.util.concurrent.ConcurrentHashMap<Integer, String>();
+
+  private final java.util.concurrent.ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
+      = new java.util.concurrent.ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>();
+
+  private final AtomicInteger masterKeySeq = new AtomicInteger();
+
+  @Override
+  public void setConf(Configuration conf) {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
+
+  @Override
+  public int addMasterKey(String s) {
+    int keySeq = masterKeySeq.getAndIncrement();
+    masterKeys.putIfAbsent(keySeq, s);
+    return keySeq;
+  }
+
+  @Override
+  public void updateMasterKey(int keySeq, String s) {
+    masterKeys.put(keySeq, s);
+  }
+
+  @Override
+  public boolean removeMasterKey(int keySeq) {
+    return masterKeys.remove(keySeq) != null;
+  }
+
+  @Override
+  public String[] getMasterKeys() {
+    return masterKeys.values().toArray(new String[0]);
+  }
+
+  @Override
+  public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+    DelegationTokenInformation token) {
+    DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token);
+    return (tokenInfo == null);
+  }
+
+  @Override
+  public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+    DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier);
+    return tokenInfo != null;
+  }
+
+  @Override
+  public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+    return tokens.get(tokenIdentifier);
+  }
+
+  @Override
+  public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+    List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
+        tokens.size());
+    for (DelegationTokenIdentifier id : tokens.keySet()) {
+        result.add(id);
+    }
+    return result;
+  }
+
+}
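
For illustration, a minimal sketch of the TokenStore master-key contract exercised against this
in-memory implementation; the key strings are placeholders, not values the secret manager would
actually produce:

    import org.apache.hadoop.hive.thrift.MemoryTokenStore;
    import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStore;

    public class MemoryTokenStoreSketch {
      public static void main(String[] args) {
        TokenStore store = new MemoryTokenStore();
        int keyId = store.addMasterKey("encoded-master-key");        // store assigns the sequence number
        store.updateMasterKey(keyId, "encoded-master-key-with-id");  // caller rewrites the key with its id
        for (String key : store.getMasterKeys()) {
          System.out.println("master key: " + key);
        }
        store.removeMasterKey(keyId);
      }
    }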

Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java?rev=1229510&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java Tue Jan 10 11:15:02 2012
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extension of {@link DelegationTokenSecretManager} that supports an alternative to the default
+ * in-memory token management, enabling fail-over and clustering through a pluggable token store
+ * (e.g. ZooKeeper). Delegation tokens are retrieved from the store on demand and (unlike the base
+ * class behavior) not cached in memory. This avoids complexities related to token expiration. The
+ * security token is needed only at the time the transport is opened (as opposed to per interface
+ * operation). The assumption therefore is that the cost of interprocess token retrieval (for a
+ * random-read-efficient store such as ZooKeeper) is low compared to the overhead of synchronizing
+ * per-process in-memory token caches.
+ * The wrapper incorporates the token store abstraction within the limitations of the current
+ * Hive/Hadoop dependency (.20S) with minimal code duplication.
+ * Eventually this should be supported by Hadoop security directly.
+ */
+public class TokenStoreDelegationTokenSecretManager extends DelegationTokenSecretManager {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName());
+
+  /**
+   * Exception for internal token store errors that typically cannot be handled by the caller.
+   */
+  public static class TokenStoreError extends RuntimeException {
+    private static final long serialVersionUID = -8693819817623074083L;
+
+    public TokenStoreError(Throwable cause) {
+      super(cause);
+    }
+
+    public TokenStoreError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Interface for a pluggable token store that can be implemented as a shared store with
+   * external storage (for example ZooKeeper for HA).
+   * Internal, store-specific errors are translated into {@link TokenStoreError}.
+   */
+  public static interface TokenStore extends Configurable {
+    /**
+     * Add a new master key. The token store assigns and returns the sequence number.
+     * The caller needs to use the returned identifier to update the key (since the id is
+     * embedded in the key).
+     *
+     * @param s encoded master key
+     * @return sequence number for the new key
+     */
+    int addMasterKey(String s) throws TokenStoreError;
+
+    void updateMasterKey(int keySeq, String s) throws TokenStoreError;
+
+    /**
+     * Remove key for given id.
+     * @param keySeq
+     * @return false if key no longer present, true otherwise.
+     */
+    boolean removeMasterKey(int keySeq);
+
+    String[] getMasterKeys() throws TokenStoreError;
+
+    /**
+     * Add token. If identifier is already present, token won't be added.
+     * @param tokenIdentifier
+     * @param token
+     * @return true if token was added, false for existing identifier
+     */
+    boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+        DelegationTokenInformation token) throws TokenStoreError;
+
+    /**
+     * Get token. Returns null if the token does not exist.
+     * @param tokenIdentifier
+     * @return token information, or null if the token does not exist
+     */
+    DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
+        throws TokenStoreError;
+
+    /**
+     * Remove token. Does nothing if the token does not exist.
+     * @param tokenIdentifier
+     * @return true if the token was removed, false if it was not present
+     */
+    boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreError;
+
+    /**
+     * List all token identifiers in the store. This is used to remove expired tokens;
+     * a potential scalability improvement would be to partition by master key id.
+     * @return list of all token identifiers
+     */
+    List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers();
+
+  }
+
+  final private long keyUpdateInterval;
+  final private long tokenRemoverScanInterval;
+  private Thread tokenRemoverThread;
+
+  final private TokenStore tokenStore;
+
+  public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval,
+      long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+      long delegationTokenRemoverScanInterval, TokenStore sharedStore) {
+    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval,
+        delegationTokenRemoverScanInterval);
+    this.keyUpdateInterval = delegationKeyUpdateInterval;
+    this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
+
+    this.tokenStore = sharedStore;
+  }
+
+  protected DelegationTokenIdentifier getTokenIdentifier(Token<DelegationTokenIdentifier> token)
+      throws IOException {
+    // turn bytes back into identifier for cache lookup
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    DelegationTokenIdentifier id = createIdentifier();
+    id.readFields(in);
+    return id;
+  }
+
+  protected Map<Integer, DelegationKey> reloadKeys() {
+    // read keys from token store
+    String[] allKeys = tokenStore.getMasterKeys();
+    Map<Integer, DelegationKey> keys
+        = new java.util.HashMap<Integer, DelegationKey>(allKeys.length);
+    for (String keyStr : allKeys) {
+      DelegationKey key = new DelegationKey();
+      try {
+        decodeWritable(key, keyStr);
+        keys.put(key.getKeyId(), key);
+      } catch (IOException ex) {
+        LOGGER.error("Failed to load master key.", ex);
+      }
+    }
+    synchronized (this) {
+        super.allKeys.clear();
+        super.allKeys.putAll(keys);
+    }
+    return keys;
+  }
+
+  @Override
+  public byte[] retrievePassword(DelegationTokenIdentifier identifier)
+      throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
+      DelegationTokenInformation info = this.tokenStore.getToken(identifier);
+      if (info == null) {
+          throw new InvalidToken("token expired or does not exist: " + identifier);
+      }
+      // must reuse super as info.getPassword is not accessible
+      synchronized (this) {
+        try {
+          super.currentTokens.put(identifier, info);
+          return super.retrievePassword(identifier);
+        } finally {
+          super.currentTokens.remove(identifier);
+        }
+      }
+  }
+
+  @Override
+  public DelegationTokenIdentifier cancelToken(Token<DelegationTokenIdentifier> token,
+      String canceller) throws IOException {
+    DelegationTokenIdentifier id = getTokenIdentifier(token);
+    LOGGER.info("Token cancelation requested for identifier: "+id);
+    this.tokenStore.removeToken(id);
+    return id;
+  }
+
+  /**
+   * Create the password and add it to shared store.
+   */
+  @Override
+  protected byte[] createPassword(DelegationTokenIdentifier id) {
+    byte[] password;
+    DelegationTokenInformation info;
+    synchronized (this) {
+      password = super.createPassword(id);
+      // add new token to shared store
+      // need to persist expiration along with password
+      info = super.currentTokens.remove(id);
+      if (info == null) {
+        throw new IllegalStateException("Failed to retrieve token after creation");
+      }
+    }
+    this.tokenStore.addToken(id, info);
+    return password;
+  }
+
+  @Override
+  public long renewToken(Token<DelegationTokenIdentifier> token,
+      String renewer) throws InvalidToken, IOException {
+    // Renewal is KERBEROS authenticated, so the token may not be in the in-memory cache.
+    final DelegationTokenIdentifier id = getTokenIdentifier(token);
+    DelegationTokenInformation tokenInfo = this.tokenStore.getToken(id);
+    if (tokenInfo == null) {
+        throw new InvalidToken("token does not exist: " + id); // no token found
+    }
+    // ensure associated master key is available
+    if (!super.allKeys.containsKey(id.getMasterKeyId())) {
+      LOGGER.info("Unknown master key (id={}), (re)loading keys from token store.",
+        id.getMasterKeyId());
+      reloadKeys();
+    }
+    // reuse super renewal logic
+    synchronized (this) {
+      super.currentTokens.put(id,  tokenInfo);
+      try {
+        return super.renewToken(token, renewer);
+      } finally {
+        super.currentTokens.remove(id);
+      }
+    }
+  }
+
+  public static String encodeWritable(Writable key) throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    key.write(dos);
+    dos.flush();
+    return Base64.encodeBase64URLSafeString(bos.toByteArray());
+  }
+
+  public static void decodeWritable(Writable w, String idStr) throws IOException {
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(idStr)));
+    w.readFields(in);
+  }
+
+  /**
+   * Synchronize master key updates / sequence generation for multiple nodes.
+   * NOTE: {@link AbstractDelegationTokenSecretManager} keeps currentKey private, so we need
+   * to use this "hook" to manipulate the key through the object reference.
+   * This .20S workaround should cease to exist once Hadoop supports a token store directly.
+   */
+  @Override
+  protected void logUpdateMasterKey(DelegationKey key) throws IOException {
+    int keySeq = this.tokenStore.addMasterKey(encodeWritable(key));
+    // update key with assigned identifier
+    DelegationKey keyWithSeq = new DelegationKey(keySeq, key.getExpiryDate(), key.getKey());
+    String keyStr = encodeWritable(keyWithSeq);
+    this.tokenStore.updateMasterKey(keySeq, keyStr);
+    decodeWritable(key, keyStr);
+    LOGGER.info("New master key with key id={}", key.getKeyId());
+    super.logUpdateMasterKey(key);
+  }
+
+  @Override
+  public synchronized void startThreads() throws IOException {
+    try {
+      // updateCurrentKey needs to be called to initialize the master key
+      // (there should be a null check added in the future in rollMasterKey)
+      // updateCurrentKey();
+      Method m = AbstractDelegationTokenSecretManager.class.getDeclaredMethod("updateCurrentKey");
+      m.setAccessible(true);
+      m.invoke(this);
+    } catch (Exception e) {
+      throw new IOException("Failed to initialize master key", e);
+    }
+    running = true;
+    tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
+    tokenRemoverThread.start();
+  }
+
+  @Override
+  public synchronized void stopThreads() {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Stopping expired delegation token remover thread");
+    }
+    running = false;
+    if (tokenRemoverThread != null) {
+      tokenRemoverThread.interrupt();
+    }
+  }
+
+  /**
+   * Remove expired tokens. Replaces logic in {@link AbstractDelegationTokenSecretManager}
+   * that cannot be reused due to private method access. Logic here can more efficiently
+   * deal with external token store by only loading into memory the minimum data needed.
+   */
+  protected void removeExpiredTokens() {
+    long now = System.currentTimeMillis();
+    Iterator<DelegationTokenIdentifier> i = tokenStore.getAllDelegationTokenIdentifiers()
+        .iterator();
+    while (i.hasNext()) {
+      DelegationTokenIdentifier id = i.next();
+      if (now > id.getMaxDate()) {
+        this.tokenStore.removeToken(id); // no need to look at token info
+      } else {
+        // get token info to check renew date
+        DelegationTokenInformation tokenInfo = tokenStore.getToken(id);
+        if (tokenInfo != null) {
+          if (now > tokenInfo.getRenewDate()) {
+            this.tokenStore.removeToken(id);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Extension of rollMasterKey to remove expired keys from store.
+   * @throws IOException
+   */
+  protected void rollMasterKeyExt() throws IOException {
+    Map<Integer, DelegationKey> keys = reloadKeys();
+    int currentKeyId = super.currentId;
+    HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this);
+    List<DelegationKey> keysAfterRoll = Arrays.asList(getAllKeys());
+    for (DelegationKey key : keysAfterRoll) {
+        keys.remove(key.getKeyId());
+        if (key.getKeyId() == currentKeyId) {
+          tokenStore.updateMasterKey(currentKeyId, encodeWritable(key));
+        }
+    }
+    for (DelegationKey expiredKey : keys.values()) {
+      LOGGER.info("Removing expired key id={}", expiredKey.getKeyId());
+      tokenStore.removeMasterKey(expiredKey.getKeyId());
+    }
+  }
+
+
+  /**
+   * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access
+   * restriction (there would be no need to clone the remover thread if the removal logic
+   * were protected/extensible).
+   */
+  protected class ExpiredTokenRemover extends Thread {
+    private long lastMasterKeyUpdate;
+    private long lastTokenCacheCleanup;
+
+    @Override
+    public void run() {
+      LOGGER.info("Starting expired delegation token remover thread, "
+          + "tokenRemoverScanInterval=" + tokenRemoverScanInterval
+          / (60 * 1000) + " min(s)");
+      try {
+        while (running) {
+          long now = System.currentTimeMillis();
+          if (lastMasterKeyUpdate + keyUpdateInterval < now) {
+            try {
+              rollMasterKeyExt();
+              lastMasterKeyUpdate = now;
+            } catch (IOException e) {
+              LOGGER.error("Master key updating failed. "
+                  + StringUtils.stringifyException(e));
+            }
+          }
+          if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) {
+            removeExpiredTokens();
+            lastTokenCacheCleanup = now;
+          }
+          try {
+            Thread.sleep(5000); // 5 seconds
+          } catch (InterruptedException ie) {
+            LOGGER.error("InterruptedException received for ExpiredTokenRemover thread "
+                + ie);
+          }
+        }
+      } catch (Throwable t) {
+        LOGGER.error("ExpiredTokenRemover thread received unexpected exception. "
+            + t, t);
+        Runtime.getRuntime().exit(-1);
+      }
+    }
+  }
+
+}
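
For illustration, a minimal sketch of constructing this secret manager directly over the default
in-memory store, using the same interval values that HadoopThriftAuthBridge20S.Server defines as
defaults; the renewer name is a placeholder:

    import org.apache.hadoop.hive.thrift.MemoryTokenStore;
    import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager;

    public class SecretManagerSketch {
      public static void main(String[] args) throws Exception {
        TokenStoreDelegationTokenSecretManager mgr = new TokenStoreDelegationTokenSecretManager(
            24L * 60 * 60 * 1000,      // delegation key update interval (1 day)
            7L * 24 * 60 * 60 * 1000,  // delegation token max lifetime (7 days)
            24L * 60 * 60 * 1000,      // delegation token renew interval (1 day)
            60L * 60 * 1000,           // expired token remover scan interval (1 hour)
            new MemoryTokenStore());
        mgr.startThreads();            // initializes the master key, starts ExpiredTokenRemover
        String token = mgr.getDelegationToken("hive");  // renewer name is a placeholder
        System.out.println("token: " + token);
        mgr.stopThreads();
      }
    }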

Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1229510&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Tue Jan 10 11:15:02 2012
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStoreError;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper token store implementation.
+ */
+public class ZooKeeperTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName());
+
+  private static final String ZK_SEQ_FORMAT = "%010d";
+  private static final String NODE_KEYS = "/keys";
+  private static final String NODE_TOKENS = "/tokens";
+
+  private String rootNode = "";
+  private volatile ZooKeeper zkSession;
+  private String zkConnectString;
+  private final int zkSessionTimeout = 3000;
+
+  private class ZooKeeperWatcher implements Watcher {
+    public void process(org.apache.zookeeper.WatchedEvent event) {
+      LOGGER.info(event.toString());
+      if (event.getState() == Watcher.Event.KeeperState.Expired) {
+        LOGGER.warn("ZooKeeper session expired, discarding connection");
+        try {
+          zkSession.close();
+        } catch (Throwable e) {
+          LOGGER.warn("Failed to close connection on expired session", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Default constructor for dynamic instantiation with Configurable
+   * (ReflectionUtils does not support Configuration constructor injection).
+   */
+  protected ZooKeeperTokenStore() {
+  }
+
+  public ZooKeeperTokenStore(String hostPort) {
+    this.zkConnectString = hostPort;
+    init();
+  }
+
+  private ZooKeeper getSession() {
+    if (zkSession == null || zkSession.getState() == States.CLOSED) {
+      synchronized (this) {
+        if (zkSession == null || zkSession.getState() == States.CLOSED) {
+          try {
+            zkSession = new ZooKeeper(this.zkConnectString, this.zkSessionTimeout,
+                new ZooKeeperWatcher());
+          } catch (IOException ex) {
+            throw new TokenStoreError("Token store error.", ex);
+          }
+        }
+      }
+    }
+    return zkSession;
+  }
+
+  private static String ensurePath(ZooKeeper zk, String path) throws KeeperException,
+      InterruptedException {
+    String[] pathComps = StringUtils.splitByWholeSeparator(path, "/");
+    String currentPath = "";
+    for (String pathComp : pathComps) {
+      currentPath += "/" + pathComp;
+      try {
+        String node = zk.create(currentPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+        LOGGER.info("Created path: " + node);
+      } catch (KeeperException.NodeExistsException e) {
+        // path component already exists, nothing to do
+      }
+    }
+    return currentPath;
+  }
+
+  private void init() {
+    if (this.zkConnectString == null) {
+      throw new IllegalStateException("Not initialized");
+    }
+
+    if (this.zkSession != null) {
+      try {
+        this.zkSession.close();
+      } catch (InterruptedException ex) {
+        LOGGER.warn("Failed to close existing session.", ex);
+      }
+    }
+
+    ZooKeeper zk = getSession();
+    try {
+      ensurePath(zk, rootNode + NODE_KEYS);
+      ensurePath(zk, rootNode + NODE_TOKENS);
+    } catch (Exception e) {
+      throw new TokenStoreError("Failed to validate token path.", e);
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf != null) {
+      this.zkConnectString = conf.get(
+          HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+      this.rootNode = conf.get(
+          HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE,
+          HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT);
+    }
+    init();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return null; // not required
+  }
+
+  private Map<Integer, byte[]> getAllKeys() throws KeeperException,
+      InterruptedException {
+
+    String masterKeyNode = rootNode + NODE_KEYS;
+    ZooKeeper zk = getSession();
+    List<String> nodes = zk.getChildren(masterKeyNode, false);
+    Map<Integer, byte[]> result = new HashMap<Integer, byte[]>();
+    for (String node : nodes) {
+      byte[] data = zk.getData(masterKeyNode + "/" + node, false, null);
+      if (data != null) {
+        result.put(getSeq(node), data);
+      }
+    }
+    return result;
+  }
+
+  private int getSeq(String path) {
+    String[] pathComps = path.split("/");
+    return Integer.parseInt(pathComps[pathComps.length-1]);
+  }
+
+  @Override
+  public int addMasterKey(String s) {
+    try {
+      ZooKeeper zk = getSession();
+      String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT_SEQUENTIAL);
+      LOGGER.info("Added key {}", newNode);
+      return getSeq(newNode);
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      throw new TokenStoreError(ex);
+    }
+  }
+
+  @Override
+  public void updateMasterKey(int keySeq, String s) {
+    try {
+      ZooKeeper zk = getSession();
+      zk.setData(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), s.getBytes(),
+          -1);
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      throw new TokenStoreError(ex);
+    }
+  }
+
+  @Override
+  public boolean removeMasterKey(int keySeq) {
+    try {
+      ZooKeeper zk = getSession();
+      zk.delete(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), -1);
+      return true;
+    } catch (KeeperException.NoNodeException ex) {
+      return false;
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      throw new TokenStoreError(ex);
+    }
+  }
+
+  @Override
+  public String[] getMasterKeys() {
+    try {
+      Map<Integer, byte[]> allKeys = getAllKeys();
+      String[] result = new String[allKeys.size()];
+      int resultIdx = 0;
+      for (byte[] keyBytes : allKeys.values()) {
+          result[resultIdx++] = new String(keyBytes);
+      }
+      return result;
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      throw new TokenStoreError(ex);
+    }
+  }
+
+
+  private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) {
+    try {
+      return rootNode + NODE_TOKENS + "/"
+          + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier);
+    } catch (IOException ex) {
+      throw new TokenStoreError("Failed to encode token identifier", ex);
+    }
+  }
+
+  @Override
+  public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+      DelegationTokenInformation token) {
+    try {
+      ZooKeeper zk = getSession();
+      byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
+      String newNode = zk.create(getTokenPath(tokenIdentifier),
+          tokenBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      LOGGER.info("Added token: {}", newNode);
+      return true;
+    } catch (KeeperException.NodeExistsException ex) {
+      return false;
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      throw new TokenStoreError(ex);
+    }
+  }
+
+  @Override
+  public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+    try {
+      ZooKeeper zk = getSession();
+      zk.delete(getTokenPath(tokenIdentifier), -1);
+      return true;
+    } catch (KeeperException.NoNodeException ex) {
+      return false;
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      throw new TokenStoreError(ex);
+    }
+  }
+
+  @Override
+  public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+    try {
+      ZooKeeper zk = getSession();
+      byte[] tokenBytes = zk.getData(getTokenPath(tokenIdentifier), false, null);
+      try {
+        return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
+      } catch (Exception ex) {
+        throw new TokenStoreError("Failed to decode token", ex);
+      }
+    } catch (KeeperException.NoNodeException ex) {
+      return null;
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      throw new TokenStoreError(ex);
+    }
+  }
+
+  @Override
+  public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+    String containerNode = rootNode + NODE_TOKENS;
+    final List<String> nodes;
+    try  {
+      nodes = getSession().getChildren(containerNode, false);
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      throw new TokenStoreError(ex);
+    }
+    List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
+        nodes.size());
+    for (String node : nodes) {
+      DelegationTokenIdentifier id = new DelegationTokenIdentifier();
+      try {
+        TokenStoreDelegationTokenSecretManager.decodeWritable(id, node);
+        result.add(id);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to decode token '{}'", node);
+      }
+    }
+    return result;
+  }
+
+}
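
For illustration, a sketch of selecting this ZooKeeper-backed store through the configuration
keys defined in HadoopThriftAuthBridge20S.Server; the server resolves the class via
getTokenStore(conf) inside startDelegationTokenSecretManager. The ZooKeeper quorum below is a
placeholder, and the snippet assumes a reachable quorum and an already logged-in user:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge20S;
    import org.apache.hadoop.hive.thrift.ZooKeeperTokenStore;

    public class ZooKeeperTokenStoreConfigSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Select the ZooKeeper-backed store instead of the default MemoryTokenStore.
        conf.set(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_CLS,
            ZooKeeperTokenStore.class.getName());
        // Placeholder quorum; rootNode falls back to /hive/cluster/delegation when unset.
        conf.set(HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR,
            "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181");

        HadoopThriftAuthBridge20S.Server server = new HadoopThriftAuthBridge20S.Server();
        server.startDelegationTokenSecretManager(conf);
      }
    }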

Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java?rev=1229510&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/hive/thrift/client/TUGIAssumingTransport.java Tue Jan 10 11:15:02 2012
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift.client;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.hive.thrift.TFilterTransport;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * The Thrift SASL transports call Sasl.createSaslServer and Sasl.createSaslClient
+ * inside open(). So, we need to assume the correct UGI when the transport is opened
+ * so that the SASL mechanisms have access to the right principal. This transport
+ * wraps the Sasl transports to set up the right UGI context for open().
+ *
+ * This is used on the client side, where the API explicitly opens a transport to
+ * the server.
+ */
+public class TUGIAssumingTransport extends TFilterTransport {
+  protected UserGroupInformation ugi;
+
+  public TUGIAssumingTransport(TTransport wrapped, UserGroupInformation ugi) {
+    super(wrapped);
+    this.ugi = ugi;
+  }
+
+  @Override
+  public void open() throws TTransportException {
+    try {
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() {
+          try {
+            wrapped.open();
+          } catch (TTransportException tte) {
+            // Wrap the transport exception in an RTE, since UGI.doAs() then goes
+            // and unwraps this for us out of the doAs block. We then unwrap one
+            // more time in our catch clause to get back the TTE. (ugh)
+            throw new RuntimeException(tte);
+          }
+          return null;
+        }
+      });
+    } catch (IOException ioe) {
+      throw new RuntimeException("Received an ioe we never threw!", ioe);
+    } catch (InterruptedException ie) {
+      throw new RuntimeException("Received an ie we never threw!", ie);
+    } catch (RuntimeException rte) {
+      if (rte.getCause() instanceof TTransportException) {
+        throw (TTransportException) rte.getCause();
+      } else {
+        throw rte;
+      }
+    }
+  }
+}
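
For context on how this wrapper is meant to be used, here is a hedged client-side sketch: the SASL transport is created first and then wrapped so that open() (and thus Sasl.createSaslClient) runs inside ugi.doAs(). The host name, port, service principal parts, and class name below are invented for illustration; the actual wiring elsewhere in this commit may differ.

package org.apache.hadoop.hive.thrift.client;

import java.util.HashMap;
import java.util.Map;

import javax.security.sasl.Sasl;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

// Sketch: open a Kerberos (GSSAPI) SASL transport under the current UGI.
public final class SecureClientSketch {

  private SecureClientSketch() {}

  public static TTransport openKerberosTransport() throws Exception {
    Map<String, String> saslProps = new HashMap<String, String>();
    saslProps.put(Sasl.QOP, "auth");
    TTransport socket = new TSocket("metastore.example.com", 9083);
    // "hive"/"metastore.example.com" stand in for the server's Kerberos
    // principal (service name and host part); substitute real values.
    TTransport sasl = new TSaslClientTransport("GSSAPI", null,
        "hive", "metastore.example.com", saslProps, null, socket);
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    TTransport transport = new TUGIAssumingTransport(sasl, ugi);
    transport.open();  // Sasl.createSaslClient now runs inside ugi.doAs()
    return transport;
  }
}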

Added: hive/trunk/shims/src/common-secure/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common-secure/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java?rev=1229510&view=auto
==============================================================================
--- hive/trunk/shims/src/common-secure/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java (added)
+++ hive/trunk/shims/src/common-secure/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java Tue Jan 10 11:15:02 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.token.delegation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Workaround for serialization of {@link DelegationTokenInformation} through package access.
+ * A future version of Hadoop should add this to DelegationTokenInformation itself.
+ */
+public final class HiveDelegationTokenSupport {
+
+  private HiveDelegationTokenSupport() {}
+
+  public static byte[] encodeDelegationTokenInformation(DelegationTokenInformation token) {
+    try {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream out = new DataOutputStream(bos);
+      WritableUtils.writeVInt(out, token.password.length);
+      out.write(token.password);
+      out.writeLong(token.renewDate);
+      out.flush();
+      return bos.toByteArray();
+    } catch (IOException ex) {
+      throw new RuntimeException("Failed to encode token.", ex);
+    }
+  }
+
+  public static DelegationTokenInformation decodeDelegationTokenInformation(byte[] tokenBytes)
+      throws IOException {
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(tokenBytes));
+    DelegationTokenInformation token = new DelegationTokenInformation(0, null);
+    int len = WritableUtils.readVInt(in);
+    token.password = new byte[len];
+    in.readFully(token.password);
+    token.renewDate = in.readLong();
+    return token;
+  }
+
+  public static void rollMasterKey(
+      AbstractDelegationTokenSecretManager<? extends AbstractDelegationTokenIdentifier> mgr)
+      throws IOException {
+    mgr.rollMasterKey();
+  }
+
+}
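
The encode/decode pair is symmetric (vint password length, password bytes, then the renew date), so a round-trip check is easy to sketch. Like the helper itself, the sketch has to sit in org.apache.hadoop.security.token.delegation to reach the package-private fields; the class name and test values below are made up.

package org.apache.hadoop.security.token.delegation;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;

// Sketch: verify that the password and renew date survive encode/decode.
public final class HiveDelegationTokenSupportRoundTrip {

  public static void main(String[] args) throws IOException {
    DelegationTokenInformation original = new DelegationTokenInformation(0, null);
    original.password = new byte[] {1, 2, 3, 4};
    original.renewDate = 1234567890L;

    byte[] bytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(original);
    DelegationTokenInformation decoded =
        HiveDelegationTokenSupport.decodeDelegationTokenInformation(bytes);

    System.out.println(Arrays.equals(original.password, decoded.password)
        && original.renewDate == decoded.renewDate);  // expected: true
  }
}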

Modified: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java?rev=1229510&r1=1229509&r2=1229510&view=diff
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java (original)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/ShimLoader.java Tue Jan 10 11:15:02 2012
@@ -79,7 +79,7 @@ public abstract class ShimLoader {
   }
 
   public static synchronized HadoopThriftAuthBridge getHadoopThriftAuthBridge() {
-        if ("0.20S".equals(getMajorVersion())) {
+      if (getHadoopShims().isSecureShimImpl()) {
           return createShim("org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge20S",
                             HadoopThriftAuthBridge.class);
         } else {
@@ -87,7 +87,6 @@ public abstract class ShimLoader {
         }
       }
 
-  @SuppressWarnings("unchecked")
   private static <T> T loadShims(Map<String, String> classMap, Class<T> xface) {
     String vers = getMajorVersion();
     String className = classMap.get(vers);
@@ -96,7 +95,7 @@ public abstract class ShimLoader {
 
     private static <T> T createShim(String className, Class<T> xface) {
     try {
-      Class clazz = Class.forName(className);
+      Class<?> clazz = Class.forName(className);
       return xface.cast(clazz.newInstance());
     } catch (Exception e) {
       throw new RuntimeException("Could not load shims in class " +