You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ar...@apache.org on 2013/11/19 18:26:26 UTC

svn commit: r1543510 [2/2] - in /hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/ya...

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java Tue Nov 19 17:26:23 2013
@@ -201,7 +201,8 @@ public class FairSchedulerPage extends R
         _("$(function() {",
           "  $('#cs a span').addClass('ui-corner-all').css('position', 'absolute');",
           "  $('#cs').bind('loaded.jstree', function (e, data) {",
-          "    data.inst.open_node('#pq', true);",
+          "    var callback = { call:reopenQueryNodes }",
+          "    data.inst.open_node('#pq', callback);",
           "   }).",
           "    jstree({",
           "    core: { animation: 188, html_titles: true },",
@@ -217,7 +218,8 @@ public class FairSchedulerPage extends R
           "    $('#apps').dataTable().fnFilter(q, 3, true);",
           "  });",
           "  $('#cs').show();",
-          "});")._();
+          "});")._().
+        _(SchedulerPageUtil.QueueBlockUtil.class);
   }
   
   @Override protected Class<? extends SubView> content() {

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Tue Nov 19 17:26:23 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -98,21 +100,27 @@ public class MockNM {
   }
 
   public RegisterNodeManagerResponse registerNode() throws Exception {
+    return registerNode(null);
+  }
+
+  public RegisterNodeManagerResponse registerNode(
+      List<ContainerStatus> containerStatus) throws Exception{
     RegisterNodeManagerRequest req = Records.newRecord(
         RegisterNodeManagerRequest.class);
     req.setNodeId(nodeId);
     req.setHttpPort(httpPort);
     Resource resource = BuilderUtils.newResource(memory, vCores);
     req.setResource(resource);
+    req.setContainerStatuses(containerStatus);
     req.setNMVersion(version);
     RegisterNodeManagerResponse registrationResponse =
         resourceTracker.registerNodeManager(req);
     this.currentContainerTokenMasterKey =
         registrationResponse.getContainerTokenMasterKey();
     this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
-    return registrationResponse;
+    return registrationResponse;    
   }
-
+  
   public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
     return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
         isHealthy, ++responseId);

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Tue Nov 19 17:26:23 2013
@@ -421,6 +421,10 @@ public class MockRM extends ResourceMana
     return this.clientToAMSecretManager;
   }
 
+  public RMAppManager getRMAppManager() {
+    return this.rmAppManager;
+  }
+
   @Override
   protected void startWepApp() {
     // override to disable webapp

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java Tue Nov 19 17:26:23 2013
@@ -172,7 +172,7 @@ public class TestAppManager{
         ApplicationSubmissionContext submissionContext, String user)
             throws YarnException {
       super.submitApplication(submissionContext, System.currentTimeMillis(),
-          false, user);
+          user, false, null);
     }
   }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Nov 19 17:26:23 2013
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
@@ -29,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -57,10 +63,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -83,6 +93,7 @@ import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mortbay.log.Log;
 
 public class TestRMRestart {
 
@@ -104,6 +115,7 @@ public class TestRMRestart {
     Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
   }
 
+  @SuppressWarnings("rawtypes")
   @Test (timeout=180000)
   public void testRMRestart() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
@@ -252,11 +264,14 @@ public class TestRMRestart {
         .getApplicationId());
     
     // verify state machine kicked into expected states
-    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING);
     rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
     
-    // verify new attempts created
-    Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
+    // verify attempts for apps
+    // An app whose AM was already started waits for the previous AM
+    // container's finish event to arrive, whereas an app for which no AM
+    // container was running starts a new application attempt immediately.
+    Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
     Assert.assertEquals(1, loadedApp2.getAppAttempts().size());
     
     // verify old AM is not accepted
@@ -274,8 +289,20 @@ public class TestRMRestart {
     Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
     
     // new NM to represent NM re-register
-    nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
-    nm2 = rm2.registerNode("127.0.0.2:5678", 15120);
+    nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+    nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
+
+    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
+    ContainerStatus containerStatus =
+        BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
+            .getCurrentAppAttempt().getAppAttemptId(), 1),
+            ContainerState.COMPLETE, "Killed AM container", 143);
+    containerStatuses.add(containerStatus);
+    nm1.registerNode(containerStatuses);
+    nm2.registerNode();
+    
+    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(2, loadedApp1.getAppAttempts().size());    
 
     // verify no more reboot response sent
     hbResponse = nm1.nodeHeartbeat(true);
