You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/05/30 06:14:27 UTC

svn commit: r1487720 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-reso...

Author: vinodkv
Date: Thu May 30 04:14:26 2013
New Revision: 1487720

URL: http://svn.apache.org/r1487720
Log:
YARN-638. Modified ResourceManager to restore RMDelegationTokens after restarting. Contributed by Jian He.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu May 30 04:14:26 2013
@@ -224,6 +224,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-714. Added NMTokens to be sent to AMs as part of heart-beat response.
     (Omkar Vinit Joshi via vinodkv)
 
+    YARN-638. Modified ResourceManager to restore RMDelegationTokens after
+    restarting. (Jian He via vinodkv)
+
   OPTIMIZATIONS
 
     YARN-512. Log aggregation root directory check is more expensive than it

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Thu May 30 04:14:26 2013
@@ -239,7 +239,7 @@ public class ResourceManager extends Com
     // Register event handler for RMAppManagerEvents
     this.rmDispatcher.register(RMAppManagerEventType.class,
         this.rmAppManager);
-    this.rmDTSecretManager = createRMDelegationTokenSecretManager();
+    this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext);
     clientRM = createClientRMService();
     addService(clientRM);
     
@@ -666,7 +666,7 @@ public class ResourceManager extends Com
   }
 
   protected RMDelegationTokenSecretManager
-               createRMDelegationTokenSecretManager() {
+               createRMDelegationTokenSecretManager(RMContext rmContext) {
     long secretKeyInterval = 
         conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY, 
             YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
@@ -678,7 +678,7 @@ public class ResourceManager extends Com
             YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
 
     return new RMDelegationTokenSecretManager(secretKeyInterval, 
-        tokenMaxLifetime, tokenRenewInterval, 3600000);
+        tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext);
   }
 
   protected ClientRMService createClientRMService() {
@@ -745,6 +745,9 @@ public class ResourceManager extends Com
 
   @Override
   public void recover(RMState state) throws Exception {
+    // recover RMdelegationTokenSecretManager
+    rmDTSecretManager.recover(state);
+
     // recover applications
     rmAppManager.recover(state);
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Thu May 30 04:14:26 2013
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -33,11 +37,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -57,11 +63,19 @@ public class FileSystemRMStateStore exte
   public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
 
   private static final String ROOT_DIR_NAME = "FSRMStateRoot";
-
+  private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
+  private static final String RM_APP_ROOT = "RMAppRoot";
+  private static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
+  private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
+  private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
+      "RMDTSequenceNumber_";
 
   private FileSystem fs;
 
-  private Path fsRootDirPath;
+  private Path rootDirPath;
+  private Path rmDTSecretManagerRoot;
+  private Path rmAppRoot;
+  private Path dtSequenceNumberPath = null;
 
   @VisibleForTesting
   Path fsWorkingPath;
@@ -70,11 +84,14 @@ public class FileSystemRMStateStore exte
       throws Exception{
 
     fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI));
-    fsRootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+    rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+    rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
+    rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
 
     // create filesystem
     fs = fsWorkingPath.getFileSystem(conf);
-    fs.mkdirs(fsRootDirPath);
+    fs.mkdirs(rmDTSecretManagerRoot);
+    fs.mkdirs(rmAppRoot);
   }
 
   @Override
@@ -84,15 +101,23 @@ public class FileSystemRMStateStore exte
 
   @Override
   public synchronized RMState loadState() throws Exception {
+    RMState rmState = new RMState();
+    // recover DelegationTokenSecretManager
+    loadRMDTSecretManagerState(rmState);
+    // recover RM applications
+    loadRMAppState(rmState);
+    return rmState;
+  }
+
+  private void loadRMAppState(RMState rmState) throws Exception {
     try {
-      RMState state = new RMState();
-      FileStatus[] childNodes = fs.listStatus(fsRootDirPath);
+      FileStatus[] childNodes = fs.listStatus(rmAppRoot);
       List<ApplicationAttemptState> attempts =
                                       new ArrayList<ApplicationAttemptState>();
       for(FileStatus childNodeStatus : childNodes) {
         assert childNodeStatus.isFile();
         String childNodeName = childNodeStatus.getPath().getName();
-        Path childNodePath = getNodePath(childNodeName);
+        Path childNodePath = getNodePath(rmAppRoot, childNodeName);
         byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
         if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){
           // application
@@ -107,7 +132,7 @@ public class FileSystemRMStateStore exte
                                appStateData.getUser());
           // assert child node name is same as actual applicationId
           assert appId.equals(appState.context.getApplicationId());
-          state.appState.put(appId, appState);
+          rmState.appState.put(appId, appState);
         } else if(childNodeName.startsWith(
                                 ApplicationAttemptId.appAttemptIdStrPrefix)) {
           // attempt
@@ -139,7 +164,7 @@ public class FileSystemRMStateStore exte
       // go through all attempts and add them to their apps
       for(ApplicationAttemptState attemptState : attempts) {
         ApplicationId appId = attemptState.getAttemptId().getApplicationId();
-        ApplicationState appState = state.appState.get(appId);
+        ApplicationState appState = rmState.appState.get(appId);
         if(appState != null) {
           appState.attempts.put(attemptState.getAttemptId(), attemptState);
         } else {
@@ -148,22 +173,49 @@ public class FileSystemRMStateStore exte
           // application attempt nodes
           LOG.info("Application node not found for attempt: "
                     + attemptState.getAttemptId());
-          deleteFile(getNodePath(attemptState.getAttemptId().toString()));
+          deleteFile(getNodePath(rmAppRoot, attemptState.getAttemptId().toString()));
         }
       }
-
-      return state;
     } catch (Exception e) {
       LOG.error("Failed to load state.", e);
       throw e;
     }
   }
 
