You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/11/19 06:21:45 UTC

svn commit: r1543313 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/...

Author: vinodkv
Date: Tue Nov 19 05:21:45 2013
New Revision: 1543313

URL: http://svn.apache.org/r1543313
Log:
YARN-674. Fixed ResourceManager to renew DelegationTokens on submission asynchronously to work around potential slowness in state-store. Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1543312 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Tue Nov 19 05:21:45 2013
@@ -89,6 +89,10 @@ Release 2.3.0 - UNRELEASED
     ensuring that previous AM exited or after expiry time. (Omkar Vinit Joshi via
     vinodkv)
 
+    YARN-674. Fixed ResourceManager to renew DelegationTokens on submission
+    asynchronously to work around potential slowness in state-store. (Omkar Vinit
+    Joshi via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Nov 19 05:21:45 2013
@@ -504,6 +504,11 @@ public class YarnConfiguration extends C
       RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
   public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
       30000l;
+  
+  /** Delegation Token renewer thread count */
+  public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT =
+      RM_PREFIX + "delegation-token-renewer.thread-count";
+  public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
 
   /** Whether to enable log aggregation */
   public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Tue Nov 19 05:21:45 2013
@@ -318,7 +318,7 @@ public class ClientRMService extends Abs
     try {
       // call RMAppManager to submit application directly
       rmAppManager.submitApplication(submissionContext,
-          System.currentTimeMillis(), false, user);
+          System.currentTimeMillis(), user, false, null);
 
       LOG.info("Application with id " + applicationId.getId() + 
           " submitted by user " + user);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Tue Nov 19 05:21:45 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -236,35 +237,63 @@ public class RMAppManager implements Eve
       this.applicationACLsManager.removeApplication(removeId);
     }
   }
-
+  
   @SuppressWarnings("unchecked")
   protected void submitApplication(
       ApplicationSubmissionContext submissionContext, long submitTime,
-      boolean isRecovered, String user) throws YarnException {
+      String user, boolean isRecovered, RMState state) throws YarnException {
     ApplicationId applicationId = submissionContext.getApplicationId();
 
-    // Validation of the ApplicationSubmissionContext needs to be completed
-    // here. Only those fields that are dependent on RM's configuration are
-    // checked here as they have to be validated whether they are part of new
-    // submission or just being recovered.
+    RMAppImpl application =
+        createAndPopulateNewRMApp(submissionContext, submitTime, user);
 
-    // Check whether AM resource requirements are within required limits
-    if (!submissionContext.getUnmanagedAM()) {
-      ResourceRequest amReq = BuilderUtils.newResourceRequest(
-          RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
-          submissionContext.getResource(), 1);
-      try {
-        SchedulerUtils.validateResourceRequest(amReq,
-            scheduler.getMaximumResourceCapability());
-      } catch (InvalidResourceRequestException e) {
-        LOG.warn("RM app submission failed in validating AM resource request"
-            + " for application " + applicationId, e);
-        throw e;
+    if (isRecovered) {
+      recoverApplication(state, application);
+      RMAppState rmAppState =
+          state.getApplicationState().get(applicationId).getState();
+      if (isApplicationInFinalState(rmAppState)) {
+        // Move the application into its final state synchronously so that
+        // clients never momentarily see it in the NEW state. Finished
+        // applications also skip token renewal this way.
+        application
+            .handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER));
+        return;
       }
     }
+    
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Credentials credentials = null;
+      try {
+        credentials = parseCredentials(submissionContext);
+      } catch (Exception e) {
+        LOG.warn(
+            "Unable to parse credentials.", e);
+        // Sending APP_REJECTED is fine, since we assume that the
+        // RMApp is in NEW state and thus we haven't yet informed the
+        // scheduler about the existence of the application
+        assert application.getState() == RMAppState.NEW;
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppRejectedEvent(applicationId, e.getMessage()));
+        throw RPCUtil.getRemoteException(e);
+      }
+      this.rmContext.getDelegationTokenRenewer().addApplication(
+          applicationId, credentials,
+          submissionContext.getCancelTokensWhenComplete(), isRecovered);
+    } else {
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppEvent(applicationId,
+              isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START));
+    }
+  }
 
