You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/19 01:20:35 UTC

svn commit: r1494370 [3/3] - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java...

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1494370&r1=1494369&r2=1494370&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Tue Jun 18 23:20:34 2013
@@ -380,7 +380,7 @@ public class ApplicationMasterService ex
       // Adding NMTokens for allocated containers.
       if (!allocation.getContainers().isEmpty()) {
         allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
-            .getNMTokens(app.getUser(), appAttemptId,
+            .createAndGetNMTokens(app.getUser(), appAttemptId,
                 allocation.getContainers()));
       }
       return allocateResponse;

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1494370&r1=1494369&r2=1494370&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Tue Jun 18 23:20:34 2013
@@ -131,21 +131,30 @@ public class AMLauncher implements Runna
 
     final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
 
-    UserGroupInformation currentUser = UserGroupInformation
-        .createRemoteUser(containerId.toString());
-    if (UserGroupInformation.isSecurityEnabled()) {
-      Token<ContainerTokenIdentifier> token =
-          ConverterUtils.convertFromYarn(masterContainer
-              .getContainerToken(), containerManagerBindAddress);
-      currentUser.addToken(token);
-    }
-    return currentUser.doAs(new PrivilegedAction<ContainerManagementProtocol>() {
-      @Override
-      public ContainerManagementProtocol run() {
-        return (ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class,
-            containerManagerBindAddress, conf);
-      }
-    });
+    UserGroupInformation currentUser =
+        UserGroupInformation.createRemoteUser(containerId
+            .getApplicationAttemptId().toString());
+
+    String user =
+        rmContext.getRMApps()
+            .get(containerId.getApplicationAttemptId().getApplicationId())
+            .getUser();
+    org.apache.hadoop.yarn.api.records.Token token =
+        rmContext.getNMTokenSecretManager().createNMToken(
+            containerId.getApplicationAttemptId(), node, user);
+    currentUser.addToken(ConverterUtils.convertFromYarn(token,
+        containerManagerBindAddress));
+
+    return currentUser
+        .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+
+          @Override
+          public ContainerManagementProtocol run() {
+            return (ContainerManagementProtocol) rpc.getProxy(
+                ContainerManagementProtocol.class,
+                containerManagerBindAddress, conf);
+          }
+        });
   }
 
   private ContainerLaunchContext createAMContainerLaunchContext(
@@ -234,7 +243,13 @@ public class AMLauncher implements Runna
       } catch(IOException ie) {
         LOG.info("Error cleaning master ", ie);
       } catch (YarnException e) {
-        LOG.info("Error cleaning master ", e);
+        StringBuilder sb = new StringBuilder("Container ");
+        sb.append(masterContainer.getId().toString());
+        sb.append(" is not handled by this NodeManager");
+        if (!e.getMessage().contains(sb.toString())) {
+          // Ignoring if container is already killed by Node Manager.
+          LOG.info("Error cleaning master ", e);          
+        }
       }
       break;
     default:

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java?rev=1494370&r1=1494369&r2=1494370&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java Tue Jun 18 23:20:34 2013
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,16 +30,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
@@ -183,7 +178,7 @@ public class NMTokenSecretManagerInRM ex
     }
   }
   
-  public List<NMToken> getNMTokens(String applicationSubmitter,
+  public List<NMToken> createAndGetNMTokens(String applicationSubmitter,
       ApplicationAttemptId appAttemptId, List<Container> containers) {
     try {
       this.readLock.lock();
@@ -193,12 +188,14 @@ public class NMTokenSecretManagerInRM ex
         for (Container container : containers) {
           if (!nodeSet.contains(container.getNodeId())) {
             LOG.debug("Sending NMToken for nodeId : "
-                + container.getNodeId().toString());
+                + container.getNodeId().toString()
+                + " for application attempt : " + appAttemptId.toString());
             Token token = createNMToken(appAttemptId, container.getNodeId(),
                 applicationSubmitter);
             NMToken nmToken =
                 NMToken.newInstance(container.getNodeId(), token);
             nmTokens.add(nmToken);
+            // This will update the nmToken set.
             nodeSet.add(container.getNodeId());
           }
         }
@@ -273,38 +270,4 @@ public class NMTokenSecretManagerInRM ex
       this.writeLock.unlock();
     }
   }