@@ -399,6 +426,157 @@ public class TestRMRestart {
   }
 
   @Test
+  public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
+    // testing 3 cases
+    // After RM restarts
+    // 1) New application attempt is not started until previous AM container
+    // finish event is reported back to RM as a part of nm registration.
+    // 2) If the previous AM container finish event is never reported back
+    // (i.e. the node manager on which this AM container was running also
+    // went down), AMLivelinessMonitor should time out the previous attempt
+    // and start a new attempt.
+    // 3) If all the stored attempts had finished then new attempt should
+    // be started immediately.
+    YarnConfiguration conf = new YarnConfiguration(this.conf);
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 40);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+    Map<ApplicationId, ApplicationState> rmAppState =
+        rmState.getApplicationState();
+    
+    // start RM
+    final MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService());
+    nm1.registerNode();
+     
+    // submitting app
+    RMApp app1 = rm1.submitApp(200);
+    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    MockAM am1 = launchAM(app1, rm1, nm1);
+    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    // Fail first AM.
+    am1.waitForState(RMAppAttemptState.FAILED);
+    
+    // launch another AM.
+    MockAM am2 = launchAM(app1, rm1, nm1);
+    
+    Assert.assertEquals(1, rmAppState.size());
+    Assert.assertEquals(app1.getState(), RMAppState.RUNNING);
+    Assert.assertEquals(app1.getAppAttempts()
+        .get(app1.getCurrentAppAttempt().getAppAttemptId())
+        .getAppAttemptState(), RMAppAttemptState.RUNNING);
+
+    //  start new RM.
+    MockRM rm2 = null;
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    NodeHeartbeatResponse res = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction());
+    
+    RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+    // application should be in running state
+    rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+    
+    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+    // new attempt should not be started
+    Assert.assertEquals(2, rmApp.getAppAttempts().size());
+    // am1 attempt should be in FAILED state whereas am2 attempt should be in
+    // LAUNCHED state
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+        rmApp.getAppAttempts().get(am1.getApplicationAttemptId())
+            .getAppAttemptState());
+    Assert.assertEquals(RMAppAttemptState.LAUNCHED,
+        rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
+            .getAppAttemptState());
+    
+    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
+    ContainerStatus containerStatus =
+        BuilderUtils.newContainerStatus(
+            BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
+            ContainerState.COMPLETE, "Killed AM container", 143);
+    containerStatuses.add(containerStatus);
+    nm1.registerNode(containerStatuses);
+    rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+    launchAM(rmApp, rm2, nm1);
+    Assert.assertEquals(3, rmApp.getAppAttempts().size());
+    rm2.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
+        RMAppAttemptState.RUNNING);
+    // Now restart RM ...
+    // Setting AMLivelinessMonitor interval to be 10 Secs. 
+    conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
+    MockRM rm3 = null;
+    rm3 = new MockRM(conf, memStore);
+    rm3.start();
+    
+    // Wait for RM to process all the events as a part of rm recovery.
+    nm1.setResourceTrackerService(rm3.getResourceTrackerService());
+    
+    rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId());
+    // application should be in running state
+    rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+    Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING);
+    // new attempt should not be started
+    Assert.assertEquals(3, rmApp.getAppAttempts().size());
+    // am1 and am2 attempts should be in FAILED state whereas am3 should be
+    // in LAUNCHED state
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+        rmApp.getAppAttempts().get(am1.getApplicationAttemptId())
+            .getAppAttemptState());
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+        rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
+            .getAppAttemptState());
+    ApplicationAttemptId latestAppAttemptId =
+        rmApp.getCurrentAppAttempt().getAppAttemptId();
+    Assert.assertEquals(RMAppAttemptState.LAUNCHED,rmApp.getAppAttempts()
+        .get(latestAppAttemptId).getAppAttemptState());
+    
+    rm3.waitForState(latestAppAttemptId, RMAppAttemptState.FAILED);
+    rm3.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(4, rmApp.getAppAttempts().size());
+    Assert.assertEquals(RMAppAttemptState.FAILED,
+        rmApp.getAppAttempts().get(latestAppAttemptId).getAppAttemptState());
+    
+    latestAppAttemptId = rmApp.getCurrentAppAttempt().getAppAttemptId();
+    
+    // The 4th attempt has started but is not yet saved into RMStateStore
+    // It will be saved only when we launch AM.
+
+    // submitting app but not starting AM for it.
+    RMApp app2 = rm3.submitApp(200);
+    rm3.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(1, app2.getAppAttempts().size());
+    Assert.assertEquals(0,
+        memStore.getState().getApplicationState().get(app2.getApplicationId())
+            .getAttemptCount());
+
+    MockRM rm4 = null;
+    rm4 = new MockRM(conf, memStore);
+    rm4.start();
+    
+    rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
+    rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(4, rmApp.getAppAttempts().size());
+    Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
+    Assert.assertEquals(RMAppAttemptState.SCHEDULED, rmApp.getAppAttempts()
+        .get(latestAppAttemptId).getAppAttemptState());
+    
+    // The initial application for which an AM was not started should be in
+    // ACCEPTED state with one application attempt started.
+    app2 = rm4.getRMContext().getRMApps().get(app2.getApplicationId());
+    rm4.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(RMAppState.ACCEPTED, app2.getState());
+    Assert.assertEquals(1, app2.getAppAttempts().size());
+    Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2
+        .getCurrentAppAttempt().getAppAttemptState());
+
+  }
+
+  @Test
   public void testRMRestartFailedApp() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
     MemoryRMStateStore memStore = new MemoryRMStateStore();
