You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2011/12/07 03:31:00 UTC

svn commit: r1211275 - in /hive/trunk/shims: ./ src/0.20S/java/org/apache/hadoop/hive/thrift/ src/0.20S/java/org/apache/hadoop/security/ src/0.20S/java/org/apache/hadoop/security/token/ src/0.20S/java/org/apache/hadoop/security/token/delegation/ src/te...

Author: hashutosh
Date: Wed Dec  7 02:31:00 2011
New Revision: 1211275

URL: http://svn.apache.org/viewvc?rev=1211275&view=rev
Log:
HIVE-2467 : HA Support for Metastore Server (Thomas Weise via Ashutosh Chauhan)

Added:
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java
Modified:
    hive/trunk/shims/ivy.xml
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
    hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java

Modified: hive/trunk/shims/ivy.xml
URL: http://svn.apache.org/viewvc/hive/trunk/shims/ivy.xml?rev=1211275&r1=1211274&r2=1211275&view=diff
==============================================================================
--- hive/trunk/shims/ivy.xml (original)
+++ hive/trunk/shims/ivy.xml Wed Dec  7 02:31:00 2011
@@ -33,6 +33,10 @@
     <dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop.security.version}">
       <artifact name="hadoop" type="source" ext="tar.gz"/>
     </dependency>
+    <dependency org="org.apache.zookeeper" name="zookeeper"
+                rev="${zookeeper.version}" transitive="false">
+      <include type="jar"/>
+    </dependency>
     <dependency org="org.apache.thrift" name="libthrift" rev="${libthrift.version}"
                 transitive="false"/>
     <dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}"

Modified: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java?rev=1211275&r1=1211274&r2=1211275&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java (original)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/HadoopThriftAuthBridge20S.java Wed Dec  7 02:31:00 2011
@@ -35,6 +35,7 @@ import javax.security.sasl.SaslException
 import javax.security.sasl.SaslServer;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,6 +49,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TProtocol;
@@ -330,6 +332,14 @@ import org.apache.thrift.transport.TTran
        "hive.cluster.delegation.token.max-lifetime";
      public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
        7*24*60*60*1000; // 7 days
+     public static final String DELEGATION_TOKEN_STORE_CLS =
+       "hive.cluster.delegation.token.store.class";
+     public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
+         "hive.cluster.delegation.token.store.zookeeper.connectString";
+     public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE =
+         "hive.cluster.delegation.token.store.zookeeper.rootNode";
+     public static final String DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT =
+         "/hive/cluster/delegation";
 
      public Server() throws TTransportException {
        try {
@@ -404,6 +414,23 @@ import org.apache.thrift.transport.TTran
       return new TUGIAssumingProcessor(processor, secretManager);
      }
 
+    protected TokenStoreDelegationTokenSecretManager.TokenStore getTokenStore(Configuration conf)
+        throws IOException {
+       String tokenStoreClassName = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
+       if (StringUtils.isBlank(tokenStoreClassName)) {
+         return new MemoryTokenStore();
+       }
+       try {
+        Class<? extends TokenStoreDelegationTokenSecretManager.TokenStore> storeClass = Class
+            .forName(tokenStoreClassName).asSubclass(
+                TokenStoreDelegationTokenSecretManager.TokenStore.class);
+        return ReflectionUtils.newInstance(storeClass, conf);
+       } catch (ClassNotFoundException e) {
+        throw new IOException("Error initializing delegation token store: " + tokenStoreClassName,
+            e);
+       }
+     }
+
      @Override
      public void startDelegationTokenSecretManager(Configuration conf)
      throws IOException{
@@ -416,11 +443,11 @@ import org.apache.thrift.transport.TTran
        long tokenRenewInterval =
            conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
                         DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
-       secretManager =
-           new DelegationTokenSecretManager(secretKeyInterval,
-                                            tokenMaxLifetime,
-                                            tokenRenewInterval,
-                                            DELEGATION_TOKEN_GC_INTERVAL);
+
+       secretManager = new TokenStoreDelegationTokenSecretManager(secretKeyInterval,
+             tokenMaxLifetime,
+             tokenRenewInterval,
+             DELEGATION_TOKEN_GC_INTERVAL, getTokenStore(conf));
        secretManager.startThreads();
      }
 

Added: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java?rev=1211275&view=auto
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java (added)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/MemoryTokenStore.java Wed Dec  7 02:31:00 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Default in-memory token store implementation.
+ */
+public class MemoryTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore {
+
+  private final java.util.concurrent.ConcurrentHashMap<Integer, String> masterKeys
+      = new java.util.concurrent.ConcurrentHashMap<Integer, String>();
+
+  private final java.util.concurrent.ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation> tokens
+      = new java.util.concurrent.ConcurrentHashMap<DelegationTokenIdentifier, DelegationTokenInformation>();
+
+  private final AtomicInteger masterKeySeq = new AtomicInteger();
+
+  @Override
+  public void setConf(Configuration conf) {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
+
+  @Override
+  public int addMasterKey(String s) {
+    int keySeq = masterKeySeq.getAndIncrement();
+    masterKeys.putIfAbsent(keySeq, s);
+    return keySeq;
+  }
+
+  @Override
+  public void updateMasterKey(int keySeq, String s) {
+    masterKeys.put(keySeq, s);
+  }
+
+  @Override
+  public boolean removeMasterKey(int keySeq) {
+    return masterKeys.remove(keySeq) != null;
+  }
+
+  @Override
+  public String[] getMasterKeys() {
+    return masterKeys.values().toArray(new String[0]);
+  }
+
+  @Override
+  public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+    DelegationTokenInformation token) {
+    DelegationTokenInformation tokenInfo = tokens.putIfAbsent(tokenIdentifier, token);
+    return (tokenInfo == null);
+  }
+
+  @Override
+  public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+    DelegationTokenInformation tokenInfo = tokens.remove(tokenIdentifier);
+    return tokenInfo != null;
+  }
+
+  @Override
+  public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+    return tokens.get(tokenIdentifier);
+  }
+
+  @Override
+  public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+    List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
+        tokens.size());
+    for (DelegationTokenIdentifier id : tokens.keySet()) {
+        result.add(id);
+    }
+    return result;
+  }
+
+}