-  
-  public static Token newNMToken(byte[] password,
-      NMTokenIdentifier identifier) {
-    NodeId nodeId = identifier.getNodeId();
-    // RPC layer client expects ip:port as service for tokens
-    InetSocketAddress addr =
-        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
-    Token nmToken =
-        Token.newInstance(identifier.getBytes(),
-          NMTokenIdentifier.KIND.toString(), password, SecurityUtil
-            .buildTokenService(addr).toString());
-    return nmToken;
-  }
-  
-  /**
-   * Helper function for creating NMTokens.
-   */
-  public Token createNMToken(ApplicationAttemptId applicationAttemptId,
-      NodeId nodeId, String applicationSubmitter) {
-    byte[] password;
-    NMTokenIdentifier identifier;
-    
-    this.readLock.lock();
-    try {
-      identifier =
-          new NMTokenIdentifier(applicationAttemptId, nodeId,
-              applicationSubmitter, this.currentMasterKey.getMasterKey()
-                  .getKeyId());
-      password = this.createPassword(identifier);
-    } finally {
-      this.readLock.unlock();
-    }
-    return newNMToken(password, identifier);
-  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1494370&r1=1494369&r2=1494370&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Tue Jun 18 23:20:34 2013
@@ -59,9 +59,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -310,6 +308,7 @@ public class MockRM extends ResourceMana
     Configuration conf = new Configuration();
     
     containerTokenSecretManager.rollMasterKey();
+    nmTokenSecretManager.rollMasterKey();
     return new ResourceTrackerService(getRMContext(), nodesListManager,
         this.nmLivelinessMonitor, containerTokenSecretManager,
         nmTokenSecretManager) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java?rev=1494370&r1=1494369&r2=1494370&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java Tue Jun 18 23:20:34 2013
@@ -22,14 +22,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 
 import junit.framework.Assert;
 
@@ -37,45 +30,30 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Test;
 
@@ -111,15 +89,15 @@ public class TestContainerManagerSecurit
       yarnCluster.init(conf);
       yarnCluster.start();
       
-      // Testing for authenticated user
-      testAuthenticatedUser();
+      // TestNMTokens.
+      testNMTokens(conf);
       
-      // Testing for malicious user
-      testMaliceUser();
-      
-      // Testing for usage of expired tokens
-      testExpiredTokens();
+      // Testing for container token tampering
+      testContainerToken(conf);
       
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw e;
     } finally {
       if (yarnCluster != null) {
         yarnCluster.stop();
@@ -128,57 +106,264 @@ public class TestContainerManagerSecurit
     }
   }
   