+  private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
+    FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
+
+    for(FileStatus childNodeStatus : childNodes) {
+      assert childNodeStatus.isFile();
+      String childNodeName = childNodeStatus.getPath().getName();
+      Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
+      byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+      ByteArrayInputStream is = new ByteArrayInputStream(childData);
+      DataInputStream fsIn = new DataInputStream(is);
+      if(childNodeName.startsWith(DELEGATION_KEY_PREFIX)){
+        DelegationKey key = new DelegationKey();
+        key.readFields(fsIn);
+        rmState.rmSecretManagerState.masterKeyState.add(key);
+      } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
+        RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier();
+        identifier.readFields(fsIn);
+        long renewDate = fsIn.readLong();
+        rmState.rmSecretManagerState.delegationTokenState.put(identifier,
+          renewDate);
+      } else if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
+        rmState.rmSecretManagerState.dtSequenceNumber =
+            Integer.parseInt(childNodeName.split("_")[1]);
+      }else {
+        LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
+      }
+      fsIn.close();
+    }
+  }
+
   @Override
   public synchronized void storeApplicationState(String appId,
-                                     ApplicationStateDataPBImpl appStateDataPB)
-                                     throws Exception {
-    Path nodeCreatePath = getNodePath(appId);
+      ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+    Path nodeCreatePath = getNodePath(rmAppRoot, appId);
 
     LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -179,9 +231,8 @@ public class FileSystemRMStateStore exte
 
   @Override
   public synchronized void storeApplicationAttemptState(String attemptId,
-                          ApplicationAttemptStateDataPBImpl attemptStateDataPB)
-                          throws Exception {
-    Path nodeCreatePath = getNodePath(attemptId);
+      ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception {
+    Path nodeCreatePath = getNodePath(rmAppRoot, attemptId);
     LOG.info("Storing info for attempt: " + attemptId
              + " at: " + nodeCreatePath);
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@@ -197,9 +248,9 @@ public class FileSystemRMStateStore exte
 
   @Override
   public synchronized void removeApplicationState(ApplicationState appState)
-                                                            throws Exception {
+      throws Exception {
     String appId = appState.getAppId().toString();
-    Path nodeRemovePath = getNodePath(appId);
+    Path nodeRemovePath = getNodePath(rmAppRoot, appId);
     LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
     deleteFile(nodeRemovePath);
     for(ApplicationAttemptId attemptId : appState.attempts.keySet()) {
@@ -208,13 +259,76 @@ public class FileSystemRMStateStore exte
   }
 
   public synchronized void removeApplicationAttemptState(String attemptId)
-                                                            throws Exception {
-    Path nodeRemovePath = getNodePath(attemptId);
+      throws Exception {
+    Path nodeRemovePath = getNodePath(rmAppRoot, attemptId);
     LOG.info("Removing info for attempt: " + attemptId
              + " at: " + nodeRemovePath);
     deleteFile(nodeRemovePath);
   }
 
+  @Override
+  public synchronized void storeRMDelegationTokenAndSequenceNumberState(
+      RMDelegationTokenIdentifier identifier, Long renewDate,
+      int latestSequenceNumber) throws Exception {
+    Path nodeCreatePath =
+        getNodePath(rmDTSecretManagerRoot,
+          DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    DataOutputStream fsOut = new DataOutputStream(os);
+    LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
+    identifier.write(fsOut);
+    fsOut.writeLong(renewDate);
+    writeFile(nodeCreatePath, os.toByteArray());
+    fsOut.close();
+
+    // store sequence number
+    Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
+          DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
+    LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
+        + latestSequenceNumber);
+    if (dtSequenceNumberPath == null) {
+      if (!createFile(latestSequenceNumberPath)) {
+        throw new Exception("Failed to create " + latestSequenceNumberPath);
+      }
+    } else {
+      if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) {
+        throw new Exception("Failed to rename " + dtSequenceNumberPath);
+      }
+    }
+    dtSequenceNumberPath = latestSequenceNumberPath;
+  }
+
+  @Override
+  public synchronized void removeRMDelegationTokenState(
+      RMDelegationTokenIdentifier identifier) throws Exception {
+    Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
+      DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
+    LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
+    deleteFile(nodeCreatePath);
+  }
+
+  @Override
+  public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey)
+      throws Exception {
+    Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
+          DELEGATION_KEY_PREFIX + masterKey.getKeyId());
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    DataOutputStream fsOut = new DataOutputStream(os);
+    LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
+    masterKey.write(fsOut);
+    writeFile(nodeCreatePath, os.toByteArray());
+    fsOut.close();
+  }
+
+  @Override
+  public synchronized void
+      removeRMDTMasterKeyState(DelegationKey masterKey) throws Exception {
+    Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
+          DELEGATION_KEY_PREFIX + masterKey.getKeyId());
+    LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId());
+    deleteFile(nodeCreatePath);
+  }
+
   // FileSystem related code
 
   private void deleteFile(Path deletePath) throws Exception {
@@ -228,18 +342,25 @@ public class FileSystemRMStateStore exte
     // state data will not be that "long"
     byte[] data = new byte[(int)len];
     fsIn.readFully(data);
+    fsIn.close();
     return data;
   }
 
   private void writeFile(Path outputPath, byte[] data) throws Exception {
     FSDataOutputStream fsOut = fs.create(outputPath, false);
     fsOut.write(data);
-    fsOut.flush();
     fsOut.close();
   }
 
-  @VisibleForTesting
-  Path getNodePath(String nodeName) {
-    return new Path(fsRootDirPath, nodeName);
+  private boolean renameFile(Path src, Path dst) throws Exception {
+    return fs.rename(src, dst);
+  }
+
+  private boolean createFile(Path newFile) throws Exception {
+    return fs.createNewFile(newFile);
+  }
+
+  private Path getNodePath(Path root, String nodeName) {
+    return new Path(root, nodeName);
   }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Thu May 30 04:14:26 2013
@@ -19,14 +19,18 @@
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -49,6 +53,12 @@ public class MemoryRMStateStore extends 
     // return a copy of the state to allow for modification of the real state
     RMState returnState = new RMState();
     returnState.appState.putAll(state.appState);
+    returnState.rmSecretManagerState.getMasterKeyState()
+      .addAll(state.rmSecretManagerState.getMasterKeyState());
+    returnState.rmSecretManagerState.getTokenState().putAll(
+      state.rmSecretManagerState.getTokenState());
+    returnState.rmSecretManagerState.dtSequenceNumber =
+        state.rmSecretManagerState.dtSequenceNumber;
     return returnState;
   }
   
@@ -113,4 +123,53 @@ public class MemoryRMStateStore extends 
     ApplicationState removed = state.appState.remove(appId);
     assert removed != null;
   }