@@ -577,7 +755,14 @@ public class TestRMRestart {
     rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.KILLED);
 
     // restart rm
-    MockRM rm2 = new MockRM(conf, memStore);
+
+    MockRM rm2 = new MockRM(conf, memStore) {
+      @Override
+      protected RMAppManager createRMAppManager() {
+        return spy(super.createRMAppManager());
+      }
+    };
+
     rm2.start();
 
     GetApplicationsRequest request1 =
@@ -620,6 +805,10 @@ public class TestRMRestart {
         rm2.getClientRMService().getApplications(request2);
     List<ApplicationReport> appList2 = response2.getApplicationList();
     Assert.assertTrue(3 == appList2.size());
+
+    // check application summary is logged for the completed apps after RM restart.
+    verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
+      isA(ApplicationId.class));
   }
 
   private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
@@ -720,6 +909,8 @@ public class TestRMRestart {
     Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), 
                         attemptState.getMasterContainer().getId());
 
+    // Setting AMLivelinessMonitor interval to be 10 Secs. 
+    conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
     // start new RM   
     MockRM rm2 = new MockRM(conf, memStore);
     rm2.start();
@@ -818,6 +1009,10 @@ public class TestRMRestart {
     MockRM rm2 = new TestSecurityMockRM(conf, memStore);
     rm2.start();
 
+    // Need to wait for a while as now token renewal happens on another thread
+    // and is asynchronous in nature.
+    waitForTokensToBeRenewed(rm2);
+
     // verify tokens are properly populated back to rm2 DelegationTokenRenewer
     Assert.assertEquals(tokenSet, rm2.getRMContext()
       .getDelegationTokenRenewer().getDelegationTokens());
@@ -827,6 +1022,21 @@ public class TestRMRestart {
     rm2.stop();
   }
 
+  private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
+    int waitCnt = 20;
+    boolean atleastOneAppInNEWState = true;
+    while (waitCnt-- > 0 && atleastOneAppInNEWState) {
+      atleastOneAppInNEWState = false;
+      for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) {
+        if (rmApp.getState() == RMAppState.NEW) {
+          Thread.sleep(1000);
+          atleastOneAppInNEWState = true;
+          break;
+        }
+      }
+    }
+  }
+
   @Test
   public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