Added: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java?rev=1211275&view=auto
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java (added)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java Wed Dec  7 02:31:00 2011
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extension of {@link DelegationTokenSecretManager} to support alternative to default in-memory
+ * token management for fail-over and clustering through a pluggable token store (ZooKeeper etc.).
+ * Delegation tokens will be retrieved from the store on-demand and (unlike base class behavior) not
+ * cached in memory. This avoids complexities related to token expiration. The security token is
+ * needed only at the time the transport is opened (as opposed to per interface operation). The
+ * assumption therefore is low cost of interprocess token retrieval (for random read efficient store
+ * such as ZooKeeper) compared to overhead of synchronizing per-process in-memory token caches.
+ * The wrapper incorporates the token store abstraction within the limitations of current
+ * Hive/Hadoop dependency (.20S) with minimum code duplication.
+ * Eventually this should be supported by Hadoop security directly.
+ */
+public class TokenStoreDelegationTokenSecretManager extends DelegationTokenSecretManager {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TokenStoreDelegationTokenSecretManager.class.getName());
+
+  /**
+   * Exception for internal token store errors that typically cannot be handled by the caller.
+   */
+  public static class TokenStoreError extends RuntimeException {
+    private static final long serialVersionUID = -8693819817623074083L;
+
+    public TokenStoreError(Throwable cause) {
+      super(cause);
+    }
+
+    public TokenStoreError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Interface for pluggable token store that can be implemented as shared store with external
+   * storage (for example with ZooKeeper for HA).
+   * Internal, store specific errors are translated into {@link TokenStoreError}.
+   */
+  public static interface TokenStore extends Configurable {
+    /**
+     * Add new master key. The token store assigns and returns the sequence number.
+     * Caller needs to use the identifier to update the key (since it is embedded in the key).
+     *
+     * @param s
+     * @return sequence number for new key
+     */
+    int addMasterKey(String s) throws TokenStoreError;
+
+    void updateMasterKey(int keySeq, String s) throws TokenStoreError;
+
+    /**
+     * Remove key for given id.
+     * @param keySeq
+     * @return false if key no longer present, true otherwise.
+     */
+    boolean removeMasterKey(int keySeq);
+
+    String[] getMasterKeys() throws TokenStoreError;
+
+    /**
+     * Add token. If identifier is already present, token won't be added.
+     * @param tokenIdentifier
+     * @param token
+     * @return true if token was added, false for existing identifier
+     */
+    boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+        DelegationTokenInformation token) throws TokenStoreError;
+
+    /**
+     * Get token. Returns null if the token does not exist.
+     * @param tokenIdentifier
+     * @return
+     */
+    DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier)
+        throws TokenStoreError;
+
+    /**
+     * Remove token. A token that does not exist is ignored.
+     * @param tokenIdentifier
+     */
+    boolean removeToken(DelegationTokenIdentifier tokenIdentifier) throws TokenStoreError;
+
+    /**
+     * List of all token identifiers in the store. This is used to remove expired tokens;
+     * a potential scalability improvement would be to partition by master key id.
+     * @return
+     */
+    List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers();
+
+  }
+
+  final private long keyUpdateInterval;
+  final private long tokenRemoverScanInterval;
+  private Thread tokenRemoverThread;
+
+  final private TokenStore tokenStore;
+
+  public TokenStoreDelegationTokenSecretManager(long delegationKeyUpdateInterval,
+      long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+      long delegationTokenRemoverScanInterval, TokenStore sharedStore) {
+    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval,
+        delegationTokenRemoverScanInterval);
+    this.keyUpdateInterval = delegationKeyUpdateInterval;
+    this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
+
+    this.tokenStore = sharedStore;
+  }
+
+  protected DelegationTokenIdentifier getTokenIdentifier(Token<DelegationTokenIdentifier> token)
+      throws IOException {
+    // turn bytes back into identifier for cache lookup
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    DelegationTokenIdentifier id = createIdentifier();
+    id.readFields(in);
+    return id;
+  }
+
+  protected Map<Integer, DelegationKey> reloadKeys() {
+    // read keys from token store
+    String[] allKeys = tokenStore.getMasterKeys();
+    Map<Integer, DelegationKey> keys
+        = new java.util.HashMap<Integer, DelegationKey>(allKeys.length);
+    for (String keyStr : allKeys) {
+      DelegationKey key = new DelegationKey();
+      try {
+        decodeWritable(key, keyStr);
+        keys.put(key.getKeyId(), key);
+      } catch (IOException ex) {
+        LOGGER.error("Failed to load master key.", ex);
+      }
+    }
+    synchronized (this) {
+        super.allKeys.clear();
+        super.allKeys.putAll(keys);
+    }
+    return keys;
+  }
+
+  @Override
+  public byte[] retrievePassword(DelegationTokenIdentifier identifier)
+      throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
+      DelegationTokenInformation info = this.tokenStore.getToken(identifier);
+      if (info == null) {
+          throw new InvalidToken("token expired or does not exist: " + identifier);
+      }
+      // must reuse super as info.getPassword is not accessible
+      synchronized (this) {
+        try {
+          super.currentTokens.put(identifier, info);
+          return super.retrievePassword(identifier);
+        } finally {
+          super.currentTokens.remove(identifier);
+        }
+      }
+  }
+
+  @Override
+  public DelegationTokenIdentifier cancelToken(Token<DelegationTokenIdentifier> token,
+      String canceller) throws IOException {
+    DelegationTokenIdentifier id = getTokenIdentifier(token);
+    LOGGER.info("Token cancelation requested for identifier: "+id);
+    this.tokenStore.removeToken(id);
+    return id;
+  }
+
+  /**
+   * Create the password and add it to shared store.
+   */
+  @Override
+  protected byte[] createPassword(DelegationTokenIdentifier id) {
+    byte[] password;
+    DelegationTokenInformation info;
+    synchronized (this) {
+      password = super.createPassword(id);
+      // add new token to shared store
+      // need to persist expiration along with password
+      info = super.currentTokens.remove(id);
+      if (info == null) {
+        throw new IllegalStateException("Failed to retrieve token after creation");
+      }
+    }
+    this.tokenStore.addToken(id, info);
+    return password;
+  }
+
+  @Override
+  public long renewToken(Token<DelegationTokenIdentifier> token,
+      String renewer) throws InvalidToken, IOException {
+    // since renewal is KERBEROS authenticated, the token may not be cached
+    final DelegationTokenIdentifier id = getTokenIdentifier(token);
+    DelegationTokenInformation tokenInfo = this.tokenStore.getToken(id);
+    if (tokenInfo == null) {
+        throw new InvalidToken("token does not exist: " + id); // no token found
+    }
+    // ensure associated master key is available
+    if (!super.allKeys.containsKey(id.getMasterKeyId())) {
+      LOGGER.info("Unknown master key (id={}), (re)loading keys from token store.",
+        id.getMasterKeyId());
+      reloadKeys();
+    }
+    // reuse super renewal logic
+    synchronized (this) {
+      super.currentTokens.put(id,  tokenInfo);
+      try {
+        return super.renewToken(token, renewer);
+      } finally {
+        super.currentTokens.remove(id);
+      }
+    }
+  }
+
+  public static String encodeWritable(Writable key) throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    key.write(dos);
+    dos.flush();
+    return Base64.encodeBase64URLSafeString(bos.toByteArray());
+  }
+
+  public static void decodeWritable(Writable w, String idStr) throws IOException {
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(Base64.decodeBase64(idStr)));
+    w.readFields(in);
+  }
+
+  /**
+   * Synchronize master key updates / sequence generation for multiple nodes.
+   * NOTE: {@link AbstractDelegationTokenSecretManager} keeps currentKey private, so we need
+   * to utilize this "hook" to manipulate the key through the object reference.
+   * This .20S workaround should cease to exist when Hadoop supports token store.
+   */
+  @Override
+  protected void logUpdateMasterKey(DelegationKey key) throws IOException {
+    int keySeq = this.tokenStore.addMasterKey(encodeWritable(key));
+    // update key with assigned identifier
+    DelegationKey keyWithSeq = new DelegationKey(keySeq, key.getExpiryDate(), key.getKey());
+    String keyStr = encodeWritable(keyWithSeq);
+    this.tokenStore.updateMasterKey(keySeq, keyStr);
+    decodeWritable(key, keyStr);
+    LOGGER.info("New master key with key id={}", key.getKeyId());
+    super.logUpdateMasterKey(key);
+  }
+
+  @Override
+  public synchronized void startThreads() throws IOException {
+    try {
+      // updateCurrentKey needs to be called to initialize the master key
+      // (there should be a null check added in the future in rollMasterKey)
+      // updateCurrentKey();
+      Method m = AbstractDelegationTokenSecretManager.class.getDeclaredMethod("updateCurrentKey");
+      m.setAccessible(true);
+      m.invoke(this);
+    } catch (Exception e) {
+      throw new IOException("Failed to initialize master key", e);
+    }
+    running = true;
+    tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
+    tokenRemoverThread.start();
+  }
+
+  @Override
+  public synchronized void stopThreads() {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Stopping expired delegation token remover thread");
+    }
+    running = false;
+    if (tokenRemoverThread != null) {
+      tokenRemoverThread.interrupt();
+    }
+  }
+
+  /**
+   * Remove expired tokens. Replaces logic in {@link AbstractDelegationTokenSecretManager}
+   * that cannot be reused due to private method access. Logic here can more efficiently
+   * deal with external token store by only loading into memory the minimum data needed.
+   */
+  protected void removeExpiredTokens() {
+    long now = System.currentTimeMillis();
+    Iterator<DelegationTokenIdentifier> i = tokenStore.getAllDelegationTokenIdentifiers()
+        .iterator();
+    while (i.hasNext()) {
+      DelegationTokenIdentifier id = i.next();
+      if (now > id.getMaxDate()) {
+        this.tokenStore.removeToken(id); // no need to look at token info
+      } else {
+        // get token info to check renew date
+        DelegationTokenInformation tokenInfo = tokenStore.getToken(id);
+        if (tokenInfo != null) {
+          if (now > tokenInfo.getRenewDate()) {
+            this.tokenStore.removeToken(id);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Extension of rollMasterKey to remove expired keys from store.
+   * @throws IOException
+   */
+  protected void rollMasterKeyExt() throws IOException {
+    Map<Integer, DelegationKey> keys = reloadKeys();
+    int currentKeyId = super.currentId;
+    HiveDelegationTokenSupport.rollMasterKey(TokenStoreDelegationTokenSecretManager.this);
+    List<DelegationKey> keysAfterRoll = Arrays.asList(getAllKeys());
+    for (DelegationKey key : keysAfterRoll) {
+        keys.remove(key.getKeyId());
+        if (key.getKeyId() == currentKeyId) {
+          tokenStore.updateMasterKey(currentKeyId, encodeWritable(key));
+        }
+    }
+    for (DelegationKey expiredKey : keys.values()) {
+      LOGGER.info("Removing expired key id={}", expiredKey.getKeyId());
+      tokenStore.removeMasterKey(expiredKey.getKeyId());
+    }
+  }
+
+
+  /**
+   * Cloned from {@link AbstractDelegationTokenSecretManager} to deal with private access
+   * restriction (there would not be a need to clone the remove thread if the remove logic was
+   * protected/extensible).
+   */
+  protected class ExpiredTokenRemover extends Thread {
+    private long lastMasterKeyUpdate;
+    private long lastTokenCacheCleanup;
+
+    @Override
+    public void run() {
+      LOGGER.info("Starting expired delegation token remover thread, "
+          + "tokenRemoverScanInterval=" + tokenRemoverScanInterval
+          / (60 * 1000) + " min(s)");
+      try {
+        while (running) {
+          long now = System.currentTimeMillis();
+          if (lastMasterKeyUpdate + keyUpdateInterval < now) {
+            try {
+              rollMasterKeyExt();
+              lastMasterKeyUpdate = now;
+            } catch (IOException e) {
+              LOGGER.error("Master key updating failed. "
+                  + StringUtils.stringifyException(e));
+            }
+          }
+          if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) {
+            removeExpiredTokens();
+            lastTokenCacheCleanup = now;
+          }
+          try {
+            Thread.sleep(5000); // 5 seconds
+          } catch (InterruptedException ie) {
+            LOGGER
+            .error("InterruptedExcpetion recieved for ExpiredTokenRemover thread "
+                + ie);
+          }
+        }
+      } catch (Throwable t) {
+        LOGGER.error("ExpiredTokenRemover thread received unexpected exception. "
+            + t, t);
+        Runtime.getRuntime().exit(-1);
+      }
+    }
+  }
+
+}

Added: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java?rev=1211275&view=auto
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java (added)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/thrift/ZooKeeperTokenStore.java Wed Dec  7 02:31:00 2011
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.thrift;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStoreError;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.security.token.delegation.HiveDelegationTokenSupport;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper token store implementation.
+ */
+public class ZooKeeperTokenStore implements TokenStoreDelegationTokenSecretManager.TokenStore {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName());
+
+  private static final String ZK_SEQ_FORMAT = "%010d";
+  private static final String NODE_KEYS = "/keys";
+  private static final String NODE_TOKENS = "/tokens";
+
+  private String rootNode = "";
+  private volatile ZooKeeper zkSession;
+  private String zkConnectString;
+  private final int zkSessionTimeout = 3000;
+
+  /**
+   * Watcher handed to every session this store creates. Logs each event and closes the
+   * current session when ZooKeeper reports it as expired.
+   */
+  private class ZooKeeperWatcher implements Watcher {
+    public void process(org.apache.zookeeper.WatchedEvent event) {
+      LOGGER.info(event.toString());
+      final boolean expired = event.getState() == Watcher.Event.KeeperState.Expired;
+      if (!expired) {
+        return;
+      }
+      LOGGER.warn("ZooKeeper session expired, discarding connection");
+      try {
+        zkSession.close();
+      } catch (Throwable closeFailure) {
+        // Closing is best effort; the failure is only reported.
+        LOGGER.warn("Failed to close connection on expired session", closeFailure);
+      }
+    }
+  }
+
+  /**
+   * Default constructor for dynamic instantiation w/ Configurable
+   * (ReflectionUtils does not support Configuration constructor injection).
+   * No connect string is set here; {@link #setConf(Configuration)} reads it from the
+   * configuration and initializes the store.
+   */
+  protected ZooKeeperTokenStore() {
+  }
+
+  /**
+   * Creates a store for the given ZooKeeper ensemble and initializes it immediately.
+   *
+   * @param hostPort ZooKeeper connect string, stored as {@code zkConnectString}
+   */
+  public ZooKeeperTokenStore(String hostPort) {
+    this.zkConnectString = hostPort;
+    // Opens the session and makes sure the key and token container nodes exist.
+    init();
+  }
+
+  private ZooKeeper getSession() {
+    if (zkSession == null || zkSession.getState() == States.CLOSED) {
+        synchronized (this) {
+          if (zkSession == null || zkSession.getState() == States.CLOSED) {
+            try {
+            zkSession = new ZooKeeper(this.zkConnectString, this.zkSessionTimeout,
+                new ZooKeeperWatcher());
+            } catch (IOException ex) {
+              throw new TokenStoreError("Token store error.", ex);
+            }
+          }
+        }
+    }
+    return zkSession;
+  }
+
+  private static String ensurePath(ZooKeeper zk, String path) throws KeeperException,
+      InterruptedException {
+    String[] pathComps = StringUtils.splitByWholeSeparator(path, "/");
+    String currentPath = "";
+    for (String pathComp : pathComps) {
+      currentPath += "/" + pathComp;
+      try {
+        String node = zk.create(currentPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+        LOGGER.info("Created path: " + node);
+      } catch (KeeperException.NodeExistsException e) {
+      }
+    }
+    return currentPath;
+  }
+
+  /**
+   * (Re)initializes the store: closes any existing session, obtains a session and makes sure
+   * the key and token container nodes exist below {@code rootNode}.
+   *
+   * <p>If the thread is interrupted along the way, its interrupt status is restored before
+   * this method returns or throws.
+   *
+   * @throws IllegalStateException if no connect string has been set
+   * @throws TokenStoreError if the container nodes cannot be created or verified
+   */
+  private void init() {
+    if (this.zkConnectString == null) {
+      throw new IllegalStateException("Not initialized");
+    }
+
+    boolean interrupted = false;
+    try {
+      if (this.zkSession != null) {
+        try {
+          this.zkSession.close();
+        } catch (InterruptedException ex) {
+          // Closing the old session stays best effort; remember the interrupt for later.
+          interrupted = true;
+          LOGGER.warn("Failed to close existing session.", ex);
+        }
+      }
+
+      ZooKeeper zk = getSession();
+      try {
+        ensurePath(zk, rootNode + NODE_KEYS);
+        ensurePath(zk, rootNode + NODE_TOKENS);
+      } catch (InterruptedException e) {
+        interrupted = true;
+        throw new TokenStoreError("Failed to validate token path.", e);
+      } catch (Exception e) {
+        throw new TokenStoreError("Failed to validate token path.", e);
+      }
+    } finally {
+      if (interrupted) {
+        // Restored last so the blocking ZooKeeper calls above are not cut short by it.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Takes the ZooKeeper connect string and root node from {@code conf} (when it is not null)
+   * and then runs the store initialization.
+   *
+   * @param conf configuration to read from; may be null, in which case the current settings
+   *          are kept
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf == null) {
+      init();
+      return;
+    }
+    this.zkConnectString = conf.get(
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+    this.rootNode = conf.get(
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE,
+        HadoopThriftAuthBridge20S.Server.DELEGATION_TOKEN_STORE_ZK_ROOT_NODE_DEFAULT);
+    init();
+  }
+
+  /**
+   * Always returns null: the configuration passed to {@link #setConf(Configuration)} is not
+   * retained, only the values read from it.
+   */
+  @Override
+  public Configuration getConf() {
+    return null; // not required
+  }
+
+  private Map<Integer, byte[]> getAllKeys() throws KeeperException,
+      InterruptedException {
+
+    String masterKeyNode = rootNode + NODE_KEYS;
+    ZooKeeper zk = getSession();
+    List<String> nodes = zk.getChildren(masterKeyNode, false);
+    Map<Integer, byte[]> result = new HashMap<Integer, byte[]>();
+    for (String node : nodes) {
+      byte[] data = zk.getData(masterKeyNode + "/" + node, false, null);
+      if (data != null) {
+        result.put(getSeq(node), data);
+      }
+    }
+    return result;
+  }
+
+  private int getSeq(String path) {
+    String[] pathComps = path.split("/");
+    return Integer.parseInt(pathComps[pathComps.length-1]);
+  }
+
+  /**
+   * Stores a master key as a new persistent sequential node below the keys node.
+   *
+   * @param s serialized key; written using the platform default charset
+   * @return sequence number parsed from the name of the created node
+   * @throws TokenStoreError if the ZooKeeper call fails or the thread is interrupted (the
+   *           interrupt status is restored in that case)
+   */
+  @Override
+  public int addMasterKey(String s) {
+    try {
+      ZooKeeper zk = getSession();
+      // NOTE(review): getBytes() depends on the platform default charset - confirm that all
+      // servers sharing this store agree on it.
+      String newNode = zk.create(rootNode + NODE_KEYS + "/", s.getBytes(), Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT_SEQUENTIAL);
+      LOGGER.info("Added key {}", newNode);
+      return getSeq(newNode);
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      // Keep the interrupt visible to callers; the unchecked wrapper alone would hide it.
+      Thread.currentThread().interrupt();
+      throw new TokenStoreError(ex);
+    }
+  }
+
+  /**
+   * Overwrites the data of an existing master key node, regardless of its current version.
+   *
+   * @param keySeq sequence number of the key; formatted with {@code ZK_SEQ_FORMAT} to form the
+   *          node name
+   * @param s serialized key; written using the platform default charset
+   * @throws TokenStoreError if the ZooKeeper call fails or the thread is interrupted (the
+   *           interrupt status is restored in that case)
+   */
+  @Override
+  public void updateMasterKey(int keySeq, String s) {
+    try {
+      ZooKeeper zk = getSession();
+      // Version -1 matches any node version.
+      zk.setData(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), s.getBytes(),
+          -1);
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      // Keep the interrupt visible to callers; the unchecked wrapper alone would hide it.
+      Thread.currentThread().interrupt();
+      throw new TokenStoreError(ex);
+    }
+  }
+
+  /**
+   * Deletes the node of a master key, regardless of its current version.
+   *
+   * @param keySeq sequence number of the key to delete
+   * @return true if the node was deleted, false if it did not exist
+   * @throws TokenStoreError if the ZooKeeper call fails for another reason or the thread is
+   *           interrupted (the interrupt status is restored in that case)
+   */
+  @Override
+  public boolean removeMasterKey(int keySeq) {
+    try {
+      ZooKeeper zk = getSession();
+      zk.delete(rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq), -1);
+      return true;
+    } catch (KeeperException.NoNodeException ex) {
+      return false;
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      // Keep the interrupt visible to callers; the unchecked wrapper alone would hide it.
+      Thread.currentThread().interrupt();
+      throw new TokenStoreError(ex);
+    }
+  }
+
+  /**
+   * Returns the serialized form of every master key found below the keys node.
+   *
+   * @return one string per stored key, decoded using the platform default charset; the order
+   *         follows the iteration order of the intermediate {@code HashMap}
+   * @throws TokenStoreError if a ZooKeeper call fails or the thread is interrupted (the
+   *           interrupt status is restored in that case)
+   */
+  @Override
+  public String[] getMasterKeys() {
+    try {
+      Map<Integer, byte[]> allKeys = getAllKeys();
+      String[] result = new String[allKeys.size()];
+      int resultIdx = 0;
+      for (byte[] keyBytes : allKeys.values()) {
+        result[resultIdx++] = new String(keyBytes);
+      }
+      return result;
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      // Keep the interrupt visible to callers; the unchecked wrapper alone would hide it.
+      Thread.currentThread().interrupt();
+      throw new TokenStoreError(ex);
+    }
+  }
+
+
+  private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) {
+    try {
+      return rootNode + NODE_TOKENS + "/"
+          + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier);
+    } catch (IOException ex) {
+      throw new TokenStoreError("Failed to encode token identifier", ex);
+    }
+  }
+
+  /**
+   * Stores a token as a persistent node named after its encoded identifier.
+   *
+   * @param tokenIdentifier identifier that determines the node path
+   * @param token token information written as the node data
+   * @return true if the node was created, false if a node for the identifier already exists
+   * @throws TokenStoreError if the ZooKeeper call fails for another reason or the thread is
+   *           interrupted (the interrupt status is restored in that case)
+   */
+  @Override
+  public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+      DelegationTokenInformation token) {
+    try {
+      ZooKeeper zk = getSession();
+      byte[] tokenBytes = HiveDelegationTokenSupport.encodeDelegationTokenInformation(token);
+      String newNode = zk.create(getTokenPath(tokenIdentifier),
+          tokenBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      LOGGER.info("Added token: {}", newNode);
+      return true;
+    } catch (KeeperException.NodeExistsException ex) {
+      return false;
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      // Keep the interrupt visible to callers; the unchecked wrapper alone would hide it.
+      Thread.currentThread().interrupt();
+      throw new TokenStoreError(ex);
+    }
+  }
+
+  /**
+   * Deletes the node of a token, regardless of its current version.
+   *
+   * @param tokenIdentifier identifier of the token to delete
+   * @return true if the node was deleted, false if it did not exist
+   * @throws TokenStoreError if the ZooKeeper call fails for another reason or the thread is
+   *           interrupted (the interrupt status is restored in that case)
+   */
+  @Override
+  public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+    try {
+      ZooKeeper zk = getSession();
+      zk.delete(getTokenPath(tokenIdentifier), -1);
+      return true;
+    } catch (KeeperException.NoNodeException ex) {
+      return false;
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      // Keep the interrupt visible to callers; the unchecked wrapper alone would hide it.
+      Thread.currentThread().interrupt();
+      throw new TokenStoreError(ex);
+    }
+  }
+
+  /**
+   * Loads and decodes the token information stored for an identifier.
+   *
+   * @param tokenIdentifier identifier of the token to look up
+   * @return the decoded token information, or null if no node exists for the identifier
+   * @throws TokenStoreError if the node data cannot be decoded, the ZooKeeper call fails for
+   *           another reason, or the thread is interrupted (the interrupt status is restored
+   *           in that case)
+   */
+  @Override
+  public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+    try {
+      ZooKeeper zk = getSession();
+      byte[] tokenBytes = zk.getData(getTokenPath(tokenIdentifier), false, null);
+      try {
+        return HiveDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
+      } catch (Exception ex) {
+        throw new TokenStoreError("Failed to decode token", ex);
+      }
+    } catch (KeeperException.NoNodeException ex) {
+      return null;
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      // Keep the interrupt visible to callers; the unchecked wrapper alone would hide it.
+      Thread.currentThread().interrupt();
+      throw new TokenStoreError(ex);
+    }
+  }
+
+  /**
+   * Lists the identifiers of all tokens in the store by decoding the names of the children of
+   * the tokens node. Names that cannot be decoded are logged and skipped.
+   *
+   * @return the decoded identifiers
+   * @throws TokenStoreError if listing the children fails or the thread is interrupted (the
+   *           interrupt status is restored in that case)
+   */
+  @Override
+  public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+    String containerNode = rootNode + NODE_TOKENS;
+    final List<String> nodes;
+    try  {
+      nodes = getSession().getChildren(containerNode, false);
+    } catch (KeeperException ex) {
+      throw new TokenStoreError(ex);
+    } catch (InterruptedException ex) {
+      // Keep the interrupt visible to callers; the unchecked wrapper alone would hide it.
+      Thread.currentThread().interrupt();
+      throw new TokenStoreError(ex);
+    }
+    List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
+        nodes.size());
+    for (String node : nodes) {
+      DelegationTokenIdentifier id = new DelegationTokenIdentifier();
+      try {
+        TokenStoreDelegationTokenSecretManager.decodeWritable(id, node);
+        result.add(id);
+      } catch (Exception e) {
+        // Pass the exception along so the cause of the decode failure is not lost.
+        LOGGER.warn("Failed to decode token '{}'", node, e);
+      }
+    }
+    return result;
+  }
+
+}