-  private void testAuthenticatedUser() throws IOException,
-      InterruptedException, YarnException {
+  private void testNMTokens(Configuration conf) throws Exception {
+    NMTokenSecretManagerInRM nmTokenSecretManagerRM =
+        yarnCluster.getResourceManager().getRMContext()
+          .getNMTokenSecretManager();
+    NMTokenSecretManagerInNM nmTokenSecretManagerNM =
+        yarnCluster.getNodeManager(0).getNMContext().getNMTokenSecretManager();
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        yarnCluster.getResourceManager().getRMContainerTokenSecretManager();
+    
+    NodeManager nm = yarnCluster.getNodeManager(0);
+    
+    waitForNMToReceiveNMTokenKey(nmTokenSecretManagerNM, nm);
+    
+    // Both id should be equal.
+    Assert.assertEquals(nmTokenSecretManagerNM.getCurrentKey().getKeyId(),
+        nmTokenSecretManagerRM.getCurrentKey().getKeyId());
+    
+    /*
+     * Below cases should be tested.
+     * 1) If Invalid NMToken is used then it should be rejected.
+     * 2) If valid NMToken but belonging to another Node is used then that
+     * too should be rejected.
+     * 3) NMToken for say appAttempt-1 is used for starting/stopping/retrieving
+     * status for container with containerId for say appAttempt-2 should
+     * be rejected.
+     * 4) After start container call is successful nmtoken should have been
+     * saved in NMTokenSecretManagerInNM.
+     * 5) If start container call was successful (no matter if container is
+     * still running or not), appAttempt->NMToken should be present in
+     * NMTokenSecretManagerInNM's cache. Any future getContainerStatus call
+     * for containerId belonging to that application attempt using
+     * applicationAttempt's older nmToken should not get any invalid
+     * nmToken error. (This can be best tested if we roll over NMToken
+     * master key twice).
+     */
+    YarnRPC rpc = YarnRPC.create(conf);
+    String user = "test";
+    Resource r = Resource.newInstance(1024, 1);
+
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId validAppAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ApplicationAttemptId invalidAppAttemptId =
+        ApplicationAttemptId.newInstance(appId, 2);
+    
+    ContainerId validContainerId =
+        ContainerId.newInstance(validAppAttemptId, 0);
+    
+    NodeId validNode = yarnCluster.getNodeManager(0).getNMContext().getNodeId();
+    NodeId invalidNode = NodeId.newInstance("InvalidHost", 1234);
 
-    LOG.info("Running test for authenticated user");
+    
+    org.apache.hadoop.yarn.api.records.Token validNMToken =
+        nmTokenSecretManagerRM.createNMToken(validAppAttemptId, validNode, user);
+    
+    org.apache.hadoop.yarn.api.records.Token validContainerToken =
+        containerTokenSecretManager.createContainerToken(validContainerId,
+            validNode, user, r);
+    
+    StringBuilder sb;
+    // testInvalidNMToken ... creating NMToken using different secret manager.
+    
+    NMTokenSecretManagerInRM tempManager = new NMTokenSecretManagerInRM(conf);
+    tempManager.rollMasterKey();
+    do {
+      tempManager.rollMasterKey();
+      tempManager.activateNextMasterKey();
+      // Making sure key id is different.
+    } while (tempManager.getCurrentKey().getKeyId() == nmTokenSecretManagerRM
+        .getCurrentKey().getKeyId());
+    
+    org.apache.hadoop.yarn.api.records.Token invalidNMToken =
+        tempManager.createNMToken(validAppAttemptId, validNode, user);
+    sb = new StringBuilder("Given NMToken for application : ");
+    sb.append(validAppAttemptId.toString())
+      .append(" seems to have been generated illegally.");
+    Assert.assertTrue(sb.toString().contains(
+        testStartContainer(rpc, validAppAttemptId, validNode,
+            validContainerToken, invalidNMToken, true)));
+    
+    // valid NMToken but belonging to other node
+    invalidNMToken =
+        nmTokenSecretManagerRM.createNMToken(validAppAttemptId, invalidNode,
+            user);
+    sb = new StringBuilder("Given NMToken for application : ");
+    sb.append(validAppAttemptId)
+      .append(" is not valid for current node manager.expected : ")
+      .append(validNode.toString())
+      .append(" found : ").append(invalidNode.toString());
+    Assert.assertTrue(sb.toString().contains(
+        testStartContainer(rpc, validAppAttemptId, validNode,
+            validContainerToken, invalidNMToken, true)));
+    
+    // using appAttempt-2 token for launching container for appAttempt-1.
+    invalidNMToken =
+        nmTokenSecretManagerRM.createNMToken(invalidAppAttemptId, validNode,
+            user);
+    sb = new StringBuilder("\nNMToken for application attempt : ");
+    sb.append(invalidAppAttemptId.toString())
+      .append(" was used for starting container with container token")
+      .append(" issued for application attempt : ")
+      .append(validAppAttemptId.toString());
+    Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode,
+        validContainerToken, invalidNMToken, true).contains(sb.toString()));
+    
+    // using correct tokens. nmtoken for appattempt should get saved.
+    testStartContainer(rpc, validAppAttemptId, validNode, validContainerToken,
+        validNMToken, false);
+    Assert.assertTrue(nmTokenSecretManagerNM
+        .isAppAttemptNMTokenKeyPresent(validAppAttemptId));
+    
+    // Rolling over master key twice so that we can check whether older keys
+    // are used for authentication.
+    rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM);
+    // Key rolled over once.. rolling over again
+    rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM);
+    
+    // trying get container status. Now saved nmToken should be used for
+    // authentication.
+    sb = new StringBuilder("Container ");
+    sb.append(validContainerId.toString());
+    sb.append(" is not handled by this NodeManager");
+    Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
+        validContainerId, validNMToken, false).contains(sb.toString()));
+    
+  }
 
-    ResourceManager resourceManager = yarnCluster.getResourceManager();
+  protected void waitForNMToReceiveNMTokenKey(
+      NMTokenSecretManagerInNM nmTokenSecretManagerNM, NodeManager nm)
+      throws InterruptedException {
+    int attempt = 60;
+    ContainerManagerImpl cm =
+        ((ContainerManagerImpl) nm.getNMContext().getContainerManager());
+    while ((cm.getBlockNewContainerRequestsStatus() || nmTokenSecretManagerNM
+        .getNodeId() == null) && attempt-- > 0) {
+      Thread.sleep(2000);
+    }
+  }
 
-    final YarnRPC yarnRPC = YarnRPC.create(conf);
+  protected void rollNMTokenMasterKey(
+      NMTokenSecretManagerInRM nmTokenSecretManagerRM,
+      NMTokenSecretManagerInNM nmTokenSecretManagerNM) throws Exception {
+    int oldKeyId = nmTokenSecretManagerRM.getCurrentKey().getKeyId();
+    nmTokenSecretManagerRM.rollMasterKey();
+    int interval = 40;
+    while (nmTokenSecretManagerNM.getCurrentKey().getKeyId() == oldKeyId
+        && interval-- > 0) {
+      Thread.sleep(1000);
+    }
+    nmTokenSecretManagerRM.activateNextMasterKey();
+    Assert.assertTrue((nmTokenSecretManagerNM.getCurrentKey().getKeyId()
+        == nmTokenSecretManagerRM.getCurrentKey().getKeyId()));
+  }
 