+  private RMAppImpl createAndPopulateNewRMApp(
+      ApplicationSubmissionContext submissionContext,
+      long submitTime, String user)
+      throws YarnException {
+    ApplicationId applicationId = submissionContext.getApplicationId();
+    validateResourceRequest(submissionContext);
     // Create RMApp
-    RMApp application =
+    RMAppImpl application =
         new RMAppImpl(applicationId, rmContext, this.conf,
             submissionContext.getApplicationName(), user,
             submissionContext.getQueue(),
@@ -281,35 +310,52 @@ public class RMAppManager implements Eve
       LOG.warn(message);
       throw RPCUtil.getRemoteException(message);
     }
-
     // Inform the ACLs Manager
     this.applicationACLsManager.addApplication(applicationId,
         submissionContext.getAMContainerSpec().getApplicationACLs());
+    return application;
+  }
 
+  private void validateResourceRequest(
+      ApplicationSubmissionContext submissionContext)
+      throws InvalidResourceRequestException {
+    // Validation of the ApplicationSubmissionContext needs to be completed
+    // here. Only those fields that are dependent on RM's configuration are
+    // checked here as they have to be validated whether they are part of new
+    // submission or just being recovered.
+
+    // Check whether AM resource requirements are within required limits
+    if (!submissionContext.getUnmanagedAM()) {
+      ResourceRequest amReq = BuilderUtils.newResourceRequest(
+          RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
+          submissionContext.getResource(), 1);
+      try {
+        SchedulerUtils.validateResourceRequest(amReq,
+            scheduler.getMaximumResourceCapability());
+      } catch (InvalidResourceRequestException e) {
+        LOG.warn("RM app submission failed in validating AM resource request"
+            + " for application " + submissionContext.getApplicationId(), e);
+        throw e;
+      }
+    }
+  }
+
+  private void recoverApplication(RMState state, RMAppImpl application)
+      throws YarnException {
     try {
-      // Setup tokens for renewal
-      if (UserGroupInformation.isSecurityEnabled()) {
-        this.rmContext.getDelegationTokenRenewer().addApplication(
-            applicationId,parseCredentials(submissionContext),
-            submissionContext.getCancelTokensWhenComplete()
-            );
-      }
-    } catch (IOException ie) {
-      LOG.warn(
-          "Unable to add the application to the delegation token renewer.",
-          ie);
-      // Sending APP_REJECTED is fine, since we assume that the
-      // RMApp is in NEW state and thus we havne't yet informed the
-      // Scheduler about the existence of the application
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppRejectedEvent(applicationId, ie.getMessage()));
-      throw RPCUtil.getRemoteException(ie);
+      application.recover(state);
+    } catch (Exception e) {
+      LOG.error("Error recovering application", e);
+      throw new YarnException(e);
     }
+  }
 
-    if (!isRecovered) {
-      // All done, start the RMApp
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMAppEvent(applicationId, RMAppEventType.START));
+  private boolean isApplicationInFinalState(RMAppState rmAppState) {
+    if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
+        || rmAppState == RMAppState.KILLED) {
+      return true;
+    } else {
+      return false;
     }
   }
   
@@ -335,17 +381,9 @@ public class RMAppManager implements Eve
     LOG.info("Recovering " + appStates.size() + " applications");
     for (ApplicationState appState : appStates.values()) {
       LOG.info("Recovering application " + appState.getAppId());
+      
       submitApplication(appState.getApplicationSubmissionContext(),
-        appState.getSubmitTime(), true, appState.getUser());
-      // re-populate attempt information in application
-      RMAppImpl appImpl =
-          (RMAppImpl) rmContext.getRMApps().get(appState.getAppId());
-      appImpl.recover(state);
-      // Recover the app synchronously, as otherwise client is possible to see
-      // the application not recovered before it is actually recovered because
-      // ClientRMService is already started at this point of time.
-      appImpl.handle(new RMAppEvent(appImpl.getApplicationId(),
-        RMAppEventType.RECOVER));
+        appState.getSubmitTime(), appState.getUser(), true, state);
     }
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Tue Nov 19 05:21:45 2013
@@ -34,6 +34,10 @@ import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -48,10 +52,15 @@ import org.apache.hadoop.service.Abstrac
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Service to renew application delegation tokens.
@@ -72,7 +81,8 @@ public class DelegationTokenRenewer exte
   // delegation token canceler thread
   private DelegationTokenCancelThread dtCancelThread =
     new DelegationTokenCancelThread();
-
+  private ThreadPoolExecutor renewerService;
+  
   // managing the list of tokens using Map
   // appId=>List<tokens>
   private Set<DelegationTokenToRenew> delegationTokens = 
@@ -84,9 +94,9 @@ public class DelegationTokenRenewer exte
   private long tokenRemovalDelayMs;
   
   private Thread delayedRemovalThread;
