You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:55:09 UTC

svn commit: r1077240 - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/security/token/delegation/ hdfs/org/apache/hadoop/hdfs/protocol/ hdfs/org/apache/hadoop/hdfs/security/token/delegation/ hdfs/org/apache/hadoop/hd...

Author: omalley
Date: Fri Mar  4 03:55:08 2011
New Revision: 1077240

URL: http://svn.apache.org/viewvc?rev=1077240&view=rev
Log:
commit 2c530f597af46712175daf7e3db822a87b615b2f
Author: Jitendra Nath Pandey <jitendra@sufferhome-lm.(none)>
Date:   Fri Feb 26 21:22:54 2010 -0800

    HADOOP-6573, HDFS-984, MR-1537 from https://issues.apache.org/jira/secure/attachment/12437292/HDFS-984-0_20.4.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    HDFS-984,HADOOP-6573,MAPREDUCE-1537. Delegation Tokens should be persisted in Namenode,
    +    and corresponding changes in common and mr. (jitendra)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestDelegationToken.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/token/delegation/TestDelegationToken.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java Fri Mar  4 03:55:08 2011
@@ -51,23 +51,30 @@ extends AbstractDelegationTokenIdentifie
 
   /** 
    * Cache of currently valid tokens, mapping from DelegationTokenIdentifier 
-   * to DelegationTokenInformation. Protected by its own lock.
+   * to DelegationTokenInformation. Protected by this object lock.
    */
-  private final Map<TokenIdent, DelegationTokenInformation> currentTokens 
+  protected final Map<TokenIdent, DelegationTokenInformation> currentTokens 
       = new HashMap<TokenIdent, DelegationTokenInformation>();
   
   /**
-   * Sequence number to create DelegationTokenIdentifier
+   * Sequence number to create DelegationTokenIdentifier.
+   * Protected by this object lock.
    */
-  private int delegationTokenSequenceNumber = 0;
+  protected int delegationTokenSequenceNumber = 0;
   
-  private final Map<Integer, DelegationKey> allKeys 
+  /**
+   * Access to allKeys is protected by this object lock
+   */
+  protected final Map<Integer, DelegationKey> allKeys 
       = new HashMap<Integer, DelegationKey>();
   
   /**
-   * Access to currentId and currentKey is protected by this object lock.
+   * Access to currentId is protected by this object lock.
+   */
+  protected int currentId = 0;
+  /**
+   * Access to currentKey is protected by this object lock
    */
-  private int currentId = 0;
   private DelegationKey currentKey;
   
   private long keyUpdateInterval;
@@ -75,7 +82,7 @@ extends AbstractDelegationTokenIdentifie
   private long tokenRemoverScanInterval;
   private long tokenRenewInterval;
   private Thread tokenRemoverThread;
-  private volatile boolean running;
+  protected volatile boolean running;
 
   public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
       long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
@@ -111,27 +118,50 @@ extends AbstractDelegationTokenIdentifie
     return allKeys.values().toArray(new DelegationKey[0]);
   }
   