-    // Submit an application
-    ApplicationId appID = resourceManager.getClientRMService()
-        .getNewApplication(Records.newRecord(GetNewApplicationRequest.class))
-        .getApplicationId();
-    ApplicationMasterProtocol scheduler = submitAndRegisterApplication(resourceManager,
-        yarnRPC, appID);
-
-    // Now request a container.
-    final Container allocatedContainer = requestAndGetContainer(scheduler,
-        appID);
-
-    // Now talk to the NM for launching the container.
-    final ContainerId containerID = allocatedContainer.getId();
-    UserGroupInformation authenticatedUser = UserGroupInformation
-        .createRemoteUser(containerID.toString());
-    org.apache.hadoop.yarn.api.records.Token containerToken =
-        allocatedContainer.getContainerToken();
-    Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
-        containerToken.getIdentifier().array(), containerToken.getPassword()
-            .array(), new Text(containerToken.getKind()), new Text(
-            containerToken.getService()));
-    authenticatedUser.addToken(token);
-    authenticatedUser.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        ContainerManagementProtocol client = (ContainerManagementProtocol) yarnRPC.getProxy(
-            ContainerManagementProtocol.class, NetUtils
-                .createSocketAddr(allocatedContainer.getNodeId().toString()),
-            conf);
-        LOG.info("Going to make a legal stopContainer() request");
-        StopContainerRequest request = recordFactory
-            .newRecordInstance(StopContainerRequest.class);
-        request.setContainerId(containerID);
-        client.stopContainer(request);
-        return null;
+  private String testGetContainer(YarnRPC rpc,
+      ApplicationAttemptId appAttemptId, NodeId nodeId,
+      ContainerId containerId,
+      org.apache.hadoop.yarn.api.records.Token nmToken,
+      boolean isExceptionExpected) {
+    try {
+      getContainerStatus(rpc, nmToken, containerId, appAttemptId, nodeId,
+          isExceptionExpected);
+      if (isExceptionExpected) {
+        fail("Exception was expected!!");
       }
-    });
+      return "";
+    } catch (Exception e) {
+      e.printStackTrace();
+      return e.getMessage();
+    }
+  }
 
-    KillApplicationRequest request = Records
-        .newRecord(KillApplicationRequest.class);
-    request.setApplicationId(appID);
-    resourceManager.getClientRMService().forceKillApplication(request);
+  protected String testStartContainer(YarnRPC rpc,
+      ApplicationAttemptId appAttemptId, NodeId nodeId,
+      org.apache.hadoop.yarn.api.records.Token containerToken,
+      org.apache.hadoop.yarn.api.records.Token nmToken,
+      boolean isExceptionExpected) {
+    try {
+      startContainer(rpc, nmToken, containerToken, nodeId,
+          appAttemptId.toString());
+      if (isExceptionExpected){
+        fail("Exception was expected!!");        
+      }
+      return "";
+    } catch (Exception e) {
+      e.printStackTrace();
+      return e.getMessage();
+    }
+  }
+  
+  private void
+      getContainerStatus(YarnRPC rpc,
+          org.apache.hadoop.yarn.api.records.Token nmToken,
+          ContainerId containerId,
+          ApplicationAttemptId appAttemptId, NodeId nodeId,
+          boolean isExceptionExpected) throws Exception {
+    GetContainerStatusRequest request =
+        Records.newRecord(GetContainerStatusRequest.class);
+    request.setContainerId(containerId);
+    
+    ContainerManagementProtocol proxy = null;
+    
+    try {
+      proxy =
+          getContainerManagementProtocolProxy(rpc, nmToken, nodeId,
+              appAttemptId.toString());
+      proxy.getContainerStatus(request);
+      
+    } finally {
+      if (proxy != null) {
+        rpc.stopProxy(proxy, conf);
+      }
+    }
+  }
+  
+  private void startContainer(final YarnRPC rpc,
+      org.apache.hadoop.yarn.api.records.Token nmToken,
+      org.apache.hadoop.yarn.api.records.Token containerToken,
+      NodeId nodeId, String user) throws Exception {
+
+    StartContainerRequest request =
+        Records.newRecord(StartContainerRequest.class);
+    request.setContainerToken(containerToken);
+    ContainerLaunchContext context =
+        Records.newRecord(ContainerLaunchContext.class);
+    request.setContainerLaunchContext(context);
+
+    ContainerManagementProtocol proxy = null;
+    try {
+      proxy = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
+      proxy.startContainer(request);
+    } finally {
+      if (proxy != null) {
+        rpc.stopProxy(proxy, conf);
+      }
+    }
+  }
+
+  protected ContainerManagementProtocol getContainerManagementProtocolProxy(
+      final YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken,
+      NodeId nodeId, String user) {
+    ContainerManagementProtocol proxy;
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    final InetSocketAddress addr =
+        NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort());
+    ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
+
+    proxy = ugi
+        .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+
+          @Override
+          public ContainerManagementProtocol run() {
+            return (ContainerManagementProtocol) rpc.getProxy(
+                ContainerManagementProtocol.class,
+                addr, conf);
+          }
+        });
+    return proxy;
   }
 
   /**
@@ -190,349 +375,61 @@ public class TestContainerManagerSecurit
    * @throws InterruptedException
    * @throws YarnException
    */