Added: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java?rev=1211275&view=auto
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java (added)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.java Wed Dec  7 02:31:00 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.token.delegation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+
+/**
+ * Workaround for serialization of {@link DelegationTokenInformation} through package access.
+ * Future version of Hadoop should add this to DelegationTokenInformation itself.
+ */
+public final class HiveDelegationTokenSupport {
+
+  private HiveDelegationTokenSupport() {}
+
+  /**
+   * Serializes a token as: password length (vint), password bytes, renew date (long).
+   *
+   * @param token token information to serialize
+   * @return the serialized bytes
+   */
+  public static byte[] encodeDelegationTokenInformation(DelegationTokenInformation token) {
+    try {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream out = new DataOutputStream(bos);
+      WritableUtils.writeVInt(out, token.password.length);
+      out.write(token.password);
+      out.writeLong(token.renewDate);
+      out.flush();
+      return bos.toByteArray();
+    } catch (IOException ex) {
+      throw new RuntimeException("Failed to encode token.", ex);
+    }
+  }
+
+  /**
+   * Reverses {@link #encodeDelegationTokenInformation(DelegationTokenInformation)}.
+   *
+   * @param tokenBytes serialized token: password length (vint), password bytes, renew date
+   * @return the decoded token information
+   * @throws IOException if the data is truncated or the password length is negative or larger
+   *           than the remaining input
+   */
+  public static DelegationTokenInformation decodeDelegationTokenInformation(byte[] tokenBytes)
+      throws IOException {
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(tokenBytes));
+    int len = WritableUtils.readVInt(in);
+    // Validate before allocating: a corrupt length must not cause a
+    // NegativeArraySizeException or an oversized allocation.
+    if (len < 0 || len > in.available()) {
+      throw new IOException("Invalid password length " + len + " in token data of "
+          + tokenBytes.length + " bytes");
+    }
+    DelegationTokenInformation token = new DelegationTokenInformation(0, null);
+    token.password = new byte[len];
+    in.readFully(token.password);
+    token.renewDate = in.readLong();
+    return token;
+  }
+
+  /**
+   * Calls {@code rollMasterKey()} on the given secret manager.
+   *
+   * @param mgr secret manager whose master key is rolled
+   * @throws IOException if the secret manager fails to roll the key
+   */
+  public static void rollMasterKey(
+      AbstractDelegationTokenSecretManager<? extends AbstractDelegationTokenIdentifier> mgr)
+      throws IOException {
+    mgr.rollMasterKey();
+  }
+
+}