-  private boolean isServiceStarted = false;
-  private List<DelegationTokenToRenew> pendingTokenForRenewal =
-      new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
+  private ReadWriteLock serviceStateLock = new ReentrantReadWriteLock();
+  private volatile boolean isServiceStarted;
+  private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
   
   private boolean tokenKeepAliveEnabled;
   
@@ -102,9 +112,27 @@ public class DelegationTokenRenewer exte
     this.tokenRemovalDelayMs =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
             YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    renewerService = createNewThreadPoolService(conf);
+    pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
     super.serviceInit(conf);
   }
 
+  protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
+    int nThreads = conf.getInt(
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
+        YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);
+
+    ThreadFactory tf = new ThreadFactoryBuilder()
+        .setNameFormat("DelegationTokenRenewer #%d")
+        .build();
+    ThreadPoolExecutor pool =
+        new ThreadPoolExecutor((5 < nThreads ? 5 : nThreads), nThreads, 3L,
+            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+    pool.setThreadFactory(tf);
+    pool.allowCoreThreadTimeOut(true);
+    return pool;
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     dtCancelThread.start();
@@ -119,21 +147,36 @@ public class DelegationTokenRenewer exte
     RMDelegationTokenIdentifier.Renewer.setSecretManager(
         rmContext.getRMDelegationTokenSecretManager(),
         rmContext.getClientRMService().getBindAddress());
-    // Delegation token renewal is delayed until ClientRMService starts. As
-    // it is required to short circuit the token renewal calls.
+    serviceStateLock.writeLock().lock();
     isServiceStarted = true;
-    renewIfServiceIsStarted(pendingTokenForRenewal);
-    pendingTokenForRenewal.clear();
+    serviceStateLock.writeLock().unlock();
+    while(!pendingEventQueue.isEmpty()) {
+      processDelegationTokenRewewerEvent(pendingEventQueue.take());
+    }
     super.serviceStart();
   }
 