-  private void testMaliceUser() throws IOException, InterruptedException,
-      YarnException {
+  private void testContainerToken(Configuration conf) throws IOException,
+      InterruptedException, YarnException {
 
     LOG.info("Running test for malice user");
-
-    ResourceManager resourceManager = yarnCluster.getResourceManager();
-
-    final YarnRPC yarnRPC = YarnRPC.create(conf);
-
-    // Submit an application
-    ApplicationId appID = resourceManager.getClientRMService()
-        .getNewApplication(Records.newRecord(GetNewApplicationRequest.class))
-        .getApplicationId();
-    ApplicationMasterProtocol scheduler = submitAndRegisterApplication(resourceManager,
-        yarnRPC, appID);
-
-    // Now request a container.
-    final Container allocatedContainer = requestAndGetContainer(scheduler,
-        appID);
-
-    // Now talk to the NM for launching the container with modified resource
-
-    org.apache.hadoop.yarn.api.records.Token containerToken =
-        allocatedContainer.getContainerToken();
-    ContainerTokenIdentifier originalContainerTokenId =
-        BuilderUtils.newContainerTokenIdentifier(containerToken);
-
-    // Malice user modifies the resource amount
-    Resource modifiedResource = BuilderUtils.newResource(2048, 1);
-    ContainerTokenIdentifier modifiedIdentifier =
-        new ContainerTokenIdentifier(originalContainerTokenId.getContainerID(),
-          originalContainerTokenId.getNmHostAddress(), "testUser",
-          modifiedResource, Long.MAX_VALUE,
-          originalContainerTokenId.getMasterKeyId(),
-          ResourceManager.clusterTimeStamp);
-    Token<ContainerTokenIdentifier> modifiedToken =
-        new Token<ContainerTokenIdentifier>(modifiedIdentifier.getBytes(),
-          containerToken.getPassword().array(), new Text(
-            containerToken.getKind()), new Text(containerToken.getService()));
-    makeTamperedStartContainerCall(yarnRPC, allocatedContainer,
-      modifiedIdentifier, modifiedToken);
-
-    // Malice user modifies the container-Id
-    ContainerId newContainerId =
-        BuilderUtils.newContainerId(
-          BuilderUtils.newApplicationAttemptId(originalContainerTokenId
-            .getContainerID().getApplicationAttemptId().getApplicationId(), 1),
-          originalContainerTokenId.getContainerID().getId() + 42);
-    modifiedIdentifier =
-        new ContainerTokenIdentifier(newContainerId,
-          originalContainerTokenId.getNmHostAddress(), "testUser",
-          originalContainerTokenId.getResource(), Long.MAX_VALUE,
-          originalContainerTokenId.getMasterKeyId(),
-          ResourceManager.clusterTimeStamp);
-    modifiedToken =
-        new Token<ContainerTokenIdentifier>(modifiedIdentifier.getBytes(),
-          containerToken.getPassword().array(), new Text(
-            containerToken.getKind()), new Text(containerToken.getService()));
-    makeTamperedStartContainerCall(yarnRPC, allocatedContainer,
-      modifiedIdentifier, modifiedToken);
-
-    // Similarly messing with anything else will fail.
-
-    KillApplicationRequest request = Records
-        .newRecord(KillApplicationRequest.class);
-    request.setApplicationId(appID);
-    resourceManager.getClientRMService().forceKillApplication(request);
-  }
-
-  private void makeTamperedStartContainerCall(final YarnRPC yarnRPC,
-      final Container allocatedContainer,
-      final ContainerTokenIdentifier modifiedIdentifier,
-      Token<ContainerTokenIdentifier> modifiedToken) {
-    final ContainerId containerID = allocatedContainer.getId();
-    UserGroupInformation maliceUser = UserGroupInformation
-        .createRemoteUser(containerID.toString());
-    maliceUser.addToken(modifiedToken);
-    maliceUser.doAs(new PrivilegedAction<Void>() {
-      @Override
-      public Void run() {
-        ContainerManagementProtocol client = (ContainerManagementProtocol) yarnRPC.getProxy(
-            ContainerManagementProtocol.class, NetUtils
-                .createSocketAddr(allocatedContainer.getNodeId().toString()),
-            conf);
-
-        LOG.info("Going to contact NM:  ilLegal request");
-        StartContainerRequest request =
-            Records.newRecord(StartContainerRequest.class);
-        try {
-          request.setContainerToken(allocatedContainer.getContainerToken());
-          ContainerLaunchContext context =
-              createContainerLaunchContextForTest(modifiedIdentifier);
-          request.setContainerLaunchContext(context);
-          client.startContainer(request);
-          fail("Connection initiation with illegally modified "
-              + "tokens is expected to fail.");
-        } catch (YarnException e) {
-          LOG.error("Got exception", e);
-          fail("Cannot get a YARN remote exception as "
-              + "it will indicate RPC success");
-        } catch (Exception e) {
-          Assert.assertEquals(
-              javax.security.sasl.SaslException.class
-              .getCanonicalName(), e.getClass().getCanonicalName());
-          Assert.assertTrue(e
-            .getMessage()
-            .contains(
-              "DIGEST-MD5: digest response format violation. "
-                  + "Mismatched response."));
-        }
-        return null;
-      }
-    });
-  }
-
-  private void testExpiredTokens() throws IOException, InterruptedException,
-      YarnException {
-
-    LOG.info("\n\nRunning test for malice user");
-
-    ResourceManager resourceManager = yarnCluster.getResourceManager();
-
-    final YarnRPC yarnRPC = YarnRPC.create(conf);
-
-    // Submit an application
-    final ApplicationId appID = resourceManager.getClientRMService()
-        .getNewApplication(Records.newRecord(GetNewApplicationRequest.class))
-        .getApplicationId();
-    ApplicationMasterProtocol scheduler = submitAndRegisterApplication(resourceManager,
-        yarnRPC, appID);
-
-    // Now request a container.
-    final Container allocatedContainer = requestAndGetContainer(scheduler,
-        appID);
-
-    // Now talk to the NM for launching the container with modified containerID
-    final ContainerId containerID = allocatedContainer.getId();
-
-    org.apache.hadoop.yarn.api.records.Token containerToken =
-        allocatedContainer.getContainerToken();
-    final ContainerTokenIdentifier tokenId =
-        BuilderUtils.newContainerTokenIdentifier(containerToken);
-
-    /////////// Test calls with expired tokens
-    UserGroupInformation unauthorizedUser = UserGroupInformation
-        .createRemoteUser(containerID.toString());
-
-    RMContainerTokenSecretManager containerTokenSecreteManager = 
-      resourceManager.getRMContainerTokenSecretManager(); 
-    final ContainerTokenIdentifier newTokenId =
-        new ContainerTokenIdentifier(tokenId.getContainerID(),
-          tokenId.getNmHostAddress(), tokenId.getApplicationSubmitter(),
-          tokenId.getResource(), System.currentTimeMillis() - 1,
-          containerTokenSecreteManager.getCurrentKey().getKeyId(),
-          ResourceManager.clusterTimeStamp);
-    final byte[] passowrd =
-        containerTokenSecreteManager.createPassword(
-            newTokenId);
-    // Create a valid token by using the key from the RM.
-    Token<ContainerTokenIdentifier> token =
-        new Token<ContainerTokenIdentifier>(newTokenId.getBytes(), passowrd,
-          new Text(containerToken.getKind()), new Text(
-            containerToken.getService()));
-
-    unauthorizedUser.addToken(token);
-    unauthorizedUser.doAs(new PrivilegedAction<Void>() {
-      @Override
-      public Void run() {
-        ContainerManagementProtocol client = (ContainerManagementProtocol) yarnRPC.getProxy(
-            ContainerManagementProtocol.class, NetUtils
-                .createSocketAddr(allocatedContainer.getNodeId().toString()),
-            conf);
-
-        LOG.info("Going to contact NM with expired token");
-        ContainerLaunchContext context = createContainerLaunchContextForTest(newTokenId);
-        StartContainerRequest request =
-            Records.newRecord(StartContainerRequest.class);
-        request.setContainerLaunchContext(context);
-        allocatedContainer.setContainerToken(BuilderUtils.newContainerToken(
-            allocatedContainer.getNodeId(), passowrd, newTokenId));
-        request.setContainerToken(allocatedContainer.getContainerToken());
-
-        //Calling startContainer with an expired token.
-        try {
-          client.startContainer(request);
-          fail("Connection initiation with expired "
-              + "token is expected to fail.");
-        } catch (Throwable t) {
-          LOG.info("Got exception : ", t);
-          Assert.assertTrue(t.getMessage().contains(
-                  "This token is expired. current time is"));
-        }
-
-        // Try stopping a container - should not get an expiry error.
-        StopContainerRequest stopRequest = Records.newRecord(StopContainerRequest.class);
-        stopRequest.setContainerId(newTokenId.getContainerID());
-        try {
-          client.stopContainer(stopRequest);
-        } catch (Throwable t) {
-          fail("Stop Container call should have succeeded");
-        }
-        
-        return null;
-      }
-    });
-    /////////// End of testing calls with expired tokens
-
-    KillApplicationRequest request = Records
-        .newRecord(KillApplicationRequest.class);
-    request.setApplicationId(appID);
-    resourceManager.getClientRMService().forceKillApplication(request);
-  }
-  
-  private ApplicationMasterProtocol submitAndRegisterApplication(
-      ResourceManager resourceManager, final YarnRPC yarnRPC,
-      ApplicationId appID) throws IOException,
-      UnsupportedFileSystemException, YarnException,
-      InterruptedException {
-
-    // Use ping to simulate sleep on Windows.
-    List<String> cmd = Shell.WINDOWS ?
-      Arrays.asList("ping", "-n", "100", "127.0.0.1", ">nul") :
-      Arrays.asList("sleep", "100");
-
-    ContainerLaunchContext amContainer =
-        BuilderUtils.newContainerLaunchContext(
-            Collections.<String, LocalResource> emptyMap(),
-            new HashMap<String, String>(), cmd,
-            new HashMap<String, ByteBuffer>(), null,
-            new HashMap<ApplicationAccessType, String>());
-
-    ApplicationSubmissionContext appSubmissionContext = recordFactory
-        .newRecordInstance(ApplicationSubmissionContext.class);
-    appSubmissionContext.setApplicationId(appID);
-    appSubmissionContext.setAMContainerSpec(amContainer);
-    appSubmissionContext.setResource(BuilderUtils.newResource(1024, 1));
-
-    SubmitApplicationRequest submitRequest = recordFactory
-        .newRecordInstance(SubmitApplicationRequest.class);
-    submitRequest.setApplicationSubmissionContext(appSubmissionContext);
-    resourceManager.getClientRMService().submitApplication(submitRequest);
-
-    // Wait till container gets allocated for AM
-    int waitCounter = 0;
-    RMApp app = resourceManager.getRMContext().getRMApps().get(appID);
-    RMAppAttempt appAttempt = app == null ? null : app.getCurrentAppAttempt();
-    RMAppAttemptState state = appAttempt == null ? null : appAttempt
-        .getAppAttemptState();
-    while ((app == null || appAttempt == null || state == null || !state
-        .equals(RMAppAttemptState.LAUNCHED))
-        && waitCounter++ != 20) {
-      LOG.info("Waiting for applicationAttempt to be created.. ");
-      Thread.sleep(1000);
-      app = resourceManager.getRMContext().getRMApps().get(appID);
-      appAttempt = app == null ? null : app.getCurrentAppAttempt();
-      state = appAttempt == null ? null : appAttempt.getAppAttemptState();
-    }
-    Assert.assertNotNull(app);
-    Assert.assertNotNull(appAttempt);
-    Assert.assertNotNull(state);
-    Assert.assertEquals(RMAppAttemptState.LAUNCHED, state);
-
-    UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(
-                                       appAttempt.getAppAttemptId().toString());
-
-    // Ask for a container from the RM
-    final InetSocketAddress schedulerAddr =
-        resourceManager.getApplicationMasterService().getBindAddress();
-    if (UserGroupInformation.isSecurityEnabled()) {
-      AMRMTokenIdentifier appTokenIdentifier = new AMRMTokenIdentifier(
-          appAttempt.getAppAttemptId());
-      AMRMTokenSecretManager appTokenSecretManager =
-          new AMRMTokenSecretManager(conf);
-      appTokenSecretManager.setMasterKey(resourceManager
-        .getAMRMTokenSecretManager().getMasterKey());
-      Token<AMRMTokenIdentifier> appToken =
-          new Token<AMRMTokenIdentifier>(appTokenIdentifier,
-            appTokenSecretManager);
-      SecurityUtil.setTokenService(appToken, schedulerAddr);
-      currentUser.addToken(appToken);
-    }
+    /*
+     * We need to check for containerToken (authorization).
+     * Here we will be assuming that we have valid NMToken  
+     * 1) ContainerToken used is expired.
+     * 2) ContainerToken is tampered (resource is modified).
+     */
+    NMTokenSecretManagerInRM nmTokenSecretManagerInRM =
+        yarnCluster.getResourceManager().getRMContext()
+          .getNMTokenSecretManager();
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 0);
+    ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
+    NodeManager nm = yarnCluster.getNodeManager(0);
+    NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
+        nm.getNMContext().getNMTokenSecretManager();
+    String user = "test";
     
-    ApplicationMasterProtocol scheduler = currentUser
-        .doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
-          @Override
-          public ApplicationMasterProtocol run() {
-            return (ApplicationMasterProtocol) yarnRPC.getProxy(ApplicationMasterProtocol.class,
-                schedulerAddr, conf);
-          }
-        });
+    waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm);
 