+
+  @Override
+  public synchronized void storeRMDelegationTokenAndSequenceNumberState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+      int latestSequenceNumber) throws Exception {
+    Map<RMDelegationTokenIdentifier, Long> rmDTState =
+        state.rmSecretManagerState.getTokenState();
+    if (rmDTState.containsKey(rmDTIdentifier)) {
+      IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier
+              + "is already stored.");
+      LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e);
+      throw e;
+    }
+    rmDTState.put(rmDTIdentifier, renewDate);
+    state.rmSecretManagerState.dtSequenceNumber = latestSequenceNumber;
+  }
+
+  @Override
+  public synchronized void removeRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier) throws Exception{
+    Map<RMDelegationTokenIdentifier, Long> rmDTState =
+        state.rmSecretManagerState.getTokenState();
+    rmDTState.remove(rmDTIdentifier);
+  }
+
+  @Override
+  public synchronized void storeRMDTMasterKeyState(DelegationKey delegationKey)
+      throws Exception {
+    Set<DelegationKey> rmDTMasterKeyState =
+        state.rmSecretManagerState.getMasterKeyState();
+
+    if (rmDTMasterKeyState.contains(delegationKey)) {
+      IOException e = new IOException("RMDTMasterKey with keyID: "
+              + delegationKey.getKeyId() + " is already stored");
+      LOG.info("Error storing info for RMDTMasterKey with keyID: "
+          + delegationKey.getKeyId(), e);
+      throw e;
+    }
+    state.getRMDTSecretManagerState().getMasterKeyState().add(delegationKey);
+    LOG.info("rmDTMasterKeyState SIZE: " + rmDTMasterKeyState.size());
+  }
+
+  @Override
+  public synchronized void removeRMDTMasterKeyState(DelegationKey delegationKey)
+      throws Exception {
+    Set<DelegationKey> rmDTMasterKeyState =
+        state.rmSecretManagerState.getMasterKeyState();
+    rmDTMasterKeyState.remove(delegationKey);
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Thu May 30 04:14:26 2013
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 
 @Unstable
 public class NullRMStateStore extends RMStateStore {
@@ -59,4 +62,26 @@ public class NullRMStateStore extends RM
     // Do nothing
   }
 
+  @Override
+  public void storeRMDelegationTokenAndSequenceNumberState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+      int latestSequenceNumber) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void removeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier)
+      throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
+    // Do nothing
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Thu May 30 04:14:26 2013
@@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.re
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.security.A
 import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -57,7 +61,7 @@ import org.apache.hadoop.yarn.server.res
  */
 public abstract class RMStateStore {
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
-  
+
   /**
    * State of an application attempt
    */
@@ -121,17 +125,46 @@ public abstract class RMStateStore {
       return user;
     }
   }