-  /** Update the current master key */
-  private synchronized void updateCurrentKey() throws IOException {
+  protected void logUpdateMasterKey(DelegationKey key) throws IOException {
+    return;
+  }
+  
+  /** 
+   * Update the current master key 
+   * This is called once by startThreads before tokenRemoverThread is created, 
+   * and only by tokenRemoverThread afterwards.
+   */
+  private void updateCurrentKey() throws IOException {
     LOG.info("Updating the current master key for generating delegation tokens");
     /* Create a new currentKey with an estimated expiry date. */
-    currentId++;
-    currentKey = new DelegationKey(currentId, System.currentTimeMillis()
+    int newCurrentId;
+    synchronized (this) {
+      newCurrentId = currentId+1;
+    }
+    DelegationKey newKey = new DelegationKey(newCurrentId, System
+        .currentTimeMillis()
         + keyUpdateInterval + tokenMaxLifetime, generateSecret());
-    allKeys.put(currentKey.getKeyId(), currentKey);
+    //Log must be invoked outside the lock on 'this'
+    logUpdateMasterKey(newKey);
+    synchronized (this) {
+      currentId = newKey.getKeyId();
+      currentKey = newKey;
+      allKeys.put(currentKey.getKeyId(), currentKey);
+    }
   }
   
-  /** Update the current master key for generating delegation tokens */
-  public synchronized void rollMasterKey() throws IOException {
-    removeExpiredKeys();
-    /* set final expiry date for retiring currentKey */
-    currentKey.setExpiryDate(System.currentTimeMillis() + tokenMaxLifetime);
-    /*
-     * currentKey might have been removed by removeExpiredKeys(), if
-     * updateMasterKey() isn't called at expected interval. Add it back to
-     * allKeys just in case.
-     */
-    allKeys.put(currentKey.getKeyId(), currentKey);
+  /** 
+   * Update the current master key for generating delegation tokens 
+   * It should be called only by tokenRemoverThread.
+   */
+  void rollMasterKey() throws IOException {
+    synchronized (this) {
+      removeExpiredKeys();
+      /* set final expiry date for retiring currentKey */
+      currentKey.setExpiryDate(System.currentTimeMillis() + tokenMaxLifetime);
+      /*
+       * currentKey might have been removed by removeExpiredKeys(), if
+       * updateMasterKey() isn't called at expected interval. Add it back to
+       * allKeys just in case.
+       */
+      allKeys.put(currentKey.getKeyId(), currentKey);
+    }
     updateCurrentKey();
   }
 
@@ -147,35 +177,24 @@ extends AbstractDelegationTokenIdentifie
   }
   
   @Override
-  protected byte[] createPassword(TokenIdent identifier) {
+  protected synchronized byte[] createPassword(TokenIdent identifier) {
     int sequenceNum;
-    int id;
-    DelegationKey key;
-    long now = System.currentTimeMillis();    
-    synchronized (this) {
-      id = currentId;
-      key = currentKey;
-      sequenceNum = ++delegationTokenSequenceNumber;
-    }
+    long now = System.currentTimeMillis();
+    sequenceNum = ++delegationTokenSequenceNumber;
     identifier.setIssueDate(now);
     identifier.setMaxDate(now + tokenMaxLifetime);
-    identifier.setMasterKeyId(id);
+    identifier.setMasterKeyId(currentId);
     identifier.setSequenceNumber(sequenceNum);
-    byte[] password = createPassword(identifier.getBytes(), key.getKey());
-    synchronized (currentTokens) {
-      currentTokens.put(identifier, new DelegationTokenInformation(now
-          + tokenRenewInterval, password));
-    }
+    byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
+    currentTokens.put(identifier, new DelegationTokenInformation(now
+        + tokenRenewInterval, password));
     return password;
   }
 
   @Override
-  public byte[] retrievePassword(TokenIdent identifier
-                                 ) throws InvalidToken {
-    DelegationTokenInformation info = null;
-    synchronized (currentTokens) {
-      info = currentTokens.get(identifier);
-    }
+  public synchronized byte[] retrievePassword(TokenIdent identifier)
+      throws InvalidToken {
+    DelegationTokenInformation info = currentTokens.get(identifier);
     if (info == null) {
       throw new InvalidToken("token is expired or doesn't exist");
     }
@@ -194,18 +213,14 @@ extends AbstractDelegationTokenIdentifie
    * @throws InvalidToken if the token is invalid
    * @throws AccessControlException if the user can't renew token
    */
-  public long renewToken(Token<TokenIdent> token,
+  public synchronized long renewToken(Token<TokenIdent> token,
                          String renewer) throws InvalidToken, IOException {
     long now = System.currentTimeMillis();
     ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
     DataInputStream in = new DataInputStream(buf);
     TokenIdent id = createIdentifier();
     id.readFields(in);
-    synchronized (currentTokens) {
-      if (currentTokens.get(id) == null) {
-        throw new InvalidToken("Renewal request for unknown token");
-      }
-    }
+
     if (id.getMaxDate() < now) {
       throw new InvalidToken("User " + renewer + 
                              " tried to renew an expired token");
@@ -221,36 +236,36 @@ extends AbstractDelegationTokenIdentifie
                                        "renewer specified as " + 
                                        id.getRenewer());
     }
-    DelegationKey key = null;
-    synchronized (this) {
-      key = allKeys.get(id.getMasterKeyId());
-    }
+    DelegationKey key = allKeys.get(id.getMasterKeyId());
     if (key == null) {
-      throw new InvalidToken("Unable to find master key for keyId=" + 
-                             id.getMasterKeyId() +
-                             " from cache. Failed to renew an unexpired token"+
-                             " with sequenceNumber=" + id.getSequenceNumber());
+      throw new InvalidToken("Unable to find master key for keyId="
+          + id.getMasterKeyId()
+          + " from cache. Failed to renew an unexpired token"
+          + " with sequenceNumber=" + id.getSequenceNumber());
     }
     byte[] password = createPassword(token.getIdentifier(), key.getKey());
     if (!Arrays.equals(password, token.getPassword())) {
-      throw new AccessControlException("Client " + renewer + 
-                                       " is trying to renew a token with " +
-                                       "wrong password");
+      throw new AccessControlException("Client " + renewer
+          + " is trying to renew a token with " + "wrong password");
     }
-    DelegationTokenInformation info = new DelegationTokenInformation(
-        Math.min(id.getMaxDate(), now + tokenRenewInterval), password);
-    synchronized (currentTokens) {
-      currentTokens.put(id, info);
+    long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
+    DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
+        password);
+
+    if (currentTokens.get(id) == null) {
+      throw new InvalidToken("Renewal request for unknown token");
     }
-    return info.getRenewDate();
+    currentTokens.put(id, info);
+    return renewTime;
   }
   
   /**
    * Cancel a token by removing it from cache.
+   * @return Identifier of the canceled token
    * @throws InvalidToken for invalid token
    * @throws AccessControlException if the user isn't allowed to cancel
    */
-  public void cancelToken(Token<TokenIdent> token,
+  public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
       String canceller) throws IOException {
     ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
     DataInputStream in = new DataInputStream(buf);
@@ -261,18 +276,17 @@ extends AbstractDelegationTokenIdentifie
     }
     String owner = id.getUser().getUserName();
     Text renewer = id.getRenewer();
-    if (!canceller.equals(owner) && 
-        (renewer == null || !canceller.equals(renewer.toString()))) {
-      throw new AccessControlException(canceller + 
-                                      " is not authorized to cancel the token");
+    if (!canceller.equals(owner)
+        && (renewer == null || !canceller.equals(renewer.toString()))) {
+      throw new AccessControlException(canceller
+          + " is not authorized to cancel the token");
     }
     DelegationTokenInformation info = null;
-    synchronized (currentTokens) {
-      info = currentTokens.remove(id);
-    }
+    info = currentTokens.remove(id);
     if (info == null) {
       throw new InvalidToken("Token not found");
     }
+    return id;
   }
   
   /**
@@ -284,16 +298,16 @@ extends AbstractDelegationTokenIdentifie
     return SecretManager.createSecretKey(key);
   }
 
-  /** Utility class to encapsulate a token's renew date and password. */
-  private static class DelegationTokenInformation {
+  /** Class to encapsulate a token's renew date and password. */
+  public static class DelegationTokenInformation {
     long renewDate;
     byte[] password;
-    DelegationTokenInformation(long renewDate, byte[] password) {
+    public DelegationTokenInformation(long renewDate, byte[] password) {
       this.renewDate = renewDate;
       this.password = password;
     }
     /** returns renew date */
-    long getRenewDate() {
+    public long getRenewDate() {
       return renewDate;
     }
     /** returns password */
@@ -303,15 +317,13 @@ extends AbstractDelegationTokenIdentifie
   }
   
   /** Remove expired delegation tokens from cache */
-  private void removeExpiredToken() {
+  private synchronized void removeExpiredToken() {
     long now = System.currentTimeMillis();
-    synchronized (currentTokens) {
-      Iterator<DelegationTokenInformation> i = currentTokens.values().iterator();
-      while (i.hasNext()) {
-        long renewDate = i.next().getRenewDate();
-        if (now > renewDate) {
-          i.remove();
-        }
+    Iterator<DelegationTokenInformation> i = currentTokens.values().iterator();
+    while (i.hasNext()) {
+      long renewDate = i.next().getRenewDate();
+      if (now > renewDate) {
+        i.remove();
       }
     }
   }
@@ -320,7 +332,9 @@ extends AbstractDelegationTokenIdentifie
     if (LOG.isDebugEnabled())
       LOG.debug("Stopping expired delegation token remover thread");
     running = false;
-    tokenRemoverThread.interrupt();
+    if (tokenRemoverThread != null) {
+      tokenRemoverThread.interrupt();
+    }
   }
   
   private class ExpiredTokenRemover extends Thread {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java Fri Mar  4 03:55:08 2011
@@ -81,7 +81,8 @@ public interface FSConstants {
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -18;
+  public static final int LAYOUT_VERSION = -19;
   // Current version: 
-  // Support disk space quotas
+  // -19: added new OP_[GET|RENEW|CANCEL]_DELEGATION_TOKEN and
+  // OP_UPDATE_MASTER_KEY.
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java Fri Mar  4 03:55:08 2011
@@ -20,6 +20,15 @@ package org.apache.hadoop.hdfs.security.
 
 //import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 
 /**
  * A HDFS specific delegation token secret manager.
@@ -30,6 +39,10 @@ import org.apache.hadoop.security.token.
 public class DelegationTokenSecretManager
     extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
 
+  private static final Log LOG = LogFactory
+      .getLog(DelegationTokenSecretManager.class);
+  
+  private final FSNamesystem namesystem;
   /**
    * Create a secret manager
    * @param delegationKeyUpdateInterval the number of seconds for rolling new
@@ -41,16 +54,220 @@ public class DelegationTokenSecretManage
    *        for expired tokens
    */
   public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
-                                      long delegationTokenMaxLifetime, 
-                                      long delegationTokenRenewInterval,
-                                      long delegationTokenRemoverScanInterval) {
+      long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+      long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
     super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
-          delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+        delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+    this.namesystem = namesystem;
   }
 
-  @Override
+  @Override //SecretManager
   public DelegationTokenIdentifier createIdentifier() {
     return new DelegationTokenIdentifier();
   }
 
+  /**
+   * Returns expiry time of a token given its identifier.
+   * 
+   * @param dtId DelegationTokenIdentifier of a token
+   * @return Expiry time of the token
+   * @throws IOException
+   */
+  public synchronized long getTokenExpiryTime(
+      DelegationTokenIdentifier dtId) throws IOException {
+    DelegationTokenInformation info = currentTokens.get(dtId);
+    if (info != null) {
+      return info.getRenewDate();
+    } else {
+      throw new IOException("No delegation token found for this identifier");
+    }
+  }
+  
+  /**
+   * Load SecretManager state from fsimage.
+   * 
+   * @param in input stream to read fsimage
+   * @throws IOException
+   */
+  public synchronized void loadSecretManagerState(DataInputStream in)
+      throws IOException {
+    if (running) {
+      // a safety check
+      throw new IOException(
+          "Can't load state from image in a running SecretManager.");
+    }
+    currentId = in.readInt();
+    loadAllKeys(in);
+    delegationTokenSequenceNumber = in.readInt();
+    loadCurrentTokens(in);
+  }
+  
+  /**
+   * Store the current state of the SecretManager for persistence
+   * 
+   * @param out Output stream for writing into fsimage.
+   * @throws IOException
+   */
+  public synchronized void saveSecretManagerState(DataOutputStream out)
+      throws IOException {
+    out.writeInt(currentId);
+    saveAllKeys(out);
+    out.writeInt(delegationTokenSequenceNumber);
+    saveCurrentTokens(out);
+  }
+  
+  /**
+   * This method is intended to be used only while reading edit logs.
+   * 
+   * @param identifier DelegationTokenIdentifier read from the edit logs or
+   * fsimage
+   * 
+   * @param expiryTime token expiry time
+   * @throws IOException
+   */
+  public synchronized void addPersistedDelegationToken(
+      DelegationTokenIdentifier identifier, long expiryTime) throws IOException {
+    if (running) {
+      // a safety check
+      throw new IOException(
+          "Can't add persisted delegation token to a running SecretManager.");
+    }
+    int keyId = identifier.getMasterKeyId();
+    DelegationKey dKey = allKeys.get(keyId);
+    if (dKey == null) {
+      LOG
+          .warn("No KEY found for persisted identifier "
+              + identifier.toString());
+      return;
+    }
+    byte[] password = createPassword(identifier.getBytes(), dKey.getKey());
+    if (identifier.getSequenceNumber() > this.delegationTokenSequenceNumber) {
+      this.delegationTokenSequenceNumber = identifier.getSequenceNumber();
+    }
+    if (currentTokens.get(identifier) == null) {
+      currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
+          password));
+    } else {
+      throw new IOException(
+          "Same delegation token being added twice; invalid entry in fsimage or editlogs");
+    }
+  }
+
+  /**
+   * Add a MasterKey to the list of keys.
+   * 
+   * @param key DelegationKey
+   * @throws IOException
+   */
+  public synchronized void updatePersistedMasterKey(DelegationKey key)
+      throws IOException {
+    addKey(key);
+  }
+  
+  /**
+   * Update the token cache with renewal record in edit logs.
+   * 
+   * @param identifier DelegationTokenIdentifier of the renewed token
+   * @param expiryTime
+   * @throws IOException
+   */
+  public synchronized void updatePersistedTokenRenewal(
+      DelegationTokenIdentifier identifier, long expiryTime) throws IOException {
+    if (running) {
+      // a safety check
+      throw new IOException(
+          "Can't update persisted delegation token renewal to a running SecretManager.");
+    }
+    DelegationTokenInformation info = null;
+    info = currentTokens.get(identifier);
+    if (info != null) {
+      int keyId = identifier.getMasterKeyId();
+      byte[] password = createPassword(identifier.getBytes(), allKeys
+          .get(keyId).getKey());
+      currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
+          password));
+    }
+  }
+
+  /**
+   *  Update the token cache with the cancel record in edit logs
+   *  
+   *  @param identifier DelegationTokenIdentifier of the canceled token
+   *  @throws IOException
+   */
+  public synchronized void updatePersistedTokenCancellation(
+      DelegationTokenIdentifier identifier) throws IOException {
+    if (running) {
+      // a safety check
+      throw new IOException(
+          "Can't update persisted delegation token renewal to a running SecretManager.");
+    }
+    currentTokens.remove(identifier);
+  }
+
+  /**
+   * Private helper methods to save delegation keys and tokens in fsimage
+   */
+  private synchronized void saveCurrentTokens(DataOutputStream out)
+      throws IOException {
+    out.writeInt(currentTokens.size());
+    Iterator<DelegationTokenIdentifier> iter = currentTokens.keySet()
+        .iterator();
+    while (iter.hasNext()) {
+      DelegationTokenIdentifier id = iter.next();
+      id.write(out);
+      DelegationTokenInformation info = currentTokens.get(id);
+      out.writeLong(info.getRenewDate());
+    }
+  }
+  
+  /*
+   * Save the current state of allKeys
+   */
+  private synchronized void saveAllKeys(DataOutputStream out)
+      throws IOException {
+    out.writeInt(allKeys.size());
+    Iterator<Integer> iter = allKeys.keySet().iterator();
+    while (iter.hasNext()) {
+      Integer key = iter.next();
+      allKeys.get(key).write(out);
+    }
+  }
+  
+  /**
+   * Private helper methods to load Delegation tokens from fsimage
+   */
+  private synchronized void loadCurrentTokens(DataInputStream in)
+      throws IOException {
+    int numberOfTokens = in.readInt();
+    for (int i = 0; i < numberOfTokens; i++) {
+      DelegationTokenIdentifier id = new DelegationTokenIdentifier();
+      id.readFields(in);
+      long expiryTime = in.readLong();
+      addPersistedDelegationToken(id, expiryTime);
+    }
+  }
+
+  /**
+   * Private helper method to load delegation keys from fsimage.
+   * @param in
+   * @throws IOException
+   */
+  private synchronized void loadAllKeys(DataInputStream in) throws IOException {
+    int numberOfKeys = in.readInt();
+    for (int i = 0; i < numberOfKeys; i++) {
+      DelegationKey value = new DelegationKey();
+      value.readFields(in);
+      addKey(value);
+    }
+  }
+
+  /**
+   * Call namesystem to update editlogs for new master key.
+   */
+  @Override //AbstractDelegationTokenManager
+  protected void logUpdateMasterKey(DelegationKey key)
+      throws IOException {
+    namesystem.logUpdateMasterKey(key);
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Mar  4 03:55:08 2011
@@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 
 /**
  * FSEditLog maintains a log of the namespace modifications.
@@ -68,6 +70,11 @@ public class FSEditLog {
   private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
   private static final byte OP_TIMES = 13; // sets mod & access time on a file
   private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
+  private static final byte OP_GET_DELEGATION_TOKEN = 15; //new delegation token
+  private static final byte OP_RENEW_DELEGATION_TOKEN = 16; //renew delegation token
+  private static final byte OP_CANCEL_DELEGATION_TOKEN = 17; //cancel delegation token
+  private static final byte OP_UPDATE_MASTER_KEY = 18; //update master key
+
   private static int sizeFlushBuffer = 512*1024;
 
   private ArrayList<EditLogOutputStream> editStreams = null;
@@ -488,7 +495,13 @@ public class FSEditLog {
     int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
         numOpRename = 0, numOpSetRepl = 0, numOpMkDir = 0,
         numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
-        numOpTimes = 0, numOpOther = 0;
+        numOpTimes = 0, numOpGetDelegationToken = 0,
+        numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0,
+        numOpUpdateMasterKey = 0, numOpOther = 0;
+
+    DelegationTokenIdentifier delegationTokenId = new DelegationTokenIdentifier();
+    DelegationKey delegationKey = new DelegationKey();
+
     long startTime = FSNamesystem.now();
 
     DataInputStream in = new DataInputStream(new BufferedInputStream(edits));
@@ -776,6 +789,52 @@ public class FSEditLog {
           fsDir.unprotectedSetTimes(path, mtime, atime, true);
           break;
         }
+        case OP_GET_DELEGATION_TOKEN: {
+          if (logVersion > -19) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpGetDelegationToken++;
+          delegationTokenId.readFields(in);
+          long expiryTime = readLong(in);
+          fsNamesys.getDelegationTokenSecretManager()
+              .addPersistedDelegationToken(delegationTokenId, expiryTime);
+          break;
+        }
+        case OP_RENEW_DELEGATION_TOKEN: {
+          if (logVersion > -19) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpRenewDelegationToken++;
+          delegationTokenId.readFields(in);
+          long expiryTime = readLong(in);
+          fsNamesys.getDelegationTokenSecretManager()
+              .updatePersistedTokenRenewal(delegationTokenId, expiryTime);
+          break;
+        }
+        case OP_CANCEL_DELEGATION_TOKEN: {
+          if (logVersion > -19) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpCancelDelegationToken++;
+          delegationTokenId.readFields(in);
+          fsNamesys.getDelegationTokenSecretManager()
+              .updatePersistedTokenCancellation(delegationTokenId);
+          break;
+        }
+        case OP_UPDATE_MASTER_KEY: {
+          if (logVersion > -19) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpUpdateMasterKey++;
+          delegationKey.readFields(in);
+          fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey(
+              delegationKey);
+          break;
+        }
         default: {
           throw new IOException("Never seen opcode " + opcode);
         }
@@ -796,6 +855,10 @@ public class FSEditLog {
           + " numOpSetOwner = " + numOpSetOwner
           + " numOpSetGenStamp = " + numOpSetGenStamp 
           + " numOpTimes = " + numOpTimes
+          + " numOpGetDelegationToken = " + numOpGetDelegationToken
+          + " numOpRenewDelegationToken = " + numOpRenewDelegationToken
+          + " numOpCancelDelegationToken = " + numOpCancelDelegationToken
+          + " numOpUpdateMasterKey = " + numOpUpdateMasterKey
           + " numOpOther = " + numOpOther);
     }
 
@@ -1090,7 +1153,31 @@ public class FSEditLog {
       FSEditLog.toLogLong(atime)};
     logEdit(OP_TIMES, new ArrayWritable(UTF8.class, info));
   }
-  
+
+  /**
+   * log delegation token to edit log
+   * @param id DelegationTokenIdentifier
+   * @param expiryTime of the token
+   * @return
+   */
+  void logGetDelegationToken(DelegationTokenIdentifier id,
+      long expiryTime) {
+    logEdit(OP_GET_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+  }
+
+  void logRenewDelegationToken(DelegationTokenIdentifier id,
+      long expiryTime) {
+    logEdit(OP_RENEW_DELEGATION_TOKEN, id, FSEditLog.toLogLong(expiryTime));
+  }
+
+  void logCancelDelegationToken(DelegationTokenIdentifier id) {
+    logEdit(OP_CANCEL_DELEGATION_TOKEN, id);
+  }
+
+  void logUpdateMasterKey(DelegationKey key) {
+    logEdit(OP_UPDATE_MASTER_KEY, key);
+  }
+
   static private UTF8 toLogReplication(short replication) {
     return new UTF8(Short.toString(replication));
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Mar  4 03:55:08 2011
@@ -959,6 +959,8 @@ public class FSImage extends Storage {
       // load Files Under Construction
       this.loadFilesUnderConstruction(imgVersion, in, fsNamesys);
       
+      this.loadSecretManagerState(imgVersion, in, fsNamesys);
+      
     } finally {
       in.close();
     }
@@ -1028,6 +1030,7 @@ public class FSImage extends Storage {
       // save the rest of the nodes
       saveImage(strbuf, 0, fsDir.rootDir, out);
       fsNamesys.saveFilesUnderConstruction(out);
+      fsNamesys.saveSecretManagerState(out);
       strbuf = null;
     } finally {
       out.close();
@@ -1220,6 +1223,16 @@ public class FSImage extends Storage {
     }
   }
 
+  private void loadSecretManagerState(int version,  DataInputStream in, 
+      FSNamesystem fs) throws IOException {
+    if (version > -19) {
+      //SecretManagerState is not available.
+      //This must not happen if security is turned on.
+      return; 
+    }
+    fs.loadSecretManagerState(in);
+  }
+  
   // Helper function that reads in an INodeUnderConstruction
   // from the input stream
   //

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Mar  4 03:55:08 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.security.E
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.util.*;
@@ -61,6 +62,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.Server;
 
 import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.FileNotFoundException;
@@ -310,6 +313,12 @@ public class FSNamesystem implements FSC
     }
   }
 
+  void activateSecretManager() throws IOException {
+    if (dtSecretManager != null) {
+      dtSecretManager.startThreads();
+    }
+  }
+
   /**
    * Initialize FSNamesystem.
    */
@@ -317,7 +326,6 @@ public class FSNamesystem implements FSC
     this.systemStart = now();
     setConfigurationParameters(conf);
     dtSecretManager = createDelegationTokenSecretManager(conf);
-    dtSecretManager.startThreads();
 
     this.nameNodeAddress = nn.getNameNodeAddress();
     this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
@@ -396,6 +404,7 @@ public class FSNamesystem implements FSC
   FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
     setConfigurationParameters(conf);
     this.dir = new FSDirectory(fsImage, this, conf);
+    dtSecretManager = createDelegationTokenSecretManager(conf);
   }
 
   /**
@@ -4915,15 +4924,23 @@ public class FSNamesystem implements FSC
             "dfs.namenode.delegation.token.max-lifetime", 7*24*60*60*1000),
         conf.getLong(
             "dfs.namenode.delegation.token.renew-interval", 24*60*60*1000),
-        DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL);
+        DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL, this);
   }
 
   public DelegationTokenSecretManager getDelegationTokenSecretManager() {
     return dtSecretManager;
   }
 
+  /**
+   * @param renewer
+   * @return Token<DelegationTokenIdentifier>
+   * @throws IOException
+   */
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
+    if (isInSafeMode()) {
+      throw new SafeModeException("Cannot issue delegation token", safeMode);
+    }
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     String user = ugi.getUserName();
     Text owner = new Text(user);
@@ -4933,18 +4950,116 @@ public class FSNamesystem implements FSC
     }
     DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner,
         renewer, realUser);
-    return new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
+    Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
+        dtId, dtSecretManager);
+    long expiryTime = dtSecretManager.getTokenExpiryTime(dtId);
+    logGetDelegationToken(dtId, expiryTime);
+    return token;
   }
 
+  /**
+   * 
+   * @param token
+   * @return New expiryTime of the token
+   * @throws InvalidToken
+   * @throws IOException
+   */
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
+    if (isInSafeMode()) {
+      throw new SafeModeException("Cannot renew delegation token", safeMode);
+    }
     String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
-    return dtSecretManager.renewToken(token, renewer);
+    long expiryTime = dtSecretManager.renewToken(token, renewer);
+    DelegationTokenIdentifier id = new DelegationTokenIdentifier();
+    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    id.readFields(in);
+    logRenewDelegationToken(id, expiryTime);
+    return expiryTime;
   }
 
+  /**
+   * 
+   * @param token
+   * @throws IOException
+   */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
+    if (isInSafeMode()) {
+      throw new SafeModeException("Cannot cancel delegation token", safeMode);
+    }
     String canceller = UserGroupInformation.getCurrentUser().getShortUserName();
-    dtSecretManager.cancelToken(token, canceller);
+    DelegationTokenIdentifier id = dtSecretManager
+        .cancelToken(token, canceller);
+    logCancelDelegationToken(id);
+  }
+  
+  /**
+   * @param out save state of the secret manager
+   */
+  void saveSecretManagerState(DataOutputStream out) throws IOException {
+    dtSecretManager.saveSecretManagerState(out);
+  }
+
+  /**
+   * @param in load the state of secret manager from input stream
+   */
+  void loadSecretManagerState(DataInputStream in) throws IOException {
+    dtSecretManager.loadSecretManagerState(in);
+  }
+
+  /**
+   * Log the getDelegationToken operation to edit logs
+   * 
+   * @param id identifer of the new delegation token
+   * @param expiryTime when delegation token expires
+   */
+  private void logGetDelegationToken(DelegationTokenIdentifier id,
+      long expiryTime) throws IOException {
+    synchronized (this) {
+      getEditLog().logGetDelegationToken(id, expiryTime);
+    }
+    getEditLog().logSync();
+  }
+
+  /**
+   * Log the renewDelegationToken operation to edit logs
+   * 
+   * @param id identifer of the delegation token being renewed
+   * @param expiryTime when delegation token expires
+   */
+  private void logRenewDelegationToken(DelegationTokenIdentifier id,
+      long expiryTime) throws IOException {
+    synchronized (this) {
+      getEditLog().logRenewDelegationToken(id, expiryTime);
+    }
+    getEditLog().logSync();
+  }
+
+  
+  /**
+   * Log the cancelDelegationToken operation to edit logs
+   * 
+   * @param id identifer of the delegation token being cancelled
+   */
+  private void logCancelDelegationToken(DelegationTokenIdentifier id)
+      throws IOException {
+    synchronized (this) {
+      getEditLog().logCancelDelegationToken(id);
+    }
+    getEditLog().logSync();
+  }
+
+  /**
+   * Log the updateMasterKey operation to edit logs
+   * 
+   * @param key new delegation key.
+   */
+  public void logUpdateMasterKey(DelegationKey key) throws IOException {
+    synchronized (this) {
+      getEditLog().logUpdateMasterKey(key);
+    }
+    getEditLog().logSync();
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Mar  4 03:55:08 2011
@@ -192,6 +192,10 @@ public class NameNode implements ClientP
     myMetrics = new NameNodeMetrics(conf, this);
     this.namesystem = new FSNamesystem(this, conf);
 
+    if (UserGroupInformation.isSecurityEnabled()) {
+      namesystem.activateSecretManager();
+    }
+
     // create rpc server 
     this.server = RPC.getServer(this, socAddr.getHostName(),
         socAddr.getPort(), handlerCount, false, conf, namesystem

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Mar  4 03:55:08 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * This class provides user facing APIs for transferring secrets from
@@ -98,6 +99,14 @@ public class TokenCache {
    */
   public static void obtainTokensForNamenodes(Path [] ps, Configuration conf) 
   throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    obtainTokensForNamenodesInternal(ps, conf);
+  }
+
+  static void obtainTokensForNamenodesInternal(Path [] ps, Configuration conf)
+  throws IOException {
     // get jobtracker principal id (for the renewer)
     Text jtCreds = new Text(conf.get(JobContext.JOB_JOBTRACKER_ID, ""));
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java Fri Mar  4 03:55:08 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.security.SaslInputStream;
 import org.apache.hadoop.security.SaslRpcClient;
 import org.apache.hadoop.security.SaslRpcServer;
@@ -75,13 +76,14 @@ public class TestClientProtocolWithDeleg
   @Test
   public void testDelegationTokenRpc() throws Exception {
     ClientProtocol mockNN = mock(ClientProtocol.class);
+    FSNamesystem mockNameSys = mock(FSNamesystem.class);
     when(mockNN.getProtocolVersion(anyString(), anyLong())).thenReturn(
         ClientProtocol.versionID);
     DelegationTokenSecretManager sm = new DelegationTokenSecretManager(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
-        3600000);
+        3600000, mockNameSys);
     sm.startThreads();
     final Server server = RPC.getServer(mockNN, ADDRESS,
         0, 5, true, conf, sm);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestDelegationToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestDelegationToken.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestDelegationToken.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestDelegationToken.java Fri Mar  4 03:55:08 2011
@@ -22,8 +22,6 @@ package org.apache.hadoop.hdfs.security;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 
 import junit.framework.Assert;
 
@@ -34,7 +32,6 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -56,6 +53,8 @@ public class TestDelegationToken {
     FileSystem.setDefaultUri(config, "hdfs://localhost:" + "0");
     cluster = new MiniDFSCluster(0, config, 1, true, true, true,  null, null, null, null);
     cluster.waitActive();
+    cluster.getNameNode().getNamesystem().getDelegationTokenSecretManager()
+				.startThreads();
   }
 
   @After

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java Fri Mar  4 03:55:08 2011
@@ -98,6 +98,7 @@ public class TestDelegationTokenForProxy
     cluster = new MiniDFSCluster(0, config, 1, true, true, true, null, null,
         null, null);
     cluster.waitActive();
+    cluster.getNameNode().getNamesystem().getDelegationTokenSecretManager().startThreads();
   }
 
   @After

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java?rev=1077240&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java Fri Mar  4 03:55:08 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import junit.framework.Assert;
+import java.io.*;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Test;
+
+/**
+ * This class tests the creation and validation of a checkpoint.
+ */
+public class TestCheckPointForSecurityTokens {
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 4096;
+  static final int fileSize = 8192;
+  static final int numDatanodes = 3;
+  short replication = 3;
+
+  NameNode startNameNode( Configuration conf,
+                          String imageDirs,
+                          String editsDirs,
+                          StartupOption start) throws IOException {
+    conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://localhost:0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");  
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, imageDirs);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, editsDirs);
+    String[] args = new String[]{start.getName()};
+    NameNode nn = NameNode.createNameNode(args, conf);
+    Assert.assertTrue(nn.isInSafeMode());
+    return nn;
+  }
+
+  /**
+   * Tests save namepsace.
+   */
+  @Test
+  public void testSaveNamespace() throws IOException {
+    MiniDFSCluster cluster = null;
+    DistributedFileSystem fs = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
+      cluster.waitActive();
+      fs = (DistributedFileSystem)(cluster.getFileSystem());
+      FSNamesystem namesystem = cluster.getNameNode().getNamesystem();
+      namesystem.getDelegationTokenSecretManager().startThreads();
+      String renewer = UserGroupInformation.getLoginUser().getUserName();
+      Token<DelegationTokenIdentifier> token = namesystem
+          .getDelegationToken(new Text(renewer)); 
+      
+      // Saving image without safe mode should fail
+      DFSAdmin admin = new DFSAdmin(conf);
+      String[] args = new String[]{"-saveNamespace"};
+
+      // verify that the edits file is NOT empty
+      Collection<File> editsDirs = cluster.getNameEditsDirs();
+      for(File ed : editsDirs) {
+        Assert.assertTrue(new File(ed, "current/edits").length() > Integer.SIZE/Byte.SIZE);
+      }
+
+      // Saving image in safe mode should succeed
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      try {
+        admin.run(args);
+      } catch(Exception e) {
+        throw new IOException(e.getMessage());
+      }
+      // verify that the edits file is empty
+      for(File ed : editsDirs) {
+        Assert.assertTrue(new File(ed, "current/edits").length() == Integer.SIZE/Byte.SIZE);
+      }
+
+      // restart cluster and verify file exists
+      cluster.shutdown();
+      cluster = null;
+
+      cluster = new MiniDFSCluster(conf, numDatanodes, false, null);
+      cluster.waitActive();
+      //Should be able to renew & cancel the delegation token after cluster restart
+      try {
+        cluster.getNameNode().getNamesystem().renewDelegationToken(token);
+        cluster.getNameNode().getNamesystem().cancelDelegationToken(token);
+      } catch (IOException e) {
+        Assert.fail("Could not renew or cancel the token");
+      }
+    } finally {
+      if(fs != null) fs.close();
+      if(cluster!= null) cluster.shutdown();
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java?rev=1077240&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java Fri Mar  4 03:55:08 2011
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.net.URI;
+import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
+
+/**
+ * This class tests the creation and validation of a checkpoint.
+ */
+public class TestSecurityTokenEditLog extends TestCase {
+  static final int NUM_DATA_NODES = 1;
+
+  // This test creates NUM_THREADS threads and each thread does
+  // 2 * NUM_TRANSACTIONS Transactions concurrently.
+  static final int NUM_TRANSACTIONS = 100;
+  static final int NUM_THREADS = 100;
+  static final int opsPerTrans = 3;
+
+  //
+  // an object that does a bunch of transactions
+  //
+  static class Transactions implements Runnable {
+    FSNamesystem namesystem;
+    int numTransactions;
+    short replication = 3;
+    long blockSize = 64;
+
+    Transactions(FSNamesystem ns, int num) {
+      namesystem = ns;
+      numTransactions = num;
+    }
+
+    // add a bunch of transactions.
+    public void run() {
+      FSEditLog editLog = namesystem.getEditLog();
+
+      for (int i = 0; i < numTransactions; i++) {
+        try {
+          String renewer = UserGroupInformation.getLoginUser().getUserName();
+          Token<DelegationTokenIdentifier> token = namesystem
+              .getDelegationToken(new Text(renewer));
+          namesystem.renewDelegationToken(token);
+          namesystem.cancelDelegationToken(token);
+          editLog.logSync();
+        } catch (IOException e) {
+          System.out.println("Transaction " + i + " encountered exception " +
+                             e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Tests transaction logging in dfs.
+   */
+  public void testEditLog() throws IOException {
+
+    // start a cluster 
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+
+    try {
+      cluster = new MiniDFSCluster(conf, NUM_DATA_NODES, true, null);
+      cluster.waitActive();
+      fileSys = cluster.getFileSystem();
+      final FSNamesystem namesystem = cluster.getNameNode().getNamesystem();
+      namesystem.getDelegationTokenSecretManager().startThreads();
+  
+      for (Iterator<File> it = cluster.getNameDirs().iterator(); it.hasNext(); ) {
+        File dir = new File(it.next().getPath());
+        System.out.println(dir);
+      }
+      
+      FSImage fsimage = namesystem.getFSImage();
+      FSEditLog editLog = fsimage.getEditLog();
+  
+      // set small size of flush buffer
+      editLog.setBufferCapacity(2048);
+      editLog.close();
+      editLog.open();
+    
+      // Create threads and make them run transactions concurrently.
+      Thread threadId[] = new Thread[NUM_THREADS];
+      for (int i = 0; i < NUM_THREADS; i++) {
+        Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS);
+        threadId[i] = new Thread(trans, "TransactionThread-" + i);
+        threadId[i].start();
+      }
+  
+      // wait for all transactions to get over
+      for (int i = 0; i < NUM_THREADS; i++) {
+        try {
+          threadId[i].join();
+        } catch (InterruptedException e) {
+          i--;      // retry 
+        }
+      } 
+      
+      editLog.close();
+      editLog.open();
+  
+      // Verify that we can read in all the transactions that we have written.
+      // If there were any corruptions, it is likely that the reading in
+      // of these transactions will throw an exception.
+      //
+      namesystem.getDelegationTokenSecretManager().stopThreads();
+      for (Iterator<StorageDirectory> it = 
+              fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+        File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
+        System.out.println("Verifying file: " + editFile);
+        int numEdits = FSEditLog.loadFSEdits(
+                                  new EditLogFileInputStream(editFile));
+        assertTrue("Verification for " + editFile + " failed. " +
+                   "Expected " + (NUM_THREADS * opsPerTrans * NUM_TRANSACTIONS + 2) + " transactions. "+
+                   "Found " + numEdits + " transactions.",
+                   numEdits == NUM_THREADS * opsPerTrans * NUM_TRANSACTIONS +2);
+  
+      }
+    } finally {
+      if(fileSys != null) fileSys.close();
+      if(cluster != null) cluster.shutdown();
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Mar  4 03:55:08 2011
@@ -122,8 +122,9 @@ public class TestTokenCache {
   private static int numSlaves = 1;
   private static JobConf jConf;
   private static ObjectMapper mapper = new ObjectMapper();
+  private static Path p1;
+  private static Path p2;
   
-
   @BeforeClass
   public static void setUp() throws Exception {
     Configuration conf = new Configuration();
@@ -135,6 +136,17 @@ public class TestTokenCache {
     
     createTokenFileJson();
     verifySecretKeysInJSONFile();
+    dfsCluster.getNameNode().getNamesystem()
+				.getDelegationTokenSecretManager().startThreads();
+    FileSystem fs = dfsCluster.getFileSystem();
+    
+    p1 = new Path("file1");
+    p2 = new Path("file2");
+    
+    p1 = fs.makeQualified(p1);
+    // do not qualify p2
+    TokenCache.setTokenStorage(new TokenStorage());
+    TokenCache.obtainTokensForNamenodesInternal(new Path [] {p1, p2}, jConf);
   }
 
   @AfterClass
@@ -249,15 +261,6 @@ public class TestTokenCache {
   public void testGetTokensForNamenodes() throws IOException {
     FileSystem fs = dfsCluster.getFileSystem();
 
-    Path p1 = new Path("file1");
-    Path p2 = new Path("file2");
-
-    p1 = fs.makeQualified(p1);
-    // do not qualify p2
-
-    TokenCache.setTokenStorage(new TokenStorage());
-    TokenCache.obtainTokensForNamenodes(new Path [] {p1, p2}, jConf);
-
     // this token is keyed by hostname:port key.
     String fs_addr = TokenCache.buildDTServiceName(p1.toUri()); 
     Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(fs_addr);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java Fri Mar  4 03:55:08 2011
@@ -34,6 +34,8 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.security.TokenStorage;
@@ -67,6 +69,22 @@ public class TestDelegationTokenRenewal 
     System.out.println("filesystem uri = " + FileSystem.getDefaultUri(conf).toString());
   }
   
+  private static class MyDelegationTokenSecretManager extends DelegationTokenSecretManager {
+
+    public MyDelegationTokenSecretManager(long delegationKeyUpdateInterval,
+        long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+        long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
+      super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+          delegationTokenRenewInterval, delegationTokenRemoverScanInterval,
+          namesystem);
+    }
+    
+    @Override //DelegationTokenSecretManager
+    public void logUpdateMasterKey(DelegationKey key) throws IOException {
+      return;
+    }
+  }
+  
   /**
    * add some extra functionality for testing
    * 1. toString();
@@ -77,7 +95,7 @@ public class TestDelegationTokenRenewal 
     public static final String CANCELED = "CANCELED";
 
     public MyToken(DelegationTokenIdentifier dtId1,
-        DelegationTokenSecretManager sm) {
+        MyDelegationTokenSecretManager sm) {
       super(dtId1, sm);
       status = "GOOD";
     }
@@ -165,11 +183,11 @@ public class TestDelegationTokenRenewal 
     throws IOException {
     Text user1= new Text("user1");
     
-    DelegationTokenSecretManager sm = new DelegationTokenSecretManager(
+    MyDelegationTokenSecretManager sm = new MyDelegationTokenSecretManager(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
-        3600000);
+        3600000, null);
     sm.startThreads();
     
     DelegationTokenIdentifier dtId1 = 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/token/delegation/TestDelegationToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/token/delegation/TestDelegationToken.java?rev=1077240&r1=1077239&r2=1077240&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/token/delegation/TestDelegationToken.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/token/delegation/TestDelegationToken.java Fri Mar  4 03:55:08 2011
@@ -25,7 +25,10 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 
@@ -36,8 +39,11 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
 
@@ -91,6 +97,18 @@ public class TestDelegationToken {
     protected byte[] createPassword(TestDelegationTokenIdentifier t) {
       return super.createPassword(t);
     }
+    
+    public byte[] createPassword(TestDelegationTokenIdentifier t, DelegationKey key) {
+      return SecretManager.createPassword(t.getBytes(), key.getKey());
+    }
+    
+    public Map<TestDelegationTokenIdentifier, DelegationTokenInformation> getAllTokens() {
+      return currentTokens;
+    }
+    
+    public DelegationKey getKey(TestDelegationTokenIdentifier id) {
+      return allKeys.get(id.getMasterKeyId());
+    }
   }
   
   public static class TokenSelector extends 
@@ -299,4 +317,52 @@ public class TestDelegationToken {
       dtSecretManager.stopThreads();
     }
   }
+  
+  @Test
+  public void testParallelDelegationTokenCreation() throws Exception {
+    final TestDelegationTokenSecretManager dtSecretManager = 
+        new TestDelegationTokenSecretManager(2000, 24 * 60 * 60 * 1000, 
+            7 * 24 * 60 * 60 * 1000, 2000);
+    try {
+      dtSecretManager.startThreads();
+      int numThreads = 100;
+      final int numTokensPerThread = 100;
+      class tokenIssuerThread implements Runnable {
+
+        public void run() {
+          for(int i =0;i <numTokensPerThread; i++) {
+            generateDelegationToken(dtSecretManager, "auser", "arenewer");
+            try {
+              Thread.sleep(250); 
+            } catch (Exception e) {
+            }
+          }
+        }
+      }
+      Thread[] issuers = new Thread[numThreads];
+      for (int i =0; i <numThreads; i++) {
+        issuers[i] = new Daemon(new tokenIssuerThread());
+        issuers[i].start();
+      }
+      for (int i =0; i <numThreads; i++) {
+        issuers[i].join();
+      }
+      Map<TestDelegationTokenIdentifier, DelegationTokenInformation> tokenCache = dtSecretManager
+          .getAllTokens();
+      Assert.assertEquals(numTokensPerThread*numThreads, tokenCache.size());
+      Iterator<TestDelegationTokenIdentifier> iter = tokenCache.keySet().iterator();
+      while (iter.hasNext()) {
+        TestDelegationTokenIdentifier id = iter.next();
+        DelegationTokenInformation info = tokenCache.get(id);
+        Assert.assertTrue(info != null);
+        DelegationKey key = dtSecretManager.getKey(id);
+        Assert.assertTrue(key != null);
+        byte[] storedPassword = dtSecretManager.retrievePassword(id);
+        byte[] password = dtSecretManager.createPassword(id, key);
+        Assert.assertTrue(Arrays.equals(password, storedPassword));
+      }
+    } finally {
+      dtSecretManager.stopThreads();
+    }
+  }
 }