Modified: hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java?rev=1211275&r1=1211274&r2=1211275&view=diff
==============================================================================
--- hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java (original)
+++ hive/trunk/shims/src/test/org/apache/hadoop/hive/thrift/TestHadoop20SAuthBridge.java Wed Dec  7 02:31:00 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.thrift.TokenStoreDelegationTokenSecretManager.TokenStore;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@@ -46,7 +47,10 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.thrift.transport.TSaslServerTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -78,12 +82,19 @@ public class TestHadoop20SAuthBridge ext
 
         return new TUGIAssumingTransportFactory(transFactory, realUgi);
       }
+      static TokenStore TOKEN_STORE = new MemoryTokenStore();
+      //static TokenStore TOKEN_STORE = new ZooKeeperTokenStore("localhost:2181");
+
+      // Ignores the configuration and always hands out the shared static TOKEN_STORE.
+      @Override
+      protected TokenStore getTokenStore(Configuration conf) throws IOException {
+        return TOKEN_STORE;
+      }
     }
   }
-  
+
 
   private HiveConf conf;
-  
+
   private void configureSuperUserIPAddresses(Configuration conf,
       String superUserShortName) throws IOException {
     ArrayList<String> ipList = new ArrayList<String>();
@@ -107,7 +118,7 @@ public class TestHadoop20SAuthBridge ext
     conf.setStrings(ProxyUsers.getProxySuperuserIpConfKey(superUserShortName),
         builder.toString());
   }