@@ -920,7 +1130,6 @@ public class TestRMRestart {
   @Test
   public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-    
     conf.set(
         CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "kerberos");
@@ -1063,6 +1272,43 @@ public class TestRMRestart {
     rm2.stop();
   }
 
+  // Tests submitting an application to the new RM with the old delegation
+  // token obtained from the previous RM.
+  @Test
+  public void testAppSubmissionWithOldDelegationTokenAfterRMRestart()
+      throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032");
+    UserGroupInformation.setConfiguration(conf);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MockRM rm1 = new TestSecurityMockRM(conf, memStore);
+    rm1.start();
+
+    GetDelegationTokenRequest request1 =
+        GetDelegationTokenRequest.newInstance("renewer1");
+    UserGroupInformation.getCurrentUser().setAuthenticationMethod(
+        AuthMethod.KERBEROS);
+    GetDelegationTokenResponse response1 =
+        rm1.getClientRMService().getDelegationToken(request1);
+    Token<RMDelegationTokenIdentifier> token1 =
+        ConverterUtils.convertFromYarn(response1.getRMDelegationToken(), rmAddr);
+
+    // start new RM
+    MockRM rm2 = new TestSecurityMockRM(conf, memStore);
+    rm2.start();
+
+    // submit an app with the old delegation token got from previous RM.
+    Credentials ts = new Credentials();
+    ts.addToken(token1.getService(), token1);
+    RMApp app = rm2.submitApp(200, "name", "user",
+        new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
+    rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+  }
+
   @Test
   public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
     MemoryRMStateStore memStore = new MemoryRMStateStore() {

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Tue Nov 19 17:26:23 2013
@@ -372,10 +372,10 @@ public class TestRMAppAttemptTransitions
   }
   
   /**
-   * {@link RMAppAttemptState#RECOVERED}
+   * {@link RMAppAttemptState#LAUNCHED}
    */
   private void testAppAttemptRecoveredState() {
-    assertEquals(RMAppAttemptState.RECOVERED, 
+    assertEquals(RMAppAttemptState.LAUNCHED, 
         applicationAttempt.getAppAttemptState());
   }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Tue Nov 19 17:26:23 2013
@@ -141,6 +141,8 @@ public class TestFifoScheduler {
 
     FifoScheduler schedular = new FifoScheduler();
     schedular.reinitialize(new Configuration(), rmContext);
+    QueueMetrics metrics = schedular.getRootQueueMetrics();
+    int beforeAppsSubmitted = metrics.getAppsSubmitted();
 
     ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
     ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
@@ -155,8 +157,8 @@ public class TestFifoScheduler {
     event = new AppAddedSchedulerEvent(appAttemptId, "queue", "user");
     schedular.handle(event);
 
-    QueueMetrics metrics = schedular.getRootQueueMetrics();
-    Assert.assertEquals(1, metrics.getAppsSubmitted());
+    int afterAppsSubmitted = metrics.getAppsSubmitted();
+    Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted);
   }
 
   @Test(timeout=2000)

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java Tue Nov 19 17:26:23 2013
@@ -31,13 +31,24 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -46,16 +57,29 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -66,14 +90,18 @@ import org.mockito.stubbing.Answer;
 
 /**
  * unit test - 
- * tests addition/deletion/cancelation of renewals of delegation tokens
+ * tests addition/deletion/cancellation of renewals of delegation tokens
  *
  */
+@SuppressWarnings("rawtypes")
 public class TestDelegationTokenRenewer {
   private static final Log LOG = 
       LogFactory.getLog(TestDelegationTokenRenewer.class);
   private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
   
+  private static BlockingQueue<Event> eventQueue;
+  private static volatile AtomicInteger counter;
+  private static AsyncDispatcher dispatcher;
   public static class Renewer extends TokenRenewer {
     private static int counter = 0;
     private static Token<?> lastRenewed = null;
@@ -143,11 +171,20 @@ public class TestDelegationTokenRenewer 
 
   @Before
   public void setUp() throws Exception {
+    counter = new AtomicInteger(0);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    eventQueue = new LinkedBlockingQueue<Event>();
+    dispatcher = new AsyncDispatcher(eventQueue);
     Renewer.reset();
-    delegationTokenRenewer = new DelegationTokenRenewer();
+    delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
     delegationTokenRenewer.init(conf);
     RMContext mockContext = mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(
+        delegationTokenRenewer);
+    when(mockContext.getDispatcher()).thenReturn(dispatcher);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
@@ -285,7 +322,7 @@ public class TestDelegationTokenRenewer 
    * @throws IOException
    * @throws URISyntaxException
    */
-  @Test
+  @Test(timeout=60000)
   public void testDTRenewal () throws Exception {
     MyFS dfs = (MyFS)FileSystem.get(conf);
     LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@@ -316,8 +353,9 @@ public class TestDelegationTokenRenewer 
     // register the tokens for renewal
     ApplicationId applicationId_0 = 
         BuilderUtils.newApplicationId(0, 0);
-    delegationTokenRenewer.addApplication(applicationId_0, ts, true);
-    
+    delegationTokenRenewer.addApplication(applicationId_0, ts, true, false);
+    waitForEventsToGetProcessed(delegationTokenRenewer);
+
     // first 3 initial renewals + 1 real
     int numberOfExpectedRenewals = 3+1; 
     
@@ -355,9 +393,10 @@ public class TestDelegationTokenRenewer 
     
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
-    delegationTokenRenewer.addApplication(applicationId_1, ts, true);
+    delegationTokenRenewer.addApplication(applicationId_1, ts, true, false);
+    waitForEventsToGetProcessed(delegationTokenRenewer);
     delegationTokenRenewer.applicationFinished(applicationId_1);
-    
+    waitForEventsToGetProcessed(delegationTokenRenewer);
     numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
     try {
       Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@@ -377,8 +416,8 @@ public class TestDelegationTokenRenewer 
     }
   }
   
-  @Test
-  public void testInvalidDTWithAddApplication() throws Exception {
+  @Test(timeout=60000)
+  public void testAppRejectionWithCancelledDelegationToken() throws Exception {
     MyFS dfs = (MyFS)FileSystem.get(conf);
     LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
 
@@ -390,12 +429,21 @@ public class TestDelegationTokenRenewer 
     
     // register the tokens for renewal
     ApplicationId appId =  BuilderUtils.newApplicationId(0, 0);
-    try {
-      delegationTokenRenewer.addApplication(appId, ts, true);
-      fail("App submission with a cancelled token should have failed");
-    } catch (InvalidToken e) {
-      // expected
+    delegationTokenRenewer.addApplication(appId, ts, true, false);
+    int waitCnt = 20;
+    while (waitCnt-- >0) {
+      if (!eventQueue.isEmpty()) {
+        Event evt = eventQueue.take();
+        if (evt.getType() == RMAppEventType.APP_REJECTED) {
+          Assert.assertTrue(
+              ((RMAppEvent) evt).getApplicationId().equals(appId));
+          return;
+        }
+      } else {
+        Thread.sleep(500);
+      }
     }
+    fail("App submission with a cancelled token should have failed");
   }
   
   /**
@@ -408,7 +456,7 @@ public class TestDelegationTokenRenewer 
    * @throws IOException
    * @throws URISyntaxException
    */
-  @Test
+  @Test(timeout=60000)
   public void testDTRenewalWithNoCancel () throws Exception {
     MyFS dfs = (MyFS)FileSystem.get(conf);
     LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@@ -425,9 +473,10 @@ public class TestDelegationTokenRenewer 
     
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
-    delegationTokenRenewer.addApplication(applicationId_1, ts, false);
+    delegationTokenRenewer.addApplication(applicationId_1, ts, false, false);
+    waitForEventsToGetProcessed(delegationTokenRenewer);
     delegationTokenRenewer.applicationFinished(applicationId_1);
-    
+    waitForEventsToGetProcessed(delegationTokenRenewer);
     int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
     try {
       Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@@ -454,9 +503,8 @@ public class TestDelegationTokenRenewer 
    * @throws IOException
    * @throws URISyntaxException
    */
-  @Test
+  @Test(timeout=60000)
   public void testDTKeepAlive1 () throws Exception {
-    DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
     Configuration lconf = new Configuration(conf);
     lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
     //Keep tokens alive for 6 seconds.
@@ -465,10 +513,15 @@ public class TestDelegationTokenRenewer 
     lconf.setLong(
         YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
         1000l);
+    DelegationTokenRenewer localDtr =
+        createNewDelegationTokenRenewer(lconf, counter);
     localDtr.init(lconf);
     RMContext mockContext = mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(
+        localDtr);
+    when(mockContext.getDispatcher()).thenReturn(dispatcher);
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@@ -487,16 +540,25 @@ public class TestDelegationTokenRenewer 
 
     // register the tokens for renewal
     ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
-    localDtr.addApplication(applicationId_0, ts, true);
+    localDtr.addApplication(applicationId_0, ts, true, false);
+    waitForEventsToGetProcessed(localDtr);
+    if (!eventQueue.isEmpty()){
+      Event evt = eventQueue.take();
+      if (evt instanceof RMAppEvent) {
+        Assert.assertEquals(((RMAppEvent)evt).getType(), RMAppEventType.START);
+      } else {
+        fail("RMAppEvent.START was expected!!");
+      }
+    }
+    
     localDtr.applicationFinished(applicationId_0);
- 
-    Thread.sleep(3000l);
+    waitForEventsToGetProcessed(localDtr);
 
     //Token should still be around. Renewal should not fail.
     token1.renew(lconf);
 
     //Allow the keepalive time to run out
-    Thread.sleep(6000l);
+    Thread.sleep(10000l);
 
     //The token should have been cancelled at this point. Renewal will fail.
     try {
@@ -518,9 +580,8 @@ public class TestDelegationTokenRenewer 
    * @throws IOException
    * @throws URISyntaxException
    */
-  @Test
+  @Test(timeout=60000)
   public void testDTKeepAlive2() throws Exception {
-    DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
     Configuration lconf = new Configuration(conf);
     lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
     //Keep tokens alive for 6 seconds.
@@ -529,10 +590,15 @@ public class TestDelegationTokenRenewer 
     lconf.setLong(
         YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
         1000l);
+    DelegationTokenRenewer localDtr =
+        createNewDelegationTokenRenewer(conf, counter);
     localDtr.init(lconf);
     RMContext mockContext = mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(
+        localDtr);
+    when(mockContext.getDispatcher()).thenReturn(dispatcher);
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@@ -551,22 +617,18 @@ public class TestDelegationTokenRenewer 
 
     // register the tokens for renewal
     ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
-    localDtr.addApplication(applicationId_0, ts, true);
+    localDtr.addApplication(applicationId_0, ts, true, false);
     localDtr.applicationFinished(applicationId_0);
-
-    Thread.sleep(4000l);
-
+    waitForEventsToGetProcessed(delegationTokenRenewer);
     //Send another keep alive.
     localDtr.updateKeepAliveApplications(Collections
         .singletonList(applicationId_0));
     //Renewal should not fail.
     token1.renew(lconf);
-
     //Token should be around after this. 
     Thread.sleep(4500l);
     //Renewal should not fail. - ~1.5 seconds for keepalive timeout.
     token1.renew(lconf);
-
     //Allow the keepalive time to run out
     Thread.sleep(3000l);
     //The token should have been cancelled at this point. Renewal will fail.
@@ -575,61 +637,127 @@ public class TestDelegationTokenRenewer 
       fail("Renewal of cancelled token should have failed");
     } catch (InvalidToken ite) {}
   }
-  
-  @Test(timeout=20000)
-  public void testConncurrentAddApplication()
-      throws IOException, InterruptedException, BrokenBarrierException {
-    final CyclicBarrier startBarrier = new CyclicBarrier(2);
-    final CyclicBarrier endBarrier = new CyclicBarrier(2);
-
-    // this token uses barriers to block during renew
-    final Credentials creds1 = new Credentials();
-    final Token<?> token1 = mock(Token.class);
-    creds1.addToken(new Text("token"), token1);
-    doReturn(true).when(token1).isManaged();
-    doAnswer(new Answer<Long>() {
-      public Long answer(InvocationOnMock invocation)
-          throws InterruptedException, BrokenBarrierException {
-        startBarrier.await();
-        endBarrier.await();
-        return Long.MAX_VALUE;
-      }}).when(token1).renew(any(Configuration.class));
-
-    // this dummy token fakes renewing
-    final Credentials creds2 = new Credentials();
-    final Token<?> token2 = mock(Token.class);
-    creds2.addToken(new Text("token"), token2);
-    doReturn(true).when(token2).isManaged();
-    doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
-
-    // fire up the renewer
-    final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
-    dtr.init(conf);
-    RMContext mockContext = mock(RMContext.class);
-    ClientRMService mockClientRMService = mock(ClientRMService.class);
-    when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
-    InetSocketAddress sockAddr =
-        InetSocketAddress.createUnresolved("localhost", 1234);
-    when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
-    dtr.setRMContext(mockContext);
-    dtr.start();
-    
-    // submit a job that blocks during renewal
-    Thread submitThread = new Thread() {
+
+  private DelegationTokenRenewer createNewDelegationTokenRenewer(
+      Configuration conf, final AtomicInteger counter) {
+    return new DelegationTokenRenewer() {
+      // Renewer whose pool counts outstanding tasks in the supplied counter.
       @Override
-      public void run() {
-        try {
-          dtr.addApplication(mock(ApplicationId.class), creds1, false);
-        } catch (IOException e) {}        
+      protected ThreadPoolExecutor
+          createNewThreadPoolService(Configuration conf) {
+        ThreadPoolExecutor pool =
+            new ThreadPoolExecutor(5, 5, 3L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>()) {
+              // a task has finished running: one fewer outstanding
+              @Override
+              protected void afterExecute(Runnable r, Throwable t) {
+                counter.decrementAndGet();
+                super.afterExecute(r, t);
+              }
+              // a task is being handed to the pool: one more outstanding
+              @Override
+              public void execute(Runnable command) {
+                counter.incrementAndGet();
+                super.execute(command);
+              }
+            };
+        return pool;
       }
     };
-    submitThread.start();
-    
+  }
+
+  private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
+      throws InterruptedException {
+    // Poll at most 40 times, 200 ms apart, until counter is no longer > 0.
+    for (int remaining = 40; remaining > 0 && counter.get() > 0; remaining--) {
+      Thread.sleep(200);
+    }
+    // Returns silently on timeout; dtr itself is not consulted here.
+  }
+  
+  @Test(timeout=20000)
+  public void testConcurrentAddApplication()
+      throws IOException, InterruptedException, BrokenBarrierException {
+    final CyclicBarrier startBarrier = new CyclicBarrier(2);
+    final CyclicBarrier endBarrier = new CyclicBarrier(2);
+    // Checks a 2nd addApplication() returns while the 1st renew() is blocked.
+    // token1: renew() waits on startBarrier, then endBarrier, before returning
+    final Credentials creds1 = new Credentials();
+    final Token<?> token1 = mock(Token.class);
+    creds1.addToken(new Text("token"), token1);
+    doReturn(true).when(token1).isManaged();
+    doAnswer(new Answer<Long>() {
+      public Long answer(InvocationOnMock invocation)
+          throws InterruptedException, BrokenBarrierException {
+        startBarrier.await();
+        endBarrier.await();
+        return Long.MAX_VALUE;
+      }}).when(token1).renew(any(Configuration.class));
+
+    // token2: renew() returns Long.MAX_VALUE without blocking
+    final Credentials creds2 = new Credentials();
+    final Token<?> token2 = mock(Token.class);
+    creds2.addToken(new Text("token"), token2);
+    doReturn(true).when(token2).isManaged();
+    doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
+
+    // fire up a renewer built by createNewDelegationTokenRenewer
+    final DelegationTokenRenewer dtr =
+        createNewDelegationTokenRenewer(conf, counter);
+    dtr.init(conf);
+    RMContext mockContext = mock(RMContext.class);
+    ClientRMService mockClientRMService = mock(ClientRMService.class);
+    when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+    InetSocketAddress sockAddr =
+        InetSocketAddress.createUnresolved("localhost", 1234);
+    when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+    dtr.setRMContext(mockContext);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
+    dtr.start();
+    // submit, from a separate thread, the app whose token blocks in renew()
+    Thread submitThread = new Thread() {
+      @Override
+      public void run() {
+        dtr.addApplication(mock(ApplicationId.class), creds1, false, false);
+      }
+    };
+    submitThread.start();
+
     // wait till 1st submit blocks, then submit another
-    startBarrier.await();
-    dtr.addApplication(mock(ApplicationId.class), creds2, false);
-    // signal 1st to complete
-    endBarrier.await();
-    submitThread.join();
+    startBarrier.await();
+    dtr.addApplication(mock(ApplicationId.class), creds2, false, false);
+    // release the blocked renew() so the 1st submission can complete
+    endBarrier.await();
+    submitThread.join();
+  }
+  
+  @Test(timeout=20000)
+  public void testAppSubmissionWithInvalidDelegationToken() throws Exception {
+    // Submitting with unparseable token bytes must fail with a YarnException.
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    MockRM rm = new MockRM(conf);
+    ByteBuffer tokens = ByteBuffer.wrap("BOGUS".getBytes());
+    ContainerLaunchContext amContainer =
+        ContainerLaunchContext.newInstance(
+            new HashMap<String, LocalResource>(), new HashMap<String, String>(),
+            new ArrayList<String>(), new HashMap<String, ByteBuffer>(), tokens,
+            new HashMap<ApplicationAccessType, String>());
+    ApplicationSubmissionContext appSubContext =
+        ApplicationSubmissionContext.newInstance(
+            ApplicationId.newInstance(1234121, 0),
+            "BOGUS", "default", Priority.UNDEFINED, amContainer, false,
+            true, 1, Resource.newInstance(1024, 1), "BOGUS");
+    SubmitApplicationRequest request =
+        SubmitApplicationRequest.newInstance(appSubContext);
+    try {
+      rm.getClientRMService().submitApplication(request);
+      fail("Error was expected.");
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "Bad header found in token storage"));
+    }
   }
 }