-  
+
+  public static class RMDTSecretManagerState {
+    // DTIdentifier -> renewDate
+    Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
+        new HashMap<RMDelegationTokenIdentifier, Long>();
+
+    Set<DelegationKey> masterKeyState =
+        new HashSet<DelegationKey>();
+
+    int dtSequenceNumber = 0;
+
+    public Map<RMDelegationTokenIdentifier, Long> getTokenState() {
+      return delegationTokenState;
+    }
+
+    public Set<DelegationKey> getMasterKeyState() {
+      return masterKeyState;
+    }
+
+    public int getDTSequenceNumber() {
+      return dtSequenceNumber;
+    }
+  }
+
   /**
    * State of the ResourceManager
    */
   public static class RMState {
-    Map<ApplicationId, ApplicationState> appState = 
-                                new HashMap<ApplicationId, ApplicationState>();
-    
+    Map<ApplicationId, ApplicationState> appState =
+        new HashMap<ApplicationId, ApplicationState>();
+
+    RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
+
     public Map<ApplicationId, ApplicationState> getApplicationState() {
       return appState;
     }
+
+    public RMDTSecretManagerState getRMDTSecretManagerState() {
+      return rmSecretManagerState;
+    }
   }
     
   private Dispatcher rmDispatcher;
@@ -235,8 +268,76 @@ public abstract class RMStateStore {
   protected abstract void storeApplicationAttemptState(String attemptId,
                             ApplicationAttemptStateDataPBImpl attemptStateData) 
                             throws Exception;
-  
-  
+
+
+  /**
+   * RMDTSecretManager call this to store the state of a delegation token
+   * and sequence number
+   */
+  public synchronized void storeRMDelegationTokenAndSequenceNumber(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+      int latestSequenceNumber) throws Exception {
+    storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
+      latestSequenceNumber);
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to store the state of
+   * RMDelegationToken and sequence number
+   */
+  protected abstract void storeRMDelegationTokenAndSequenceNumberState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+      int latestSequenceNumber) throws Exception;
+
+  /**
+   * RMDTSecretManager call this to remove the state of a delegation token
+   */
+  public synchronized void removeRMDelegationToken(
+      RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber)
+      throws Exception {
+    removeRMDelegationTokenState(rmDTIdentifier);
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to remove the state of RMDelegationToken
+   */
+  protected abstract void removeRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier) throws Exception;
+
+  /**
+   * RMDTSecretManager call this to store the state of a master key
+   */
+  public synchronized void storeRMDTMasterKey(DelegationKey delegationKey)
+      throws Exception {
+    storeRMDTMasterKeyState(delegationKey);
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to store the state of
+   * DelegationToken Master Key
+   */
+  protected abstract void storeRMDTMasterKeyState(DelegationKey delegationKey)
+      throws Exception;
+
+  /**
+   * RMDTSecretManager call this to remove the state of a master key
+   */
+  public synchronized void removeRMDTMasterKey(DelegationKey delegationKey)
+      throws Exception {
+    removeRMDTMasterKeyState(delegationKey);
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to remove the state of
+   * DelegationToken Master Key
+   */
+  protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey)
+      throws Exception;
+
   /**
    * Non-blocking API
    * ResourceManager services call this to remove an application from the state

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java Thu May 30 04:14:26 2013
@@ -18,10 +18,26 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * A ResourceManager specific delegation token secret manager.
@@ -30,8 +46,13 @@ import org.apache.hadoop.yarn.security.c
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class RMDelegationTokenSecretManager
-    extends AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> {
+public class RMDelegationTokenSecretManager extends
+    AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> implements
+    Recoverable {
+  private static final Log LOG = LogFactory
+      .getLog(RMDelegationTokenSecretManager.class);
+
+  protected final RMContext rmContext;
 
   /**
    * Create a secret manager
@@ -46,13 +67,132 @@ public class RMDelegationTokenSecretMana
   public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
                                       long delegationTokenMaxLifetime,
                                       long delegationTokenRenewInterval,
-                                      long delegationTokenRemoverScanInterval) {
+                                      long delegationTokenRemoverScanInterval,
+                                      RMContext rmContext) {
     super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
           delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+    this.rmContext = rmContext;
   }
 
   @Override
   public RMDelegationTokenIdentifier createIdentifier() {
     return new RMDelegationTokenIdentifier();
   }
+
+  @Override
+  protected void storeNewMasterKey(DelegationKey newKey) {
+    try {
+      LOG.info("storing master key with keyID " + newKey.getKeyId());
+      rmContext.getStateStore().storeRMDTMasterKey(newKey);
+    } catch (Exception e) {
+      LOG.error("Error in storing master key with KeyID: " + newKey.getKeyId());
+      ExitUtil.terminate(1, e);
+    }
+  }
+
+  @Override
+  protected void removeStoredMasterKey(DelegationKey key) {
+    try {
+      LOG.info("removing master key with keyID " + key.getKeyId());
+      rmContext.getStateStore().removeRMDTMasterKey(key);
+    } catch (Exception e) {
+      LOG.error("Error in removing master key with KeyID: " + key.getKeyId());
+      ExitUtil.terminate(1, e);
+    }
+  }
+
+  @Override
+  protected void storeNewToken(RMDelegationTokenIdentifier identifier,
+      long renewDate) {
+    try {
+      LOG.info("storing RMDelegation token with sequence number: "
+          + identifier.getSequenceNumber());
+      rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(
+        identifier, renewDate, identifier.getSequenceNumber());
+    } catch (Exception e) {
+      LOG.error("Error in storing RMDelegationToken with sequence number: "
+          + identifier.getSequenceNumber());
+      ExitUtil.terminate(1, e);
+    }
+  }
+
+  @Override
+  protected void updateStoredToken(RMDelegationTokenIdentifier id,
+      long renewDate) {
+    try {
+      LOG.info("updating RMDelegation token with sequence number: "
+          + id.getSequenceNumber());
+      rmContext.getStateStore().removeRMDelegationToken(id,
+        delegationTokenSequenceNumber);
+      rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id,
+        renewDate, id.getSequenceNumber());
+    } catch (Exception e) {
+      LOG.error("Error in updating persisted RMDelegationToken with sequence number: "
+            + id.getSequenceNumber());
+      ExitUtil.terminate(1, e);
+    }
+  }
+
+  @Override
+  protected void removeStoredToken(RMDelegationTokenIdentifier ident)
+      throws IOException {
+    try {
+      LOG.info("removing RMDelegation token with sequence number: "
+          + ident.getSequenceNumber());
+      rmContext.getStateStore().removeRMDelegationToken(ident,
+        delegationTokenSequenceNumber);
+    } catch (Exception e) {
+      LOG.error("Error in removing RMDelegationToken with sequence number: "
+          + ident.getSequenceNumber());
+      ExitUtil.terminate(1, e);
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public synchronized Set<DelegationKey> getAllMasterKeys() {
+    HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
+    keySet.addAll(allKeys.values());
+    return keySet;
+  }
+
+  @Private
+  @VisibleForTesting
+  public synchronized Map<RMDelegationTokenIdentifier, Long> getAllTokens() {
+    Map<RMDelegationTokenIdentifier, Long> allTokens =
+        new HashMap<RMDelegationTokenIdentifier, Long>();
+
+    for (Map.Entry<RMDelegationTokenIdentifier,
+        DelegationTokenInformation> entry : currentTokens.entrySet()) {
+      allTokens.put(entry.getKey(), entry.getValue().getRenewDate());
+    }
+    return allTokens;
+  }
+
+  @Private
+  @VisibleForTesting
+  public int getLatestDTSequenceNumber() {
+    return delegationTokenSequenceNumber;
+  }
+
+  @Override
+  public void recover(RMState rmState) throws Exception {
+
+    LOG.info("recovering RMDelegationTokenSecretManager.");
+    // recover RMDTMasterKeys
+    for (DelegationKey dtKey : rmState.getRMDTSecretManagerState()
+      .getMasterKeyState()) {
+      addKey(dtKey);
+    }
+
+    // recover RMDelegationTokens
+    Map<RMDelegationTokenIdentifier, Long> rmDelegationTokens =
+        rmState.getRMDTSecretManagerState().getTokenState();
+    this.delegationTokenSequenceNumber =
+        rmState.getRMDTSecretManagerState().getDTSequenceNumber();
+    for (Map.Entry<RMDelegationTokenIdentifier, Long> entry : rmDelegationTokens
+      .entrySet()) {
+      addPersistedDelegationToken(entry.getKey(), entry.getValue());
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Thu May 30 04:14:26 2013
@@ -291,7 +291,7 @@ public class MockRM extends ResourceMana
   @Override
   protected ClientRMService createClientRMService() {
     return new ClientRMService(getRMContext(), getResourceScheduler(),
-        rmAppManager, applicationACLsManager, null) {
+        rmAppManager, applicationACLsManager, rmDTSecretManager) {
       @Override
       public void start() {
         // override to not start rpc handler

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Thu May 30 04:14:26 2013
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -90,7 +91,9 @@ public class TestClientRMService {
   
   @BeforeClass
   public static void setupSecretManager() throws IOException {
-    dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000);
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
+    dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext);
     dtsm.startThreads();  
   }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Thu May 30 04:14:26 2013
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -454,10 +456,15 @@ public class TestClientRMTokens {
     return mockSched;
   }
 
-  private static RMDelegationTokenSecretManager createRMDelegationTokenSecretManager(
-      long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) {
-    RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager(
-        secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 3600000);
+  private static RMDelegationTokenSecretManager
+      createRMDelegationTokenSecretManager(long secretKeyInterval,
+          long tokenMaxLifetime, long tokenRenewInterval) {
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
+
+    RMDelegationTokenSecretManager rmDtSecretManager =
+        new RMDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime,
+          tokenRenewInterval, 3600000, rmContext);
     return rmDtSecretManager;
   }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Thu May 30 04:14:26 2013
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -25,6 +28,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -33,14 +37,18 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -58,24 +66,32 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestRMRestart {
-  
-  @Test
-  public void testRMRestart() throws Exception {
+
+  private YarnConfiguration conf;
+
+  @Before
+  public void setup() {
     Logger rootLogger = LogManager.getRootLogger();
     rootLogger.setLevel(Level.DEBUG);
     ExitUtil.disableSystemExit();
-    
-    YarnConfiguration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
+    UserGroupInformation.setConfiguration(conf);
     conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
     conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
     conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
+  }
+
+  @Test
+  public void testRMRestart() throws Exception {
     Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -331,13 +347,6 @@ public class TestRMRestart {
   
   @Test
   public void testRMRestartOnMaxAppAttempts() throws Exception {
-    Logger rootLogger = LogManager.getRootLogger();
-    rootLogger.setLevel(Level.DEBUG);
-    ExitUtil.disableSystemExit();
-
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
-    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
     Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -411,13 +420,6 @@ public class TestRMRestart {
   @Test
   public void testDelegationTokenRestoredInDelegationTokenRenewer()
       throws Exception {
-    Logger rootLogger = LogManager.getRootLogger();
-    rootLogger.setLevel(Level.DEBUG);
-    ExitUtil.disableSystemExit();
-
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
-    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "kerberos");
@@ -496,13 +498,6 @@ public class TestRMRestart {
 
   @Test
   public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
-    Logger rootLogger = LogManager.getRootLogger();
-    rootLogger.setLevel(Level.DEBUG);
-    ExitUtil.disableSystemExit();
-
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
-    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
       "kerberos");
@@ -577,7 +572,142 @@ public class TestRMRestart {
     rm2.stop();
   }
 
-  class TestSecurityMockRM extends MockRM {
+  @Test
+  public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+
+    Map<ApplicationId, ApplicationState> rmAppState =
+        rmState.getApplicationState();
+    Map<RMDelegationTokenIdentifier, Long> rmDTState =
+        rmState.getRMDTSecretManagerState().getTokenState();
+    Set<DelegationKey> rmDTMasterKeyState =
+        rmState.getRMDTSecretManagerState().getMasterKeyState();
+
+    MockRM rm1 = new TestSecurityMockRM(conf, memStore);
+    rm1.start();
+
+    // create an empty credential
+    Credentials ts = new Credentials();
+
+    // request a token and add into credential
+    GetDelegationTokenRequest request1 = mock(GetDelegationTokenRequest.class);
+    when(request1.getRenewer()).thenReturn("renewer1");
+    GetDelegationTokenResponse response1 =
+        rm1.getClientRMService().getDelegationToken(request1);
+    DelegationToken delegationToken1 = response1.getRMDelegationToken();
+    Token<RMDelegationTokenIdentifier> token1 =
+        ProtoUtils.convertFromProtoFormat(delegationToken1, null);
+    RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
+
+    HashSet<RMDelegationTokenIdentifier> tokenIdentSet =
+        new HashSet<RMDelegationTokenIdentifier>();
+    ts.addToken(token1.getService(), token1);
+    tokenIdentSet.add(dtId1);
+
+    // submit an app with customized credential
+    RMApp app = rm1.submitApp(200, "name", "user",
+        new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
+
+    // assert app info is saved
+    ApplicationState appState = rmAppState.get(app.getApplicationId());
+    Assert.assertNotNull(appState);
+
+    // assert all master keys are saved
+    Set<DelegationKey> allKeysRM1 = rm1.getRMDTSecretManager().getAllMasterKeys();
+    Assert.assertEquals(allKeysRM1, rmDTMasterKeyState);
+
+    // assert all tokens are saved
+    Map<RMDelegationTokenIdentifier, Long> allTokensRM1 =
+        rm1.getRMDTSecretManager().getAllTokens();
+    Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet());
+    Assert.assertEquals(allTokensRM1, rmDTState);
+
+    // assert sequence number is saved
+    Assert.assertEquals(
+      rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
+      rmState.getRMDTSecretManagerState().getDTSequenceNumber());
+
+    // request one more token
+    GetDelegationTokenRequest request2 = mock(GetDelegationTokenRequest.class);
+    when(request2.getRenewer()).thenReturn("renewer2");
+    GetDelegationTokenResponse response2 =
+        rm1.getClientRMService().getDelegationToken(request2);
+    DelegationToken delegationToken2 = response2.getRMDelegationToken();
+    Token<RMDelegationTokenIdentifier> token2 =
+        ProtoUtils.convertFromProtoFormat(delegationToken2, null);
+    RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier();
+
+    // cancel token2
+    try{
+      rm1.getRMDTSecretManager().cancelToken(token2,
+        UserGroupInformation.getCurrentUser().getUserName());
+    } catch(Exception e) {
+      Assert.fail();
+    }
+
+    // Assert the token which has the latest delegationTokenSequenceNumber is removed
+    Assert.assertEquals(
+      rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
+      dtId2.getSequenceNumber());
+    Assert.assertFalse(rmDTState.containsKey(dtId2));
+
+    // start new RM
+    MockRM rm2 = new TestSecurityMockRM(conf, memStore);
+    rm2.start();
+
+    // assert master keys and tokens are populated back to DTSecretManager
+    Map<RMDelegationTokenIdentifier, Long> allTokensRM2 =
+        rm2.getRMDTSecretManager().getAllTokens();
+    Assert.assertEquals(allTokensRM1, allTokensRM2);
+    // rm2 has its own master keys when it starts, we use containsAll here
+    Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys()
+      .containsAll(allKeysRM1));
+
+    // assert sequenceNumber is properly recovered,
+    // even though the token which has max sequenceNumber is not stored
+    Assert.assertEquals(rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
+      rm2.getRMDTSecretManager().getLatestDTSequenceNumber());
+
+    // renewDate before renewing
+    Long renewDateBeforeRenew = allTokensRM2.get(dtId1);
+    try{
+      // renew recovered token
+      rm2.getRMDTSecretManager().renewToken(token1, "renewer1");
+    } catch(Exception e) {
+      Assert.fail();
+    }
+
+    allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens();
+    Long renewDateAfterRenew = allTokensRM2.get(dtId1);
+    // assert token is renewed
+    Assert.assertTrue(renewDateAfterRenew > renewDateBeforeRenew);
+
+    // assert new token is added into state store
+    Assert.assertTrue(rmDTState.containsValue(renewDateAfterRenew));
+    // assert old token is removed from state store
+    Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew));
+
+    try{
+      rm2.getRMDTSecretManager().cancelToken(token1,
+        UserGroupInformation.getCurrentUser().getUserName());
+    } catch(Exception e) {
+      Assert.fail();
+    }
+
+    // assert token is removed from state after its cancelled
+    allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens();
+    Assert.assertFalse(allTokensRM2.containsKey(dtId1));
+    Assert.assertFalse(rmDTState.containsKey(dtId1));
+
+    // stop the RM
+    rm1.stop();
+    rm2.stop();
+  }
+
+  public static class TestSecurityMockRM extends MockRM {
 
     public TestSecurityMockRM(Configuration conf, RMStateStore store) {
       super(conf, store);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java Thu May 30 04:14:26 2013
@@ -31,6 +31,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import junit.framework.Assert;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -53,8 +56,10 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -111,7 +116,8 @@ public class TestRMStateStore {
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     try {
       TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
-      testRMStateStore(fsTester);
+      testRMAppStateStore(fsTester);
+      testRMDTSecretManagerStateStore(fsTester);
     } finally {
       cluster.shutdown();
     }
@@ -218,7 +224,7 @@ public class TestRMStateStore {
   }
 
   @SuppressWarnings("unchecked")
-  void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
+  void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
     long submitTime = System.currentTimeMillis();
     Configuration conf = new YarnConfiguration();
     RMStateStore store = stateStoreHelper.getRMStateStore();
@@ -334,6 +340,37 @@ public class TestRMStateStore {
     store.close();
   }
 
+  public void testRMDTSecretManagerStateStore(
+      RMStateStoreHelper stateStoreHelper) throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setDispatcher(dispatcher);
+
+    // store RM delegation token;
+    RMDelegationTokenIdentifier dtId1 =
+        new RMDelegationTokenIdentifier(new Text("owner1"),
+          new Text("renewer1"), new Text("realuser1"));
+    Long renewDate1 = new Long(System.currentTimeMillis());
+    int sequenceNumber = 1111;
+    store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
+      sequenceNumber);
+    Map<RMDelegationTokenIdentifier, Long> token1 =
+        new HashMap<RMDelegationTokenIdentifier, Long>();
+    token1.put(dtId1, renewDate1);
+
+    // store delegation key;
+    DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes());
+    HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
+    keySet.add(key);
+    store.storeRMDTMasterKey(key);
+
+    RMDTSecretManagerState secretManagerState =
+        store.loadState().getRMDTSecretManagerState();
+    Assert.assertEquals(token1, secretManagerState.getTokenState());
+    Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
+    Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber());
+  }
+
   private List<Token<?>> generateTokens(ApplicationAttemptId attemptId,
       ApplicationTokenSecretManager appTokenMgr,
       ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) {

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java?rev=1487720&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java Thu May 30 04:14:26 2013
@@ -0,0 +1,206 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRMDelegationTokens {
+
+  private YarnConfiguration conf;
+
+  @Before
+  public void setup() {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    ExitUtil.disableSystemExit();
+    conf = new YarnConfiguration();
+    UserGroupInformation.setConfiguration(conf);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
+  }
+
+  @Test(timeout = 15000)
+  public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+
+    Map<RMDelegationTokenIdentifier, Long> rmDTState =
+        rmState.getRMDTSecretManagerState().getTokenState();
+    Set<DelegationKey> rmDTMasterKeyState =
+        rmState.getRMDTSecretManagerState().getMasterKeyState();
+
+    MockRM rm1 = new MyMockRM(conf, memStore);
+    rm1.start();
+    // on rm start, two master keys are created.
+    // One is created at RMDTSecretMgr.startThreads.updateCurrentKey();
+    // the other is created on the first run of
+    // tokenRemoverThread.rollMasterKey()
+
+    RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager();
+    // assert all master keys are saved
+    Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState);
+    Set<DelegationKey> expiringKeys = new HashSet<DelegationKey>();
+    expiringKeys.addAll(dtSecretManager.getAllMasterKeys());
+
+    // record the current key
+    DelegationKey oldCurrentKey =
+        ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey();
+
+    // request to generate a RMDelegationToken
+    GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
+    when(request.getRenewer()).thenReturn("renewer1");
+    GetDelegationTokenResponse response =
+        rm1.getClientRMService().getDelegationToken(request);
+    DelegationToken delegationToken = response.getRMDelegationToken();
+    Token<RMDelegationTokenIdentifier> token1 =
+        ProtoUtils.convertFromProtoFormat(delegationToken, null);
+    RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
+
+    // wait for the first rollMasterKey
+    while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys
+      .get() < 1){
+      Thread.sleep(200);
+    }
+
+    // assert old-current-key and new-current-key exist
+    Assert.assertTrue(rmDTMasterKeyState.contains(oldCurrentKey));
+    DelegationKey newCurrentKey =
+        ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey();
+    Assert.assertTrue(rmDTMasterKeyState.contains(newCurrentKey));
+
+    // wait for token to expire
+    // rollMasterKey is called every 1 second.
+    while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys
+      .get() < 6) {
+      Thread.sleep(200);
+    }
+
+    Assert.assertFalse(rmDTState.containsKey(dtId1));
+    rm1.stop();
+  }
+
+  @Test(timeout = 15000)
+  public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+
+    Set<DelegationKey> rmDTMasterKeyState =
+        rmState.getRMDTSecretManagerState().getMasterKeyState();
+
+    MockRM rm1 = new MyMockRM(conf, memStore);
+    rm1.start();
+    RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager();
+
+    // assert all master keys are saved
+    Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState);
+    Set<DelegationKey> expiringKeys = new HashSet<DelegationKey>();
+    expiringKeys.addAll(dtSecretManager.getAllMasterKeys());
+
+    // wait for expiringKeys to expire
+    while (true) {
+      boolean allExpired = true;
+      for (DelegationKey key : expiringKeys) {
+        if (rmDTMasterKeyState.contains(key)) {
+          allExpired = false;
+        }
+      }
+      if (allExpired)
+        break;
+      Thread.sleep(500);
+    }
+  }
+
+  class MyMockRM extends TestSecurityMockRM {
+
+    public MyMockRM(Configuration conf, RMStateStore store) {
+      super(conf, store);
+    }
+
+    @Override
+    protected RMDelegationTokenSecretManager
+        createRMDelegationTokenSecretManager(RMContext rmContext) {
+      // KeyUpdateInterval-> 1 seconds
+      // TokenMaxLifetime-> 2 seconds.
+      return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000,
+        rmContext);
+    }
+  }
+
+  public class TestRMDelegationTokenSecretManager extends
+      RMDelegationTokenSecretManager {
+    public AtomicInteger numUpdatedKeys = new AtomicInteger(0);
+
+    public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
+        long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+        long delegationTokenRemoverScanInterval, RMContext rmContext) {
+      super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+        delegationTokenRenewInterval, delegationTokenRemoverScanInterval,
+        rmContext);
+    }
+
+    @Override
+    protected void storeNewMasterKey(DelegationKey newKey) {
+      super.storeNewMasterKey(newKey);
+      numUpdatedKeys.incrementAndGet();
+    }
+
+    public DelegationKey getCurrentKey() {
+      for (int keyId : allKeys.keySet()) {
+        if (keyId == currentId) {
+          return allKeys.get(keyId);
+        }
+      }
+      return null;
+    }
+  }
+
+}