+  private void processDelegationTokenRewewerEvent(
+      DelegationTokenRenewerEvent evt) {
+    serviceStateLock.readLock().lock();
+    try {
+      if (isServiceStarted) {
+        renewerService.execute(new DelegationTokenRenewerRunnable(evt));
+      } else {
+        pendingEventQueue.add(evt);
+      }
+    } finally {
+      serviceStateLock.readLock().unlock();
+    }
+  }
+
   @Override
   protected void serviceStop() {
     if (renewalTimer != null) {
       renewalTimer.cancel();
     }
     delegationTokens.clear();
-
+    this.renewerService.shutdown();
     dtCancelThread.interrupt();
     try {
       dtCancelThread.join(1000);
@@ -290,47 +333,50 @@ public class DelegationTokenRenewer exte
    * @throws IOException
    */
   public void addApplication(
-      ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
+      ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd,
+      boolean isApplicationRecovered) {
+    processDelegationTokenRewewerEvent(new DelegationTokenRenewerAppSubmitEvent(
+        applicationId, ts,
+        shouldCancelAtEnd, isApplicationRecovered));
+  }
+
+  private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
       throws IOException {
+    ApplicationId applicationId = evt.getApplicationId();
+    Credentials ts = evt.getCredentials();
+    boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
     if (ts == null) {
-      return; //nothing to add
+      return; // nothing to add
     }
-    
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Registering tokens for renewal for:" + 
+      LOG.debug("Registering tokens for renewal for:" +
           " appId = " + applicationId);
     }
-    
-    Collection <Token<?>> tokens = ts.getAllTokens();
+
+    Collection<Token<?>> tokens = ts.getAllTokens();
     long now = System.currentTimeMillis();
-    
+
     // find tokens for renewal, but don't add timers until we know
     // all renewable tokens are valid
     // At RM restart it is safe to assume that all the previously added tokens
     // are valid
     List<DelegationTokenToRenew> tokenList =
         new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
-    for(Token<?> token : tokens) {
+    for (Token<?> token : tokens) {
       if (token.isManaged()) {
         tokenList.add(new DelegationTokenToRenew(applicationId,
             token, getConfig(), now, shouldCancelAtEnd));
       }
     }
-    if (!tokenList.isEmpty()){
-      renewIfServiceIsStarted(tokenList);
-    }
-  }
-
-  protected void renewIfServiceIsStarted(List<DelegationTokenToRenew> dtrs)
-      throws IOException {
-    if (isServiceStarted) {
+    if (!tokenList.isEmpty()) {
       // Renewing token and adding it to timer calls are separated purposefully
       // If user provides incorrect token then it should not be added for
       // renewal.
-      for (DelegationTokenToRenew dtr : dtrs) {
+      for (DelegationTokenToRenew dtr : tokenList) {
         renewToken(dtr);
       }
-      for (DelegationTokenToRenew dtr : dtrs) {
+      for (DelegationTokenToRenew dtr : tokenList) {
         addTokenToList(dtr);
         setTimerForTokenRenewal(dtr);
         if (LOG.isDebugEnabled()) {
@@ -338,11 +384,9 @@ public class DelegationTokenRenewer exte
               + dtr.token.getService() + " for appId = " + dtr.applicationId);
         }
       }
-    } else {
-      pendingTokenForRenewal.addAll(dtrs);
     }
   }
-  
+
   /**
    * Task - to renew a token
    *
@@ -449,14 +493,20 @@ public class DelegationTokenRenewer exte
    * @param applicationId completed application
    */
   public void applicationFinished(ApplicationId applicationId) {
+    processDelegationTokenRewewerEvent(new DelegationTokenRenewerEvent(
+        applicationId,
+        DelegationTokenRenewerEventType.FINISH_APPLICATION));
+  }
+
+  private void handleAppFinishEvent(DelegationTokenRenewerEvent evt) {
     if (!tokenKeepAliveEnabled) {
-      removeApplicationFromRenewal(applicationId);
+      removeApplicationFromRenewal(evt.getApplicationId());
     } else {
-      delayedRemovalMap.put(applicationId, System.currentTimeMillis()
+      delayedRemovalMap.put(evt.getApplicationId(), System.currentTimeMillis()
           + tokenRemovalDelayMs);
     }
   }
-
+  
   /**
    * Add a list of applications to the keep alive list. If an appId already
    * exists, update it's keep-alive time.
@@ -546,4 +596,111 @@ public class DelegationTokenRenewer exte
   public void setRMContext(RMContext rmContext) {
     this.rmContext = rmContext;
   }
+  
+  /*
+   * This will run as a separate thread and will process individual events. It
+   * is done in this way to make sure that the token renewal as a part of
+   * application submission and token removal as a part of application finish
+   * is asynchronous in nature.
+   */
+  private final class DelegationTokenRenewerRunnable
+      implements Runnable {
+
+    private DelegationTokenRenewerEvent evt;
+    
+    public DelegationTokenRenewerRunnable(DelegationTokenRenewerEvent evt) {
+      this.evt = evt;
+    }
+    
+    @Override
+    public void run() {
+      if (evt instanceof DelegationTokenRenewerAppSubmitEvent) {
+        DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
+            (DelegationTokenRenewerAppSubmitEvent) evt;
+        handleDTRenewerAppSubmitEvent(appSubmitEvt);
+      } else if (evt.getType().equals(
+          DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
+        DelegationTokenRenewer.this.handleAppFinishEvent(evt);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void handleDTRenewerAppSubmitEvent(
+        DelegationTokenRenewerAppSubmitEvent event) {
+      /*
+       * For applications submitted with delegation tokens we are not submitting
+       * the application to scheduler from RMAppManager. Instead we are doing
+       * it from here. The primary goal is to make token renewal as a part of
+       * application submission asynchronous so that client thread is not
+       * blocked during app submission.
+       */
+      try {
+        // Setup tokens for renewal
+        DelegationTokenRenewer.this.handleAppSubmitEvent(event);
+        rmContext.getDispatcher().getEventHandler()
+            .handle(new RMAppEvent(event.getApplicationId(),
+                event.isApplicationRecovered() ? RMAppEventType.RECOVER
+                    : RMAppEventType.START));
+      } catch (Throwable t) {
+        LOG.warn(
+            "Unable to add the application to the delegation token renewer.",
+            t);
+        // Sending APP_REJECTED is fine, since we assume that the
+        // RMApp is in NEW state and thus we haven't yet informed the
+        // Scheduler about the existence of the application
+        rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppRejectedEvent(event.getApplicationId(), t.getMessage()));
+      }
+    }
+  }
+  
+  class DelegationTokenRenewerAppSubmitEvent extends
+      DelegationTokenRenewerEvent {
+
+    private Credentials credentials;
+    private boolean shouldCancelAtEnd;
+    private boolean isAppRecovered;
+
+    public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
+        Credentials credentails, boolean shouldCancelAtEnd,
+        boolean isApplicationRecovered) {
+      super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
+      this.credentials = credentails;
+      this.shouldCancelAtEnd = shouldCancelAtEnd;
+      this.isAppRecovered = isApplicationRecovered;
+    }
+
+    public Credentials getCredentials() {
+      return credentials;
+    }
+
+    public boolean shouldCancelAtEnd() {
+      return shouldCancelAtEnd;
+    }
+
+    public boolean isApplicationRecovered() {
+      return isAppRecovered;
+    }
+  }
+  
+  enum DelegationTokenRenewerEventType {
+    VERIFY_AND_START_APPLICATION,
+    FINISH_APPLICATION
+  }
+  
+  class DelegationTokenRenewerEvent extends
+      AbstractEvent<DelegationTokenRenewerEventType> {
+
+    private ApplicationId appId;
+
+    public DelegationTokenRenewerEvent(ApplicationId appId,
+        DelegationTokenRenewerEventType type) {
+      super(type);
+      this.appId = appId;
+    }
+
+    public ApplicationId getApplicationId() {
+      return appId;
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java Tue Nov 19 05:21:45 2013
@@ -172,7 +172,7 @@ public class TestAppManager{
         ApplicationSubmissionContext submissionContext, String user)
             throws YarnException {
       super.submitApplication(submissionContext, System.currentTimeMillis(),
-          false, user);
+          user, false, null);
     }
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Nov 19 05:21:45 2013
@@ -1009,6 +1009,10 @@ public class TestRMRestart {
     MockRM rm2 = new TestSecurityMockRM(conf, memStore);
     rm2.start();
 
+    // Need to wait for a while as now token renewal happens on another thread
+    // and is asynchronous in nature.
+    waitForTokensToBeRenewed(rm2);
+
     // verify tokens are properly populated back to rm2 DelegationTokenRenewer
     Assert.assertEquals(tokenSet, rm2.getRMContext()
       .getDelegationTokenRenewer().getDelegationTokens());
@@ -1018,6 +1022,21 @@ public class TestRMRestart {
     rm2.stop();
   }
 
+  private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
+    int waitCnt = 20;
+    boolean atleastOneAppInNEWState = true;
+    while (waitCnt-- > 0 && atleastOneAppInNEWState) {
+      atleastOneAppInNEWState = false;
+      for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) {
+        if (rmApp.getState() == RMAppState.NEW) {
+          Thread.sleep(1000);
+          atleastOneAppInNEWState = true;
+          break;
+        }
+      }
+    }
+  }
+
   @Test
   public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java?rev=1543313&r1=1543312&r2=1543313&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java Tue Nov 19 05:21:45 2013
@@ -31,13 +31,24 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -46,16 +57,29 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -66,14 +90,18 @@ import org.mockito.stubbing.Answer;
 
 /**
  * unit test - 
- * tests addition/deletion/cancelation of renewals of delegation tokens
+ * tests addition/deletion/cancellation of renewals of delegation tokens
  *
  */
+@SuppressWarnings("rawtypes")
 public class TestDelegationTokenRenewer {
   private static final Log LOG = 
       LogFactory.getLog(TestDelegationTokenRenewer.class);
   private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
   
+  private static BlockingQueue<Event> eventQueue;
+  private static volatile AtomicInteger counter;
+  private static AsyncDispatcher dispatcher;
   public static class Renewer extends TokenRenewer {
     private static int counter = 0;
     private static Token<?> lastRenewed = null;
@@ -143,11 +171,20 @@ public class TestDelegationTokenRenewer 
 
   @Before
   public void setUp() throws Exception {
+    counter = new AtomicInteger(0);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    eventQueue = new LinkedBlockingQueue<Event>();
+    dispatcher = new AsyncDispatcher(eventQueue);
     Renewer.reset();
-    delegationTokenRenewer = new DelegationTokenRenewer();
+    delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
     delegationTokenRenewer.init(conf);
     RMContext mockContext = mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(
+        delegationTokenRenewer);
+    when(mockContext.getDispatcher()).thenReturn(dispatcher);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
@@ -285,7 +322,7 @@ public class TestDelegationTokenRenewer 
    * @throws IOException
    * @throws URISyntaxException
    */
-  @Test
+  @Test(timeout=60000)
   public void testDTRenewal () throws Exception {
     MyFS dfs = (MyFS)FileSystem.get(conf);
     LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@@ -316,8 +353,9 @@ public class TestDelegationTokenRenewer 
     // register the tokens for renewal
     ApplicationId applicationId_0 = 
         BuilderUtils.newApplicationId(0, 0);
-    delegationTokenRenewer.addApplication(applicationId_0, ts, true);
-    
+    delegationTokenRenewer.addApplication(applicationId_0, ts, true, false);
+    waitForEventsToGetProcessed(delegationTokenRenewer);
+
     // first 3 initial renewals + 1 real
     int numberOfExpectedRenewals = 3+1; 
     
@@ -355,9 +393,10 @@ public class TestDelegationTokenRenewer 
     
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
-    delegationTokenRenewer.addApplication(applicationId_1, ts, true);
+    delegationTokenRenewer.addApplication(applicationId_1, ts, true, false);
+    waitForEventsToGetProcessed(delegationTokenRenewer);
     delegationTokenRenewer.applicationFinished(applicationId_1);
-    
+    waitForEventsToGetProcessed(delegationTokenRenewer);
     numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
     try {
       Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@@ -377,8 +416,8 @@ public class TestDelegationTokenRenewer 
     }
   }
   
-  @Test
-  public void testInvalidDTWithAddApplication() throws Exception {
+  @Test(timeout=60000)
+  public void testAppRejectionWithCancelledDelegationToken() throws Exception {
     MyFS dfs = (MyFS)FileSystem.get(conf);
     LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
 
@@ -390,12 +429,21 @@ public class TestDelegationTokenRenewer 
     
     // register the tokens for renewal
     ApplicationId appId =  BuilderUtils.newApplicationId(0, 0);
-    try {
-      delegationTokenRenewer.addApplication(appId, ts, true);
-      fail("App submission with a cancelled token should have failed");
-    } catch (InvalidToken e) {
-      // expected
+    delegationTokenRenewer.addApplication(appId, ts, true, false);
+    int waitCnt = 20;
+    while (waitCnt-- >0) {
+      if (!eventQueue.isEmpty()) {
+        Event evt = eventQueue.take();
+        if (evt.getType() == RMAppEventType.APP_REJECTED) {
+          Assert.assertTrue(
+              ((RMAppEvent) evt).getApplicationId().equals(appId));
+          return;
+        }
+      } else {
+        Thread.sleep(500);
+      }
     }
+    fail("App submission with a cancelled token should have failed");
   }
   
   /**
@@ -408,7 +456,7 @@ public class TestDelegationTokenRenewer 
    * @throws IOException
    * @throws URISyntaxException
    */
-  @Test
+  @Test(timeout=60000)
   public void testDTRenewalWithNoCancel () throws Exception {
     MyFS dfs = (MyFS)FileSystem.get(conf);
     LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
@@ -425,9 +473,10 @@ public class TestDelegationTokenRenewer 
     
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
-    delegationTokenRenewer.addApplication(applicationId_1, ts, false);
+    delegationTokenRenewer.addApplication(applicationId_1, ts, false, false);
+    waitForEventsToGetProcessed(delegationTokenRenewer);
     delegationTokenRenewer.applicationFinished(applicationId_1);
-    
+    waitForEventsToGetProcessed(delegationTokenRenewer);
     int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
     try {
       Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
@@ -454,9 +503,8 @@ public class TestDelegationTokenRenewer 
    * @throws IOException
    * @throws URISyntaxException
    */
-  @Test
+  @Test(timeout=60000)
   public void testDTKeepAlive1 () throws Exception {
-    DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
     Configuration lconf = new Configuration(conf);
     lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
     //Keep tokens alive for 6 seconds.
@@ -465,10 +513,15 @@ public class TestDelegationTokenRenewer 
     lconf.setLong(
         YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
         1000l);
+    DelegationTokenRenewer localDtr =
+        createNewDelegationTokenRenewer(lconf, counter);
     localDtr.init(lconf);
     RMContext mockContext = mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(
+        localDtr);
+    when(mockContext.getDispatcher()).thenReturn(dispatcher);
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@@ -487,16 +540,25 @@ public class TestDelegationTokenRenewer 
 
     // register the tokens for renewal
     ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
-    localDtr.addApplication(applicationId_0, ts, true);
+    localDtr.addApplication(applicationId_0, ts, true, false);
+    waitForEventsToGetProcessed(localDtr);
+    if (!eventQueue.isEmpty()){
+      Event evt = eventQueue.take();
+      if (evt instanceof RMAppEvent) {
+        Assert.assertEquals(((RMAppEvent)evt).getType(), RMAppEventType.START);
+      } else {
+        fail("RMAppEvent.START was expected!!");
+      }
+    }
+    
     localDtr.applicationFinished(applicationId_0);
- 
-    Thread.sleep(3000l);
+    waitForEventsToGetProcessed(localDtr);
 
     //Token should still be around. Renewal should not fail.
     token1.renew(lconf);
 
     //Allow the keepalive time to run out
-    Thread.sleep(6000l);
+    Thread.sleep(10000l);
 
     //The token should have been cancelled at this point. Renewal will fail.
     try {
@@ -518,9 +580,8 @@ public class TestDelegationTokenRenewer 
    * @throws IOException
    * @throws URISyntaxException
    */
-  @Test
+  @Test(timeout=60000)
   public void testDTKeepAlive2() throws Exception {
-    DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
     Configuration lconf = new Configuration(conf);
     lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
     //Keep tokens alive for 6 seconds.
@@ -529,10 +590,15 @@ public class TestDelegationTokenRenewer 
     lconf.setLong(
         YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
         1000l);
+    DelegationTokenRenewer localDtr =
+        createNewDelegationTokenRenewer(conf, counter);
     localDtr.init(lconf);
     RMContext mockContext = mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(
+        localDtr);
+    when(mockContext.getDispatcher()).thenReturn(dispatcher);
     InetSocketAddress sockAddr =
         InetSocketAddress.createUnresolved("localhost", 1234);
     when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
@@ -551,22 +617,18 @@ public class TestDelegationTokenRenewer 
 
     // register the tokens for renewal
     ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
-    localDtr.addApplication(applicationId_0, ts, true);
+    localDtr.addApplication(applicationId_0, ts, true, false);
     localDtr.applicationFinished(applicationId_0);
-
-    Thread.sleep(4000l);
-
+    waitForEventsToGetProcessed(delegationTokenRenewer);
     //Send another keep alive.
     localDtr.updateKeepAliveApplications(Collections
         .singletonList(applicationId_0));
     //Renewal should not fail.
     token1.renew(lconf);
-
     //Token should be around after this. 
     Thread.sleep(4500l);
     //Renewal should not fail. - ~1.5 seconds for keepalive timeout.
     token1.renew(lconf);
-
     //Allow the keepalive time to run out
     Thread.sleep(3000l);
     //The token should have been cancelled at this point. Renewal will fail.
@@ -575,61 +637,127 @@ public class TestDelegationTokenRenewer 
       fail("Renewal of cancelled token should have failed");
     } catch (InvalidToken ite) {}
   }
-  
-  @Test(timeout=20000)
-  public void testConncurrentAddApplication()
-      throws IOException, InterruptedException, BrokenBarrierException {
-    final CyclicBarrier startBarrier = new CyclicBarrier(2);
-    final CyclicBarrier endBarrier = new CyclicBarrier(2);
-
-    // this token uses barriers to block during renew
-    final Credentials creds1 = new Credentials();
-    final Token<?> token1 = mock(Token.class);
-    creds1.addToken(new Text("token"), token1);
-    doReturn(true).when(token1).isManaged();
-    doAnswer(new Answer<Long>() {
-      public Long answer(InvocationOnMock invocation)
-          throws InterruptedException, BrokenBarrierException {
-        startBarrier.await();
-        endBarrier.await();
-        return Long.MAX_VALUE;
-      }}).when(token1).renew(any(Configuration.class));
-
-    // this dummy token fakes renewing
-    final Credentials creds2 = new Credentials();
-    final Token<?> token2 = mock(Token.class);
-    creds2.addToken(new Text("token"), token2);
-    doReturn(true).when(token2).isManaged();
-    doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));
-
-    // fire up the renewer
-    final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
-    dtr.init(conf);
-    RMContext mockContext = mock(RMContext.class);
-    ClientRMService mockClientRMService = mock(ClientRMService.class);
-    when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
-    InetSocketAddress sockAddr =
-        InetSocketAddress.createUnresolved("localhost", 1234);
-    when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
-    dtr.setRMContext(mockContext);
-    dtr.start();
-    
-    // submit a job that blocks during renewal
-    Thread submitThread = new Thread() {
+
+  private DelegationTokenRenewer createNewDelegationTokenRenewer(
+      Configuration conf, final AtomicInteger counter) {
+    return new DelegationTokenRenewer() {
+
       @Override
-      public void run() {
-        try {
-          dtr.addApplication(mock(ApplicationId.class), creds1, false);
-        } catch (IOException e) {}        
+      protected ThreadPoolExecutor
+          createNewThreadPoolService(Configuration conf) {
+        ThreadPoolExecutor pool =
+            new ThreadPoolExecutor(5, 5, 3L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>()) {
+
+              @Override
+              protected void afterExecute(Runnable r, Throwable t) {
+                counter.decrementAndGet();
+                super.afterExecute(r, t);
+              }
+
+              @Override
+              public void execute(Runnable command) {
+                counter.incrementAndGet();
+                super.execute(command);
+              }
+            };
+        return pool;
       }
     };
-    submitThread.start();
-    
+  }
+
+  private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
+      throws InterruptedException {
+    int wait = 40;
+    while (wait-- > 0
+        && counter.get() > 0) {
+      Thread.sleep(200);
+    }
+  }
+  
+  @Test(timeout=20000)                                                         
+  public void testConcurrentAddApplication()                                  
+      throws IOException, InterruptedException, BrokenBarrierException {       
+    final CyclicBarrier startBarrier = new CyclicBarrier(2);                   
+    final CyclicBarrier endBarrier = new CyclicBarrier(2);                     
+                                                                               
+    // this token uses barriers to block during renew                          
+    final Credentials creds1 = new Credentials();                              
+    final Token<?> token1 = mock(Token.class);                                 
+    creds1.addToken(new Text("token"), token1);                                
+    doReturn(true).when(token1).isManaged();                                   
+    doAnswer(new Answer<Long>() {                                              
+      public Long answer(InvocationOnMock invocation)                          
+          throws InterruptedException, BrokenBarrierException { 
+        startBarrier.await();                                                  
+        endBarrier.await();                                                    
+        return Long.MAX_VALUE;                                                 
+      }}).when(token1).renew(any(Configuration.class));                        
+                                                                               
+    // this dummy token fakes renewing                                         
+    final Credentials creds2 = new Credentials();                              
+    final Token<?> token2 = mock(Token.class);                                 
+    creds2.addToken(new Text("token"), token2);                                
+    doReturn(true).when(token2).isManaged();                                   
+    doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));     
+                                                                               
+    // fire up the renewer                                                     
+    final DelegationTokenRenewer dtr =
+        createNewDelegationTokenRenewer(conf, counter);           
+    dtr.init(conf);                                                            
+    RMContext mockContext = mock(RMContext.class);                             
+    ClientRMService mockClientRMService = mock(ClientRMService.class);         
+    when(mockContext.getClientRMService()).thenReturn(mockClientRMService);    
+    InetSocketAddress sockAddr =                                               
+        InetSocketAddress.createUnresolved("localhost", 1234);                 
+    when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);           
+    dtr.setRMContext(mockContext);  
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
+    dtr.start();                                                                           
+    // submit a job that blocks during renewal                                 
+    Thread submitThread = new Thread() {                                       
+      @Override                                                                
+      public void run() {
+        dtr.addApplication(mock(ApplicationId.class), creds1, false, false);        
+      }                                                                        
+    };                                                                         
+    submitThread.start();                                                      
+                                                                               
     // wait till 1st submit blocks, then submit another
-    startBarrier.await();
-    dtr.addApplication(mock(ApplicationId.class), creds2, false);
-    // signal 1st to complete
-    endBarrier.await();
-    submitThread.join();
+    startBarrier.await();                           
+    dtr.addApplication(mock(ApplicationId.class), creds2, false, false);              
+    // signal 1st to complete                                                  
+    endBarrier.await();                                                        
+    submitThread.join(); 
+  }
+  
+  @Test(timeout=20000)
+  public void testAppSubmissionWithInvalidDelegationToken() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    MockRM rm = new MockRM(conf);
+    ByteBuffer tokens = ByteBuffer.wrap("BOGUS".getBytes()); 
+    ContainerLaunchContext amContainer =
+        ContainerLaunchContext.newInstance(
+            new HashMap<String, LocalResource>(), new HashMap<String, String>(),
+            new ArrayList<String>(), new HashMap<String, ByteBuffer>(), tokens,
+            new HashMap<ApplicationAccessType, String>());
+    ApplicationSubmissionContext appSubContext =
+        ApplicationSubmissionContext.newInstance(
+            ApplicationId.newInstance(1234121, 0),
+            "BOGUS", "default", Priority.UNDEFINED, amContainer, false,
+            true, 1, Resource.newInstance(1024, 1), "BOGUS");
+    SubmitApplicationRequest request =
+        SubmitApplicationRequest.newInstance(appSubContext);
+    try {
+      rm.getClientRMService().submitApplication(request);
+      fail("Error was expected.");
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "Bad header found in token storage"));
+    }
   }
 }