-  
+
   public void setup(final int port) throws Exception {
     System.setProperty(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname,
         "true");
@@ -120,28 +131,105 @@ public class TestHadoop20SAuthBridge ext
     MetaStoreUtils.startMetaStore(port, new MyHadoopThriftAuthBridge20S());
   }
 
+  /**
+   * Test delegation token store/load from shared store: a token issued by one
+   * TokenStoreDelegationTokenSecretManager must be usable by a second manager built on the
+   * same TokenStore, and cancelled tokens, expired tokens and expired keys must disappear
+   * from the store.
+   * @throws Exception
+   */
+  public void testDelegationTokenSharedStore() throws Exception {
+    UserGroupInformation clientUgi = UserGroupInformation.getCurrentUser();
+
+    TokenStoreDelegationTokenSecretManager tokenManager =
+        new TokenStoreDelegationTokenSecretManager(0, 60*60*1000, 60*60*1000, 0,
+            MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE);
+    // initializes current key
+    tokenManager.startThreads();
+    tokenManager.stopThreads();
+
+    // issue a token through the first manager
+    String tokenStrForm = tokenManager.getDelegationToken(clientUgi.getShortUserName());
+    Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
+    t.decodeFromUrlString(tokenStrForm);
+
+    //check whether the username in the token is what we expect
+    DelegationTokenIdentifier d = new DelegationTokenIdentifier();
+    d.readFields(new DataInputStream(new ByteArrayInputStream(
+        t.getIdentifier())));
+    assertTrue("Usernames don't match",
+        clientUgi.getShortUserName().equals(d.getUser().getShortUserName()));
+
+    // the issued token must be in the shared store, and adding it again must be rejected
+    DelegationTokenInformation tokenInfo = MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE
+        .getToken(d);
+    assertNotNull("token not in store", tokenInfo);
+    assertFalse("duplicate token add",
+        MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.addToken(d, tokenInfo));
+
+    // check keys are copied from token store when token is loaded
+    TokenStoreDelegationTokenSecretManager anotherManager =
+        new TokenStoreDelegationTokenSecretManager(0, 0, 0, 0,
+            MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE);
+   assertEquals("master keys empty on init", 0,
+        anotherManager.getAllKeys().length);
+    assertNotNull("token loaded",
+        anotherManager.retrievePassword(d));
+    anotherManager.renewToken(t, clientUgi.getShortUserName());
+    assertEquals("master keys not loaded from store",
+        MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.getMasterKeys().length,
+        anotherManager.getAllKeys().length);
+
+    // cancel the delegation token
+    tokenManager.cancelDelegationToken(tokenStrForm);
+    assertNull("token not removed from store after cancel",
+        MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.getToken(d));
+    assertFalse("token removed (again)",
+        MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.removeToken(d));
+    try {
+      anotherManager.retrievePassword(d);
+      fail("InvalidToken expected after cancel");
+    } catch (InvalidToken ex) {
+      // expected
+    }
+
+    // token expiration
+    // re-add the token with a renew date of 0 so that it counts as expired
+    MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.addToken(d,
+        new DelegationTokenInformation(0, t.getPassword()));
+    assertNotNull(MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.getToken(d));
+    anotherManager.removeExpiredTokens();
+    assertNull("Expired token not removed",
+        MyHadoopThriftAuthBridge20S.Server.TOKEN_STORE.getToken(d));
+
+    // key expiration - create an already expired key
+    anotherManager.startThreads(); // generates initial key
+    anotherManager.stopThreads();
+    DelegationKey expiredKey = new DelegationKey(-1, 0, anotherManager.getAllKeys()[0].getKey());
+    anotherManager.logUpdateMasterKey(expiredKey); // updates key with sequence number
+    assertTrue("expired key not in allKeys",
+        anotherManager.reloadKeys().containsKey(expiredKey.getKeyId()));
+    // rolling the master key is expected to drop the expired key from the store
+    anotherManager.rollMasterKeyExt();
+    assertFalse("Expired key not removed",
+        anotherManager.reloadKeys().containsKey(expiredKey.getKeyId()));
+  }
+
   public void testSaslWithHiveMetaStore() throws Exception {
     setup(10000);
     UserGroupInformation clientUgi = UserGroupInformation.getCurrentUser();
     obtainTokenAndAddIntoUGI(clientUgi, null);
     obtainTokenAndAddIntoUGI(clientUgi, "tokenForFooTablePartition");
   }