-    // Register the appMaster
-    RegisterApplicationMasterRequest request = recordFactory
-        .newRecordInstance(RegisterApplicationMasterRequest.class);
-    request.setApplicationAttemptId(resourceManager.getRMContext()
-        .getRMApps().get(appID).getCurrentAppAttempt().getAppAttemptId());
-    scheduler.registerApplicationMaster(request);
-    return scheduler;
-  }
-
-  private Container requestAndGetContainer(ApplicationMasterProtocol scheduler,
-      ApplicationId appID) throws YarnException, InterruptedException,
-      IOException {
-
-    // Request a container allocation.
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ask.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
-        ResourceRequest.ANY, BuilderUtils.newResource(1024, 1), 1));
-
-    AllocateRequest allocateRequest = AllocateRequest.newInstance(
-        BuilderUtils.newApplicationAttemptId(appID, 1), 0, 0F, ask,
-        new ArrayList<ContainerId>(), null);
-    List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
-        .getAllocatedContainers();
-
-    // Modify ask to request no more.
-    allocateRequest.setAskList(new ArrayList<ResourceRequest>());
-
-    int waitCounter = 0;
-    while ((allocatedContainers == null || allocatedContainers.size() == 0)
-        && waitCounter++ != 20) {
-      LOG.info("Waiting for container to be allocated..");
-      Thread.sleep(1000);
-      allocateRequest.setResponseId(allocateRequest.getResponseId() + 1);
-      allocatedContainers = scheduler.allocate(allocateRequest)
-          .getAllocatedContainers();
-    }
-
-    Assert.assertNotNull("Container is not allocted!", allocatedContainers);
-    Assert.assertEquals("Didn't get one container!", 1, allocatedContainers
-        .size());
-
-    return allocatedContainers.get(0);
-  }
-
-  private ContainerLaunchContext createContainerLaunchContextForTest(
-      ContainerTokenIdentifier tokenId) {
-    ContainerLaunchContext context =
-        BuilderUtils.newContainerLaunchContext(
-            new HashMap<String, LocalResource>(),
-            new HashMap<String, String>(), new ArrayList<String>(),
-            new HashMap<String, ByteBuffer>(), null,
-            new HashMap<ApplicationAccessType, String>());
-    return context;
+    NodeId nodeId = nm.getNMContext().getNodeId();
+    
+    // Both id should be equal.
+    Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
+        nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
+    
+    // Creating a tampered Container Token
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        yarnCluster.getResourceManager().getRMContainerTokenSecretManager();
+    
+    RMContainerTokenSecretManager tamperedContainerTokenSecretManager =
+        new RMContainerTokenSecretManager(conf);
+    tamperedContainerTokenSecretManager.rollMasterKey();
+    do {
+      tamperedContainerTokenSecretManager.rollMasterKey();
+      tamperedContainerTokenSecretManager.activateNextMasterKey();
+    } while (containerTokenSecretManager.getCurrentKey().getKeyId()
+        == tamperedContainerTokenSecretManager.getCurrentKey().getKeyId());
+    
+    Resource r = Resource.newInstance(1230, 2);
+    // Creating modified containerToken
+    Token containerToken =
+        tamperedContainerTokenSecretManager.createContainerToken(cId, nodeId,
+            user, r);
+    Token nmToken =
+        nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
+    YarnRPC rpc = YarnRPC.create(conf);
+    StringBuilder sb = new StringBuilder("Given Container ");
+    sb.append(cId);
+    sb.append(" seems to have an illegally generated token.");
+    Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
+        containerToken, nmToken, true).contains(sb.toString()));
   }
 }