You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/11/05 02:45:34 UTC

git commit: YARN-2010. Handle app-recovery failures gracefully. (Jian He and Karthik Kambatla via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/trunk d78191a71 -> b2cd26980


YARN-2010. Handle app-recovery failures gracefully. (Jian He and Karthik Kambatla via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b2cd2698
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b2cd2698
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b2cd2698

Branch: refs/heads/trunk
Commit: b2cd2698028118b6384904732dbf94942f644732
Parents: d78191a
Author: Karthik Kambatla <ka...@apache.org>
Authored: Tue Nov 4 17:44:59 2014 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Tue Nov 4 17:45:24 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../server/resourcemanager/RMAppManager.java    | 55 +++--------------
 .../server/resourcemanager/rmapp/RMAppImpl.java | 47 ++++++++++++++
 .../rmapp/RMAppRecoverEvent.java                | 36 +++++++++++
 .../rmapp/attempt/RMAppAttemptImpl.java         |  6 +-
 .../scheduler/QueueNotFoundException.java       | 32 ++++++++++
 .../scheduler/capacity/CapacityScheduler.java   |  7 +--
 .../TestWorkPreservingRMRestart.java            | 18 ++----
 .../rmapp/TestRMAppTransitions.java             | 65 +++++++++++++++++---
 9 files changed, 196 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cd2698/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 118cdc4..f9fcf5b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -856,6 +856,9 @@ Release 2.6.0 - UNRELEASED
     of races between the launch and the stop-container call and when root
     processes crash. (Billie Rinaldi via vinodkv)
 
+    YARN-2010. Handle app-recovery failures gracefully. 
+    (Jian He and Karthik Kambatla via kasha)
+
 Release 2.5.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cd2698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 63333b8..02c6d2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -274,12 +275,11 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     ApplicationId appId = submissionContext.getApplicationId();
 
     if (UserGroupInformation.isSecurityEnabled()) {
-      Credentials credentials = null;
       try {
-        credentials = parseCredentials(submissionContext);
         this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
-          credentials, submissionContext.getCancelTokensWhenComplete(),
-          application.getUser());
+            parseCredentials(submissionContext),
+            submissionContext.getCancelTokensWhenComplete(),
+            application.getUser());
       } catch (Exception e) {
         LOG.warn("Unable to parse credentials.", e);
         // Sending APP_REJECTED is fine, since we assume that the
@@ -299,10 +299,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     }
   }
 
-  @SuppressWarnings("unchecked")
-  protected void
-      recoverApplication(ApplicationState appState, RMState rmState)
-          throws Exception {
+  protected void recoverApplication(ApplicationState appState, RMState rmState)
+      throws Exception {
     ApplicationSubmissionContext appContext =
         appState.getApplicationSubmissionContext();
     ApplicationId appId = appState.getAppId();
@@ -311,33 +309,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     RMAppImpl application =
         createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
           appState.getUser());
-    application.recover(rmState);
-    if (isApplicationInFinalState(appState.getState())) {
-      // We are synchronously moving the application into final state so that
-      // momentarily client will not see this application in NEW state. Also
-      // for finished applications we will avoid renewing tokens.
-      application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
-      return;
-    }
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      Credentials credentials = null;
-      try {
-        credentials = parseCredentials(appContext);
-        // synchronously renew delegation token on recovery.
-        rmContext.getDelegationTokenRenewer().addApplicationSync(appId,
-          credentials, appContext.getCancelTokensWhenComplete(),
-          application.getUser());
-        application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
-      } catch (Exception e) {
-        LOG.warn("Unable to parse and renew delegation tokens.", e);
-        this.rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppRejectedEvent(appId, e.getMessage()));
-        throw e;
-      }
-    } else {
-      application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
-    }
+    application.handle(new RMAppRecoverEvent(appId, rmState));
   }
 
   private RMAppImpl createAndPopulateNewRMApp(
@@ -416,18 +388,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     
     return null;
   }
-
-  private boolean isApplicationInFinalState(RMAppState rmAppState) {
-    if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
-        || rmAppState == RMAppState.KILLED) {
-      return true;
-    } else {
-      return false;
-    }
-  }
   