-  
+
   public void testMetastoreProxyUser() throws Exception {
     setup(10010);
-    
+
     final String proxyUserName = "proxyUser";
-    //set the configuration up such that proxyUser can act on 
+    //set the configuration up such that proxyUser can act on
     //behalf of all users belonging to the group foo_bar_group (
     //a dummy group)
     String[] groupNames =
       new String[] { "foo_bar_group" };
     setGroupsInConf(groupNames, proxyUserName);
-    
-    final UserGroupInformation delegationTokenUser = 
+
+    final UserGroupInformation delegationTokenUser =
       UserGroupInformation.getCurrentUser();
-        
-    final UserGroupInformation proxyUserUgi = 
+
+    final UserGroupInformation proxyUserUgi =
       UserGroupInformation.createRemoteUser(proxyUserName);
     String tokenStrForm = proxyUserUgi.doAs(new PrivilegedExceptionAction<String>() {
       public String run() throws Exception {
@@ -154,11 +242,11 @@ public class TestHadoop20SAuthBridge ext
         }
       }
     });
-    assertTrue("Expected the getDelegationToken call to fail", 
+    assertTrue("Expected the getDelegationToken call to fail",
         tokenStrForm == null);
-    
-    //set the configuration up such that proxyUser can act on 
-    //behalf of all users belonging to the real group(s) that the 
+
+    //set the configuration up such that proxyUser can act on
+    //behalf of all users belonging to the real group(s) that the
     //user running the test belongs to
     setGroupsInConf(UserGroupInformation.getCurrentUser().getGroupNames(),
         proxyUserName);
@@ -173,7 +261,7 @@ public class TestHadoop20SAuthBridge ext
         }
       }
     });
-    assertTrue("Expected the getDelegationToken call to not fail", 
+    assertTrue("Expected the getDelegationToken call to not fail",
         tokenStrForm != null);
     Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
     t.decodeFromUrlString(tokenStrForm);
@@ -181,12 +269,12 @@ public class TestHadoop20SAuthBridge ext
     DelegationTokenIdentifier d = new DelegationTokenIdentifier();
     d.readFields(new DataInputStream(new ByteArrayInputStream(
         t.getIdentifier())));
-    assertTrue("Usernames don't match", 
+    assertTrue("Usernames don't match",
         delegationTokenUser.getShortUserName().equals(d.getUser().getShortUserName()));
-    
+
   }
-  
-  private void setGroupsInConf(String[] groupNames, String proxyUserName) 
+
+  private void setGroupsInConf(String[] groupNames, String proxyUserName)
   throws IOException {
    conf.set(
       ProxyUsers.getProxySuperuserGroupConfKey(proxyUserName),
@@ -194,21 +282,21 @@ public class TestHadoop20SAuthBridge ext
     configureSuperUserIPAddresses(conf, proxyUserName);
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
   }
-  
-  private String getDelegationTokenStr(UserGroupInformation ownerUgi, 
+
+  private String getDelegationTokenStr(UserGroupInformation ownerUgi,
       UserGroupInformation realUgi) throws Exception {
     //obtain a token by directly invoking the metastore operation(without going
     //through the thrift interface). Obtaining a token makes the secret manager
     //aware of the user and that it gave the token to the user
-    //also set the authentication method explicitly to KERBEROS. Since the 
+    //also set the authentication method explicitly to KERBEROS. Since the
     //metastore checks whether the authentication method is KERBEROS or not
-    //for getDelegationToken, and the testcases don't use 
+    //for getDelegationToken, and the testcases don't use
     //kerberos, this needs to be done
     HadoopThriftAuthBridge20S.Server.authenticationMethod
                              .set(AuthenticationMethod.KERBEROS);
-    HadoopThriftAuthBridge20S.Server.remoteAddress.set(InetAddress.getLocalHost()); 
+    HadoopThriftAuthBridge20S.Server.remoteAddress.set(InetAddress.getLocalHost());
     return
-        HiveMetaStore.getDelegationToken(ownerUgi.getShortUserName(), 
+        HiveMetaStore.getDelegationToken(ownerUgi.getShortUserName(),
             realUgi.getShortUserName());
   }
 
@@ -217,14 +305,14 @@ public class TestHadoop20SAuthBridge ext
     String tokenStrForm = getDelegationTokenStr(clientUgi, clientUgi);
     Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
     t.decodeFromUrlString(tokenStrForm);
-    
+
     //check whether the username in the token is what we expect
     DelegationTokenIdentifier d = new DelegationTokenIdentifier();
     d.readFields(new DataInputStream(new ByteArrayInputStream(
         t.getIdentifier())));
-    assertTrue("Usernames don't match", 
+    assertTrue("Usernames don't match",
         clientUgi.getShortUserName().equals(d.getUser().getShortUserName()));
-    
+
     if (tokenSig != null) {
       conf.set("hive.metastore.token.signature", tokenSig);
       t.setService(new Text(tokenSig));
@@ -247,7 +335,7 @@ public class TestHadoop20SAuthBridge ext
 
     //try out some metastore operations
     createDBAndVerifyExistence(hiveClient);
-    
+
     //check that getDelegationToken fails since we are not authenticating
     //over kerberos
     boolean pass = false;