-  protected Credentials parseCredentials(ApplicationSubmissionContext application)
-      throws IOException {
+  protected Credentials parseCredentials(
+      ApplicationSubmissionContext application) throws IOException {
     Credentials credentials = new Credentials();
     DataInputByteBuffer dibb = new DataInputByteBuffer();
     ByteBuffer tokens = application.getAMContainerSpec().getTokens();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cd2698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 1994b36..9b10872 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -36,6 +38,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -825,6 +829,15 @@ public class RMAppImpl implements RMApp, Recoverable {
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
 
+      RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
+      try {
+        app.recover(recoverEvent.getRMState());
+      } catch (Exception e) {
+        String msg = app.applicationId + " failed to recover. " + e.getMessage();
+        failToRecoverApp(app, event, msg, e);
+        return RMAppState.FINAL_SAVING;
+      }
+
       // The app has completed.
       if (app.recoveredFinalState != null) {
         app.recoverAppAttempts();
@@ -832,6 +845,20 @@ public class RMAppImpl implements RMApp, Recoverable {
         return app.recoveredFinalState;
       }
 
+      if (UserGroupInformation.isSecurityEnabled()) {
+        // Synchronously renew delegation tokens; a failure fails only this app.
+        try {
+          app.rmContext.getDelegationTokenRenewer().addApplicationSync(
+            app.getApplicationId(), app.parseCredentials(),
+            app.submissionContext.getCancelTokensWhenComplete(), app.getUser());
+        } catch (Exception e) {
+          String msg = "Failed to renew delegation token on recovery for "
+              + app.applicationId + ". " + e.getMessage();
+          failToRecoverApp(app, event, msg, e);
+          return RMAppState.FINAL_SAVING;
+        }
+      }
+
       // No existent attempts means the attempt associated with this app was not
       // started or started but not yet saved.
       if (app.attempts.isEmpty()) {
@@ -865,6 +892,14 @@ public class RMAppImpl implements RMApp, Recoverable {
       // Thus we return ACCECPTED state on recovery.
       return RMAppState.ACCEPTED;
     }
+
+    private void failToRecoverApp(RMAppImpl app, RMAppEvent event, String msg,
+        Exception e) { // both callers return FINAL_SAVING right after this
+      app.diagnostics.append(msg); // keep the cause in the app's diagnostics
+      LOG.error(msg, e);
+      app.rememberTargetTransitionsAndStoreState(event, new FinalTransition(
+        RMAppState.FAILED), RMAppState.FAILED, RMAppState.FAILED);
+    }
   }
 
   private static final class AddApplicationToSchedulerTransition extends
@@ -1296,4 +1331,16 @@ public class RMAppImpl implements RMApp, Recoverable {
   public ReservationId getReservationId() {
     return submissionContext.getReservationID();
   }
+
+  protected Credentials parseCredentials() throws IOException {
+    Credentials credentials = new Credentials();
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    ByteBuffer tokens = submissionContext.getAMContainerSpec().getTokens();
+    if (tokens != null) { // no tokens: the empty Credentials is returned as-is
+      dibb.reset(tokens);
+      credentials.readTokenStorageStream(dibb);
+      tokens.rewind(); // put the position back at 0 so it can be read again
+    }
+    return credentials;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cd2698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRecoverEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRecoverEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRecoverEvent.java
new file mode 100644
index 0000000..b8c91a9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRecoverEvent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+
+public class RMAppRecoverEvent extends RMAppEvent {
+  // RECOVER event carrying the RMState that the app is recovered from.
+  private final RMState state;
+
+  public RMAppRecoverEvent(ApplicationId appId, RMState state) {
+    super(appId, RMAppEventType.RECOVER);
+    this.state = state;
+  }
+
+  public RMState getRMState() {
+    return state;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cd2698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index b5a6237..ae11b07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -833,8 +833,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     if (UserGroupInformation.isSecurityEnabled()) {
       byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey(
           RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME);
-      clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager()
-          .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
+      if (clientTokenMasterKeyBytes != null) {
+        clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager()
+            .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
+      }
     }
 
     this.amrmToken =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cd2698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java
new file mode 100644
index 0000000..35a1d66
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueNotFoundException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+@Private
+public class QueueNotFoundException extends YarnRuntimeException {
+  // Thrown by CapacityScheduler when a recovering app's queue is missing.
+  private static final long serialVersionUID = 187239430L;
+
+  public QueueNotFoundException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cd2698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 9332228..c383e43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
@@ -676,15 +677,13 @@ public class CapacityScheduler extends
       //During a restart, this indicates a queue was removed, which is
       //not presently supported
       if (isAppRecovering) {
-        //throwing RuntimeException because some other exceptions are caught
-        //(including YarnRuntimeException) and we want this to force an exit
-        String queueErrorMsg = "Queue named " + queueName 
+        String queueErrorMsg = "Queue named " + queueName
            + " missing during application recovery."
            + " Queue removal during recovery is not presently supported by the"
            + " capacity scheduler, please restart with all queues configured"
            + " which were present before shutdown/restart.";
         LOG.fatal(queueErrorMsg);
-        throw new RuntimeException(queueErrorMsg);
+        throw new QueueNotFoundException(queueErrorMsg);
       }
       String message = "Application " + applicationId + 
       " submitted by user " + user + " to unknown queue: " + queueName;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cd2698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 85d3895..536dbd7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -37,6 +37,7 @@ import java.util.Set;
 
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -570,10 +572,10 @@ public class TestWorkPreservingRMRestart {
   //   submission
   //2. Remove one of the queues, restart the RM
   //3. Verify that the expected exception was thrown
-  @Test (timeout = 30000)
+  @Test (timeout = 30000, expected = QueueNotFoundException.class)
   public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
     if (!schedulerClass.equals(CapacityScheduler.class)) {
-      return;
+      throw new QueueNotFoundException("Dummy");
     }
     conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
     conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
@@ -614,17 +616,7 @@ public class TestWorkPreservingRMRestart {
         new CapacitySchedulerConfiguration(conf);
     setupQueueConfigurationOnlyA(csConf);
     rm2 = new MockRM(csConf, memStore);
-    boolean runtimeThrown = false;
-    try {
-      rm2.start();
-    } catch (RuntimeException e) {
-      //we're catching it because we want to verify the message
-      //and we don't want to set it as an expected exception for the 
-      //test because we only want it to happen here
-      assertTrue(e.getMessage().contains(B + " missing"));
-      runtimeThrown = true;
-    }
-    assertTrue(runtimeThrown);
+    rm2.start();
   }
 
   private void checkParentQueue(ParentQueue parentQueue, int numContainers,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2cd2698/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index 6a66385..ecb6b5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
@@ -35,6 +36,8 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
@@ -73,9 +77,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -199,10 +205,11 @@ public class TestRMAppTransitions {
     AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
     store = mock(RMStateStore.class);
     writer = mock(RMApplicationHistoryWriter.class);
+    DelegationTokenRenewer renewer = mock(DelegationTokenRenewer.class);
     RMContext realRMContext = 
         new RMContextImpl(rmDispatcher,
           containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
-          null, new AMRMTokenSecretManager(conf, this.rmContext),
+          renewer, new AMRMTokenSecretManager(conf, this.rmContext),
           new RMContainerTokenSecretManager(conf),
           new NMTokenSecretManagerInRM(conf),
           new ClientToAMTokenSecretManagerInRM(),
@@ -387,8 +394,12 @@ public class TestRMAppTransitions {
       ApplicationSubmissionContext submissionContext) throws IOException {
     RMApp application = createNewTestApp(submissionContext);
     // NEW => SUBMITTED event RMAppEventType.RECOVER
+    RMState state = new RMState();
+    ApplicationState appState = new ApplicationState(123, 123, null, "user");
+    state.getApplicationState().put(application.getApplicationId(), appState);
     RMAppEvent event =
-        new RMAppEvent(application.getApplicationId(), RMAppEventType.RECOVER);
+        new RMAppRecoverEvent(application.getApplicationId(), state);
+
     application.handle(event);
     assertStartTimeSet(application);
     assertAppState(RMAppState.SUBMITTED, application);
@@ -514,7 +525,46 @@ public class TestRMAppTransitions {
   @Test (timeout = 30000)
   public void testAppRecoverPath() throws IOException {
     LOG.info("--- START: testAppRecoverPath ---");
-    testCreateAppSubmittedRecovery(null);
+    ApplicationSubmissionContext sub =
+        Records.newRecord(ApplicationSubmissionContext.class);
+    ContainerLaunchContext clc =
+        Records.newRecord(ContainerLaunchContext.class);
+    Credentials credentials = new Credentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    ByteBuffer securityTokens =
+        ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    clc.setTokens(securityTokens);
+    sub.setAMContainerSpec(clc);
+    testCreateAppSubmittedRecovery(sub);
+  }
+
+  @Test (timeout = 30000)
+  public void testAppRecoverToFailed() throws IOException {
+    LOG.info("--- START: testAppRecoverToFailed ---");
+    ApplicationSubmissionContext sub =
+        Records.newRecord(ApplicationSubmissionContext.class);
+    ContainerLaunchContext clc =
+        Records.newRecord(ContainerLaunchContext.class);
+    Credentials credentials = new Credentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    ByteBuffer securityTokens =
+        ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    clc.setTokens(securityTokens);
+    sub.setAMContainerSpec(clc);
+
+    RMApp application = createNewTestApp(sub);
+    // NEW => FINAL_SAVING, event RMAppEventType.RECOVER
+    RMState state = new RMState();
+    RMAppEvent event =
+        new RMAppRecoverEvent(application.getApplicationId(), state);
+    // NPE on recovery, presumably since the RMState has no entry for this app.
+    application.handle(event);
+    assertAppState(RMAppState.FINAL_SAVING, application);
+    sendAppUpdateSavedEvent(application);
+    rmDispatcher.await();
+    assertAppState(RMAppState.FAILED, application);
   }
 
   @Test (timeout = 30000)
@@ -917,7 +967,6 @@ public class TestRMAppTransitions {
     }
   }
   
-  @SuppressWarnings("deprecation")
   public void testRecoverApplication(ApplicationState appState, RMState rmState)
       throws Exception {
     ApplicationSubmissionContext submissionContext =
@@ -932,15 +981,15 @@ public class TestRMAppTransitions {
                 RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
                 submissionContext.getResource(), 1));
     Assert.assertEquals(RMAppState.NEW, application.getState());
-    application.recover(rmState);
 
+    RMAppEvent recoverEvent =
+        new RMAppRecoverEvent(application.getApplicationId(), rmState);
+    // Trigger RECOVER event.
+    application.handle(recoverEvent);
     // Application final status looked from recoveredFinalStatus
     Assert.assertTrue("Application is not in recoveredFinalStatus.",
         RMAppImpl.isAppInFinalState(application));
 
-    // Trigger RECOVER event.
-    application.handle(new RMAppEvent(appState.getAppId(),
-        RMAppEventType.RECOVER));
     rmDispatcher.await();
     RMAppState finalState = appState.getState();
     Assert.assertEquals("Application is not in finalState.", finalState,