You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ac...@apache.org on 2012/12/19 05:21:24 UTC

svn commit: r1423758 [2/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ hadoop-yarn...

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Wed Dec 19 04:21:18 2012
@@ -44,6 +44,12 @@ public interface RMApp extends EventHand
    * @return the {@link ApplicationId} for this {@link RMApp}.
    */
   ApplicationId getApplicationId();
+  
+  /**
+   * The application submission context for this {@link RMApp}
+   * @return the {@link ApplicationSubmissionContext} for this {@link RMApp}
+   */
+  ApplicationSubmissionContext getApplicationSubmissionContext();
 
   /**
    * The current state of the {@link RMApp}.

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Dec 19 04:21:18 2012
@@ -50,6 +50,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -66,7 +69,7 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 
-public class RMAppImpl implements RMApp {
+public class RMAppImpl implements RMApp, Recoverable {
 
   private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
   private static final String UNAVAILABLE = "N/A";
@@ -243,6 +246,11 @@ public class RMAppImpl implements RMApp 
   public ApplicationId getApplicationId() {
     return this.applicationId;
   }
+  
+  @Override
+  public ApplicationSubmissionContext getApplicationSubmissionContext() {
+    return this.submissionContext; // returned as-is, no defensive copy
+  }
 
   @Override
   public FinalApplicationStatus getFinalApplicationStatus() {
@@ -512,9 +520,22 @@ public class RMAppImpl implements RMApp 
       this.writeLock.unlock();
     }
   }
+  
+  /** Recreates one attempt per stored attempt and recovers each from state. */
+  @Override
+  public void recover(RMState state) {
+    ApplicationState appState = state.getApplicationState().get(getApplicationId());
+    LOG.info("Recovering app: " + getApplicationId() + " with "
+            + appState.getAttemptCount() + " attempts");
+    for(int i=0; i<appState.getAttemptCount(); ++i) {
+      // create the attempt without sending START, then restore its state
+      createNewAttempt(false);
+      ((RMAppAttemptImpl) currentAttempt).recover(state);
+    }
+  }
 
   @SuppressWarnings("unchecked")
-  private void createNewAttempt() {
+  private void createNewAttempt(boolean startAttempt) {
     ApplicationAttemptId appAttemptId = Records
         .newRecord(ApplicationAttemptId.class);
     appAttemptId.setApplicationId(applicationId);
@@ -525,8 +546,10 @@ public class RMAppImpl implements RMApp 
         submissionContext, conf);
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
-    handler.handle(
-        new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+    if(startAttempt) {
+      handler.handle(
+          new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+    }
   }
   
   private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
@@ -553,7 +576,7 @@ public class RMAppImpl implements RMApp 
   
   private static final class StartAppAttemptTransition extends RMAppTransition {
     public void transition(RMAppImpl app, RMAppEvent event) {
-      app.createNewAttempt();
+      app.createNewAttempt(true);
     };
   }
 
@@ -647,7 +670,7 @@ public class RMAppImpl implements RMApp 
         msg = "Unmanaged application " + app.getApplicationId()
             + " failed due to " + failedEvent.getDiagnostics()
             + ". Failing the application.";
-      } else if (app.attempts.size() == app.maxRetries) {
+      } else if (app.attempts.size() >= app.maxRetries) {
         retryApp = false;
         msg = "Application " + app.getApplicationId() + " failed "
             + app.maxRetries + " times due to " + failedEvent.getDiagnostics()
@@ -655,7 +678,7 @@ public class RMAppImpl implements RMApp 
       }
 
       if (retryApp) {
-        app.createNewAttempt();
+        app.createNewAttempt(true);
         return initialState;
       } else {
         LOG.info(msg);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Wed Dec 19 04:21:18 2012
@@ -39,9 +39,15 @@ public enum RMAppAttemptEventType {
   CONTAINER_ACQUIRED,
   CONTAINER_ALLOCATED,
   CONTAINER_FINISHED,
+  
+  // Source: RMStateStore
+  ATTEMPT_SAVED,
 
   // Source: Scheduler
   APP_REJECTED,
   APP_ACCEPTED,
+  
+  // Source: RMAppAttemptImpl.recover
+  RECOVER
 
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Dec 19 04:21:18 2012
@@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -57,6 +58,11 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -69,6 +75,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -85,7 +92,7 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class RMAppAttemptImpl implements RMAppAttempt {
+public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
 
   private static final Log LOG = LogFactory.getLog(RMAppAttemptImpl.class);
 
@@ -153,12 +160,15 @@ public class RMAppAttemptImpl implements
       .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
           RMAppAttemptEventType.REGISTERED,
           new UnexpectedAMRegisteredTransition())
+      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.RECOVERED, 
+          RMAppAttemptEventType.RECOVER)
           
       // Transitions from SUBMITTED state
       .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
           RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
       .addTransition(RMAppAttemptState.SUBMITTED, 
-          EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.SCHEDULED),
+          EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
+                     RMAppAttemptState.SCHEDULED),
           RMAppAttemptEventType.APP_ACCEPTED, 
           new ScheduleTransition())
       .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
@@ -170,12 +180,42 @@ public class RMAppAttemptImpl implements
           
        // Transitions from SCHEDULED State
       .addTransition(RMAppAttemptState.SCHEDULED,
-          RMAppAttemptState.ALLOCATED,
+                     RMAppAttemptState.ALLOCATED_SAVING,
           RMAppAttemptEventType.CONTAINER_ALLOCATED,
           new AMContainerAllocatedTransition())
       .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED,
           RMAppAttemptEventType.KILL,
           new BaseFinalTransition(RMAppAttemptState.KILLED))
+          
+       // Transitions from ALLOCATED_SAVING State
+      .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
+          RMAppAttemptState.ALLOCATED,
+          RMAppAttemptEventType.ATTEMPT_SAVED, new AttemptStoredTransition())
+      .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
+          RMAppAttemptState.ALLOCATED_SAVING,
+          RMAppAttemptEventType.CONTAINER_ACQUIRED, 
+          new ContainerAcquiredTransition())
+       // App could be killed by the client. So need to handle this. 
+      .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
+          RMAppAttemptState.KILLED,
+          RMAppAttemptEventType.KILL,
+          new BaseFinalTransition(RMAppAttemptState.KILLED))
+      
+       // Transitions from LAUNCHED_UNMANAGED_SAVING State
+      .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
+          RMAppAttemptState.LAUNCHED,
+          RMAppAttemptEventType.ATTEMPT_SAVED, 
+          new UnmanagedAMAttemptSavedTransition())
+      // attempt should not try to register in this state
+      .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
+          RMAppAttemptState.FAILED,
+          RMAppAttemptEventType.REGISTERED,
+          new UnexpectedAMRegisteredTransition())
+      // App could be killed by the client. So need to handle this. 
+      .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
+          RMAppAttemptState.KILLED,
+          RMAppAttemptEventType.KILL,
+          new BaseFinalTransition(RMAppAttemptState.KILLED))
 
        // Transitions from ALLOCATED State
       .addTransition(RMAppAttemptState.ALLOCATED,
@@ -279,11 +319,30 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.REGISTERED,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
+              RMAppAttemptEventType.ATTEMPT_SAVED,
+              RMAppAttemptEventType.CONTAINER_FINISHED,
+              RMAppAttemptEventType.UNREGISTERED,
+              RMAppAttemptEventType.KILL,
+              RMAppAttemptEventType.STATUS_UPDATE))
+              
+      // Transitions from RECOVERED State
+      .addTransition(
+          RMAppAttemptState.RECOVERED,
+          RMAppAttemptState.RECOVERED,
+          EnumSet.of(RMAppAttemptEventType.START,
+              RMAppAttemptEventType.APP_ACCEPTED,
+              RMAppAttemptEventType.APP_REJECTED,
+              RMAppAttemptEventType.EXPIRE,
+              RMAppAttemptEventType.LAUNCHED,
+              RMAppAttemptEventType.LAUNCH_FAILED,
+              RMAppAttemptEventType.REGISTERED,
+              RMAppAttemptEventType.CONTAINER_ALLOCATED,
+              RMAppAttemptEventType.CONTAINER_ACQUIRED,
+              RMAppAttemptEventType.ATTEMPT_SAVED,
               RMAppAttemptEventType.CONTAINER_FINISHED,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.KILL,
               RMAppAttemptEventType.STATUS_UPDATE))
-
     .installTopology();
 
   public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
@@ -318,7 +377,7 @@ public class RMAppAttemptImpl implements
   @Override
   public ApplicationSubmissionContext getSubmissionContext() {
     return this.submissionContext;
-  }
+  } 
 
   @Override
   public FinalApplicationStatus getFinalApplicationStatus() {
@@ -494,6 +553,10 @@ public class RMAppAttemptImpl implements
     }
   }
 
+  private void setMasterContainer(Container container) {
+    masterContainer = container; // plain assignment; no validation here
+  }
+
   @Override
   public void handle(RMAppAttemptEvent event) {
 
@@ -561,6 +624,21 @@ public class RMAppAttemptImpl implements
     }
   }
 
+  /** Restores the master container from {@code state}, then fires RECOVER. */
+  @Override
+  public void recover(RMState state) {
+    ApplicationAttemptId attemptId = getAppAttemptId();
+    ApplicationAttemptState attemptState = state.getApplicationState()
+        .get(attemptId.getApplicationId()).getAttempt(attemptId);
+    assert attemptState != null;
+    setMasterContainer(attemptState.getMasterContainer());
+    LOG.info("Recovered attempt: AppId: " + attemptId.getApplicationId()
+             + " AttemptId: " + attemptId
+             + " MasterContainer: " + masterContainer);
+    setDiagnostics("Attempt recovered after RM restart");
+    handle(new RMAppAttemptEvent(attemptId, RMAppAttemptEventType.RECOVER));
+  }
+  
   private static class BaseTransition implements
       SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
 
@@ -625,13 +703,12 @@ public class RMAppAttemptImpl implements
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-
-      // Send the acceptance to the app
-      appAttempt.eventHandler.handle(new RMAppEvent(event
-          .getApplicationAttemptId().getApplicationId(),
-          RMAppEventType.APP_ACCEPTED));
-
       if (!appAttempt.submissionContext.getUnmanagedAM()) {
+        // Send the acceptance to the app
+        appAttempt.eventHandler.handle(new RMAppEvent(event
+            .getApplicationAttemptId().getApplicationId(),
+            RMAppEventType.APP_ACCEPTED));
+
         // Request a container for the AM.
         ResourceRequest request = BuilderUtils.newResourceRequest(
             AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
@@ -647,35 +724,42 @@ public class RMAppAttemptImpl implements
         return RMAppAttemptState.SCHEDULED;
       } else {
         // RM not allocating container. AM is self launched. 
-        // Directly go to LAUNCHED state
-        // Register with AMLivelinessMonitor
-        appAttempt.rmContext.getAMLivelinessMonitor().register(
-            appAttempt.applicationAttemptId);
-        return RMAppAttemptState.LAUNCHED;
+        RMStateStore store = appAttempt.rmContext.getStateStore();
+        // save state and then go to LAUNCHED state
+        appAttempt.storeAttempt(store);
+        return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
       }
     }
   }
 
-  private static final class AMContainerAllocatedTransition extends BaseTransition {
+  private static final class AMContainerAllocatedTransition 
+                                                      extends BaseTransition {
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
-        RMAppAttemptEvent event) {
-
+                                                     RMAppAttemptEvent event) {
       // Acquire the AM container from the scheduler.
       Allocation amContainerAllocation = appAttempt.scheduler.allocate(
           appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
           EMPTY_CONTAINER_RELEASE_LIST);
 
       // Set the masterContainer
-      appAttempt.masterContainer = amContainerAllocation.getContainers().get(
-          0);
+      appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(
+                                                                           0));
 
-      // Send event to launch the AM Container
-      appAttempt.eventHandler.handle(new AMLauncherEvent(
-          AMLauncherEventType.LAUNCH, appAttempt));
+      RMStateStore store = appAttempt.rmContext.getStateStore();
+      appAttempt.storeAttempt(store);
     }
   }
-
+  
+  private static final class AttemptStoredTransition extends BaseTransition {
+    @Override
+    public void transition(RMAppAttemptImpl appAttempt,
+                                                    RMAppAttemptEvent event) {
+      appAttempt.checkAttemptStoreError(event); // exits if the store failed
+      appAttempt.launchAttempt(); // sends LAUNCH for the AM container
+    }
+  }
+  
   private static class BaseFinalTransition extends BaseTransition {
 
     private final RMAppAttemptState finalAttemptState;
@@ -736,17 +820,34 @@ public class RMAppAttemptImpl implements
     }
   }
 
-  private static final class AMLaunchedTransition extends BaseTransition {
+  private static class AMLaunchedTransition extends BaseTransition {
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
-        RMAppAttemptEvent event) {
-
+                            RMAppAttemptEvent event) {
       // Register with AMLivelinessMonitor
-      appAttempt.rmContext.getAMLivelinessMonitor().register(
-          appAttempt.applicationAttemptId);
-
+      appAttempt.attemptLaunched();
     }
   }
+  
+  private static final class UnmanagedAMAttemptSavedTransition 
+                                                extends AMLaunchedTransition {
+    @Override
+    public void transition(RMAppAttemptImpl appAttempt,
+                            RMAppAttemptEvent event) {
+      appAttempt.checkAttemptStoreError(event);
+      // Send the acceptance to the app.
+      // Ideally this would happen when the scheduler accepts the app, but it
+      // is deferred to here: the client waits for the app to be reported as
+      // accepted before launching the unmanaged AM, and it must not launch
+      // the AM until the attempt has been saved. So the accepted state is
+      // delayed until storing the attempt has completed.
+      appAttempt.eventHandler.handle(new RMAppEvent(event
+          .getApplicationAttemptId().getApplicationId(),
+          RMAppEventType.APP_ACCEPTED));
+      
+      super.transition(appAttempt, event); // registers with AMLivelinessMonitor
+    }    
+  }
 
   private static final class LaunchFailedTransition extends BaseFinalTransition {
 
@@ -1040,4 +1141,37 @@ public class RMAppAttemptImpl implements
       this.readLock.unlock();
     }
   }
+  
+  private void launchAttempt(){
+    // Dispatch a LAUNCH event for this attempt to start its AM container
+    eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
+  }
+  
+  private void attemptLaunched() {
+    // Register this attempt's id with the AMLivelinessMonitor
+    rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
+  }
+  
+  /** Logs and exits via ExitUtil if the stored event carries an exception. */
+  private void checkAttemptStoreError(RMAppAttemptEvent event) {
+    Exception storeError =
+        ((RMAppAttemptStoredEvent) event).getStoredException();
+    if (storeError != null) {
+      // This needs to be handled for HA and give up master status if we got
+      // fenced
+      LOG.error("Failed to store attempt: " + getAppAttemptId(), storeError);
+      ExitUtil.terminate(1, storeError);
+    }
+  }
+  
+  /** Logs this attempt and passes it to {@code store} to be saved. */
+  private void storeAttempt(RMStateStore store) {
+    // store attempt data in a non-blocking manner to prevent dispatcher
+    // thread starvation and wait for state to be saved
+    ApplicationAttemptId attemptId = getAppAttemptId();
+    LOG.info("Storing attempt: AppId: " + attemptId.getApplicationId()
+              + " AttemptId: " + attemptId
+              + " MasterContainer: " + masterContainer);
+    store.storeApplicationAttempt(this);
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Wed Dec 19 04:21:18 2012
@@ -19,6 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 public enum RMAppAttemptState {
-  NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING,
-  FINISHING, FINISHED, KILLED,
+  NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, 
+  FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStoredEvent.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+
+public class RMAppAttemptStoredEvent extends RMAppAttemptEvent {
+  // Exception passed in by the event's creator; may be null.
+  final Exception storedException;
+  
+  public RMAppAttemptStoredEvent(ApplicationAttemptId appAttemptId,
+                                 Exception storedException) {
+    super(appAttemptId, RMAppAttemptEventType.ATTEMPT_SAVED);
+    this.storedException = storedException;
+  }
+  // RMAppAttemptImpl treats a non-null value as a failed store.
+  public Exception getStoredException() {
+    return storedException;
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Wed Dec 19 04:21:18 2012
@@ -47,7 +47,7 @@ public class MockAM {
   private volatile int responseId = 0;
   private final ApplicationAttemptId attemptId;
   private final RMContext context;
-  private final AMRMProtocol amRMProtocol;
+  private AMRMProtocol amRMProtocol;
 
   private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
   private final List<ContainerId> releases = new ArrayList<ContainerId>();
@@ -58,6 +58,10 @@ public class MockAM {
     this.amRMProtocol = amRMProtocol;
     this.attemptId = attemptId;
   }
+  
+  void setAMRMProtocol(AMRMProtocol amRMProtocol) {
+    this.amRMProtocol = amRMProtocol; // replaces the one set in the constructor
+  }
 
   public void waitForState(RMAppAttemptState finalState) throws Exception {
     RMApp app = context.getRMApps().get(attemptId.getApplicationId());
@@ -66,7 +70,8 @@ public class MockAM {
     while (!finalState.equals(attempt.getAppAttemptState())
         && timeoutSecs++ < 20) {
       System.out
-          .println("AppAttempt State is : " + attempt.getAppAttemptState()
+          .println("AppAttempt : " + attemptId + " State is : " 
+              + attempt.getAppAttemptState()
               + " Waiting for state : " + finalState);
       Thread.sleep(500);
     }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Wed Dec 19 04:21:18 2012
@@ -46,7 +46,7 @@ public class MockNM {
   private int responseId;
   private NodeId nodeId;
   private final int memory;
-  private final ResourceTrackerService resourceTracker;
+  private ResourceTrackerService resourceTracker;
   private final int httpPort = 2;
   private MasterKey currentMasterKey;
 
@@ -66,6 +66,10 @@ public class MockNM {
   public int getHttpPort() {
     return httpPort;
   }
+  
+  void setResourceTrackerService(ResourceTrackerService resourceTracker) {
+    this.resourceTracker = resourceTracker; // overwrites the current reference
+  }
 
   public void containerStatus(Container container) throws Exception {
     Map<ApplicationId, List<ContainerStatus>> conts = 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Dec 19 04:21:18 2012
@@ -39,9 +39,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -63,10 +64,17 @@ public class MockRM extends ResourceMana
   }
 
   public MockRM(Configuration conf) {
-    super(StoreFactory.getStore(conf));    
+    this(conf, null);    
+  }
+  
+  public MockRM(Configuration conf, RMStateStore store) {
+    super();    
     init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
+    if(store != null) {
+      setRMStateStore(store);
+    }
     Logger rootLogger = LogManager.getRootLogger();
-    rootLogger.setLevel(Level.DEBUG);
+    rootLogger.setLevel(Level.DEBUG);    
   }
 
   public void waitForState(ApplicationId appId, RMAppState finalState)
@@ -75,7 +83,7 @@ public class MockRM extends ResourceMana
     Assert.assertNotNull("app shouldn't be null", app);
     int timeoutSecs = 0;
     while (!finalState.equals(app.getState()) && timeoutSecs++ < 20) {
-      System.out.println("App State is : " + app.getState()
+      System.out.println("App : " + appId + " State is : " + app.getState()
           + " Waiting for state : " + finalState);
       Thread.sleep(500);
     }
@@ -83,6 +91,24 @@ public class MockRM extends ResourceMana
     Assert.assertEquals("App state is not correct (timedout)", finalState,
         app.getState());
   }
+  
+  public void waitForState(ApplicationAttemptId attemptId, 
+                           RMAppAttemptState finalState)
+      throws Exception {
+    RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
+    Assert.assertNotNull("app shouldn't be null", app);
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    int timeoutSecs = 0;
+    while (!finalState.equals(attempt.getAppAttemptState()) && timeoutSecs++ < 20) {
+      System.out.println("AppAttempt : " + attemptId 
+          + " State is : " + attempt.getAppAttemptState()
+          + " Waiting for state : " + finalState);
+      Thread.sleep(500);
+    }
+    System.out.println("Attempt State is : " + attempt.getAppAttemptState());
+    Assert.assertEquals("Attempt state is not correct (timedout)", finalState,
+        attempt.getAppAttemptState());
+  }
 
   // get new application id
   public GetNewApplicationResponse getNewAppId() throws Exception {
@@ -97,11 +123,16 @@ public class MockRM extends ResourceMana
 
   // client
   public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
-    return submitApp(masterMemory, name, user, null);
+    return submitApp(masterMemory, name, user, null, false);
   }
-
+  
   public RMApp submitApp(int masterMemory, String name, String user,
       Map<ApplicationAccessType, String> acls) throws Exception {
+    return submitApp(masterMemory, name, user, acls, false);
+  }  
+
+  public RMApp submitApp(int masterMemory, String name, String user,
+      Map<ApplicationAccessType, String> acls, boolean unmanaged) throws Exception {
     ClientRMProtocol client = getClientRMService();
     GetNewApplicationResponse resp = client.getNewApplication(Records
         .newRecord(GetNewApplicationRequest.class));
@@ -114,6 +145,9 @@ public class MockRM extends ResourceMana
     sub.setApplicationId(appId);
     sub.setApplicationName(name);
     sub.setUser(user);
+    if(unmanaged) {
+      sub.setUnmanagedAM(true);
+    }
     ContainerLaunchContext clc = Records
         .newRecord(ContainerLaunchContext.class);
     Resource capability = Records.newRecord(Resource.class);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java Wed Dec 19 04:21:18 2012
@@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.service.Service.STATE;
 import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -85,7 +85,7 @@ public class TestApplicationACLs {
 
   @BeforeClass
   public static void setup() throws InterruptedException, IOException {
-    RMStateStore store = StoreFactory.getStore(conf);
+    RMStateStore store = RMStateStoreFactory.getStore(conf);
     conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
     AccessControlList adminACL = new AccessControlList("");
     adminACL.addGroup(SUPER_GROUP);

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1423758&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Wed Dec 19 04:21:18 2012
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRMRestart {
+  
+  @Test
+  public void testRMRestart() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    ExitUtil.disableSystemExit();
+    
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, 
+    "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
+    conf.set(YarnConfiguration.RM_SCHEDULER, 
+    "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+    Map<ApplicationId, ApplicationState> rmAppState = 
+                                                  rmState.getApplicationState();
+    
+    
+    // PHASE 1: create state in an RM
+    
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    
+    // start like normal because state is empty
+    rm1.start();
+    
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    MockNM nm2 = new MockNM("h2:5678", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm2.registerNode(); // nm2 will not heartbeat with RM1
+    
+    // create app whose saved state will be removed once it finishes
+    RMApp app0 = rm1.submitApp(200);
+    RMAppAttempt attempt0 = app0.getCurrentAppAttempt();
+    // spot check that app is saved
+    Assert.assertEquals(1, rmAppState.size());
+    nm1.nodeHeartbeat(true);
+    MockAM am0 = rm1.sendAMLaunched(attempt0.getAppAttemptId());
+    am0.registerAppAttempt();
+    am0.unregisterAppAttempt();
+    nm1.nodeHeartbeat(attempt0.getAppAttemptId(), 1, ContainerState.COMPLETE);
+    am0.waitForState(RMAppAttemptState.FINISHED);
+    rm1.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
+
+    // spot check that app is not saved anymore
+    Assert.assertEquals(0, rmAppState.size());
+        
+    // create app that gets launched and does allocate before RM restart
+    RMApp app1 = rm1.submitApp(200);
+    // assert app1 info is saved
+    ApplicationState appState = rmAppState.get(app1.getApplicationId());
+    Assert.assertNotNull(appState);
+    Assert.assertEquals(0, appState.getAttemptCount());
+    Assert.assertEquals(appState.getApplicationSubmissionContext()
+        .getApplicationId(), app1.getApplicationSubmissionContext()
+        .getApplicationId());
+
+    //kick the scheduling to allocate AM container
+    nm1.nodeHeartbeat(true);
+    
+    // assert app1 attempt is saved
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
+    rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+    Assert.assertEquals(1, appState.getAttemptCount());
+    ApplicationAttemptState attemptState = 
+                                appState.getAttempt(attemptId1);
+    Assert.assertNotNull(attemptState);
+    Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), 
+                        attemptState.getMasterContainer().getId());
+    
+    // launch the AM
+    MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    // AM request for containers
+    am1.allocate("h1" , 1000, 1, new ArrayList<ContainerId>());    
+    // kick the scheduler
+    nm1.nodeHeartbeat(true);
+    List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    while (conts.size() == 0) {
+      nm1.nodeHeartbeat(true);
+      conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(500);
+    }
+    
+    // create app that does not get launched by RM before RM restart
+    RMApp app2 = rm1.submitApp(200);
+
+    // assert app2 info is saved
+    appState = rmAppState.get(app2.getApplicationId());
+    Assert.assertNotNull(appState);
+    Assert.assertEquals(0, appState.getAttemptCount());
+    Assert.assertEquals(appState.getApplicationSubmissionContext()
+        .getApplicationId(), app2.getApplicationSubmissionContext()
+        .getApplicationId());
+    
+    // create unmanaged app
+    RMApp appUnmanaged = rm1.submitApp(200, "", "", null, true);
+    ApplicationAttemptId unmanagedAttemptId = 
+                        appUnmanaged.getCurrentAppAttempt().getAppAttemptId();
+    // assert appUnmanaged info is saved
+    ApplicationId unmanagedAppId = appUnmanaged.getApplicationId();
+    appState = rmAppState.get(unmanagedAppId);
+    Assert.assertNotNull(appState);
+    // wait for attempt to reach LAUNCHED state 
+    rm1.waitForState(unmanagedAttemptId, RMAppAttemptState.LAUNCHED);
+    rm1.waitForState(unmanagedAppId, RMAppState.ACCEPTED);
+    // assert unmanaged attempt info is saved
+    Assert.assertEquals(1, appState.getAttemptCount());
+    Assert.assertEquals(appState.getApplicationSubmissionContext()
+        .getApplicationId(), appUnmanaged.getApplicationSubmissionContext()
+        .getApplicationId());  
+    
+    
+    // PHASE 2: create new RM and start from old state
+    
+    // create new RM to represent restart and recover state
+    MockRM rm2 = new MockRM(conf, memStore);
+    
+    // start new RM
+    rm2.start();
+    
+    // change NM to point to new RM
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm2.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    // verify load of old state
+    // only 2 apps are loaded: the unmanaged app is not loaded back because it
+    // cannot be restarted by the RM. This will change with work-preserving RM
+    // restart, in which AMs/NMs are not rebooted
+    Assert.assertEquals(2, rm2.getRMContext().getRMApps().size());
+    
+    // verify correct number of attempts and other data
+    RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+    Assert.assertNotNull(loadedApp1);
+    //Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
+    Assert.assertEquals(app1.getApplicationSubmissionContext()
+        .getApplicationId(), loadedApp1.getApplicationSubmissionContext()
+        .getApplicationId());
+    
+    RMApp loadedApp2 = rm2.getRMContext().getRMApps().get(app2.getApplicationId());
+    Assert.assertNotNull(loadedApp2);
+    //Assert.assertEquals(0, loadedApp2.getAppAttempts().size());
+    Assert.assertEquals(app2.getApplicationSubmissionContext()
+        .getApplicationId(), loadedApp2.getApplicationSubmissionContext()
+        .getApplicationId());
+    
+    // verify state machine kicked into expected states
+    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+    rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
+    
+    // verify new attempts created
+    Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
+    Assert.assertEquals(1, loadedApp2.getAppAttempts().size());
+    
+    // verify old AM is not accepted
+    // change running AM to talk to new RM
+    am1.setAMRMProtocol(rm2.getApplicationMasterService());
+    AMResponse amResponse = am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>());
+    Assert.assertTrue(amResponse.getReboot());
+    
+    // NM should be rebooted on heartbeat, even first heartbeat for nm2
+    HeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
+    hbResponse = nm2.nodeHeartbeat(true);
+    Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
+    
+    // new NM to represent NM re-register
+    nm1 = rm2.registerNode("h1:1234", 15120);
+    nm2 = rm2.registerNode("h2:5678", 15120);
+
+    // verify no more reboot response sent
+    hbResponse = nm1.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
+    hbResponse = nm2.nodeHeartbeat(true);
+    Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
+    
+    // assert app1 attempt is saved
+    attempt1 = loadedApp1.getCurrentAppAttempt();
+    attemptId1 = attempt1.getAppAttemptId();
+    rm2.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+    appState = rmAppState.get(loadedApp1.getApplicationId());
+    attemptState = appState.getAttempt(attemptId1);
+    Assert.assertNotNull(attemptState);
+    Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), 
+                        attemptState.getMasterContainer().getId());
+
+    // Nodes on which the AMs run
+    MockNM am1Node = nm1;
+    if(attemptState.getMasterContainer().getNodeId().toString().contains("h2")){
+      am1Node = nm2;
+    }
+
+    // assert app2 attempt is saved
+    RMAppAttempt attempt2 = loadedApp2.getCurrentAppAttempt();
+    ApplicationAttemptId attemptId2 = attempt2.getAppAttemptId();
+    rm2.waitForState(attemptId2, RMAppAttemptState.ALLOCATED);
+    appState = rmAppState.get(loadedApp2.getApplicationId());
+    attemptState = appState.getAttempt(attemptId2);
+    Assert.assertNotNull(attemptState);
+    Assert.assertEquals(BuilderUtils.newContainerId(attemptId2, 1), 
+                        attemptState.getMasterContainer().getId());
+
+    MockNM am2Node = nm1;
+    if(attemptState.getMasterContainer().getNodeId().toString().contains("h2")){
+      am2Node = nm2;
+    }
+    
+    // start the AMs
+    am1 = rm2.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+    
+    MockAM am2 = rm2.sendAMLaunched(attempt2.getAppAttemptId());
+    am2.registerAppAttempt();
+
+    //request for containers
+    am1.allocate("h1" , 1000, 3, new ArrayList<ContainerId>());
+    am2.allocate("h2" , 1000, 1, new ArrayList<ContainerId>());
+    
+    // verify container allocate continues to work
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    conts = am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    while (conts.size() == 0) {
+      nm1.nodeHeartbeat(true);
+      nm2.nodeHeartbeat(true);
+      conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(500);
+    }
+
+    // finish the AMs
+    am1.unregisterAppAttempt();
+    am1Node.nodeHeartbeat(attempt1.getAppAttemptId(), 1, ContainerState.COMPLETE);
+    am1.waitForState(RMAppAttemptState.FINISHED);
+    
+    am2.unregisterAppAttempt();
+    am2Node.nodeHeartbeat(attempt2.getAppAttemptId(), 1, ContainerState.COMPLETE);
+    am2.waitForState(RMAppAttemptState.FINISHED);
+    
+    // stop the RMs
+    rm2.stop();
+    rm1.stop();
+    
+    // completed apps should be removed
+    Assert.assertEquals(0, rmAppState.size());
+ }
+  
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Wed Dec 19 04:21:18 2012
@@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.junit.After;
@@ -47,8 +45,7 @@ public class TestResourceManager {
   @Before
   public void setUp() throws Exception {
     Configuration conf = new YarnConfiguration();
-    RMStateStore store = StoreFactory.getStore(conf);
-    resourceManager = new ResourceManager(store);
+    resourceManager = new ResourceManager();
     resourceManager.init(conf);
   }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java Wed Dec 19 04:21:18 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -153,6 +154,11 @@ public abstract class MockAsm extends Mo
     }
 
     @Override
+    public ApplicationSubmissionContext getApplicationSubmissionContext() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    
+    @Override
     public String getName() {
       throw new UnsupportedOperationException("Not supported yet.");
     }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java Wed Dec 19 04:21:18 2012
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.junit.After;
 import org.junit.Before;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java Wed Dec 19 04:21:18 2012
@@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Wed Dec 19 04:21:18 2012
@@ -24,9 +24,11 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -67,6 +69,11 @@ public class MockRMApp implements RMApp 
   public ApplicationId getApplicationId() {
     return id;
   }
+  
+  @Override
+  public ApplicationSubmissionContext getApplicationSubmissionContext() {
+    return new ApplicationSubmissionContextPBImpl();
+  }
 
   @Override
   public RMAppState getState() {
@@ -118,7 +125,9 @@ public class MockRMApp implements RMApp 
   public Map<ApplicationAttemptId, RMAppAttempt> getAppAttempts() {
     Map<ApplicationAttemptId, RMAppAttempt> attempts =
       new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
-    attempts.put(attempt.getAppAttemptId(), attempt);
+    if(attempt != null) {
+      attempts.put(attempt.getAppAttemptId(), attempt);
+    }
     return attempts;
   }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Wed Dec 19 04:21:18 2012
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -167,6 +169,9 @@ public class TestRMAppAttemptTransitions
           new RMContainerTokenSecretManager(conf),
           new ClientToAMTokenSecretManagerInRM());
     
+    RMStateStore store = mock(RMStateStore.class);
+    ((RMContextImpl) rmContext).setStateStore(store);
+    
     scheduler = mock(YarnScheduler.class);
     masterService = mock(ApplicationMasterService.class);
     applicationMasterLauncher = mock(ApplicationMasterLauncher.class);
@@ -295,6 +300,14 @@ public class TestRMAppAttemptTransitions
     assertEquals(0, applicationAttempt.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
   }
+  
+  /**
+   * {@link RMAppAttemptState#RECOVERED}
+   */
+  private void testAppAttemptRecoveredState() {
+    assertEquals(RMAppAttemptState.RECOVERED, 
+        applicationAttempt.getAppAttemptState());
+  }
 
   /**
    * {@link RMAppAttemptState#SCHEDULED}
@@ -438,6 +451,15 @@ public class TestRMAppAttemptTransitions
         new RMAppAttemptEvent(
             applicationAttempt.getAppAttemptId(), 
             RMAppAttemptEventType.APP_ACCEPTED));
+    
+    if(unmanagedAM){
+      assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
+          applicationAttempt.getAppAttemptState());
+      applicationAttempt.handle(
+          new RMAppAttemptStoredEvent(
+              applicationAttempt.getAppAttemptId(), null));
+    }
+    
     testAppAttemptScheduledState();
   }
 
@@ -463,6 +485,12 @@ public class TestRMAppAttemptTransitions
             applicationAttempt.getAppAttemptId(), 
             container));
     
+    assertEquals(RMAppAttemptState.ALLOCATED_SAVING, 
+        applicationAttempt.getAppAttemptState());
+    applicationAttempt.handle(
+        new RMAppAttemptStoredEvent(
+            applicationAttempt.getAppAttemptId(), null));
+    
     testAppAttemptAllocatedState(container);
     
     return container;
@@ -555,6 +583,15 @@ public class TestRMAppAttemptTransitions
   } 
   
   @Test
+  public void testNewToRecovered() {
+    applicationAttempt.handle(
+        new RMAppAttemptEvent(
+            applicationAttempt.getAppAttemptId(), 
+            RMAppAttemptEventType.RECOVER));
+    testAppAttemptRecoveredState();
+  }
+  
+  @Test
   public void testSubmittedToFailed() {
     submitApplicationAttempt();
     String message = "Rejected";
@@ -604,7 +641,7 @@ public class TestRMAppAttemptTransitions
             diagnostics));
     testAppAttemptFailedState(amContainer, diagnostics);
   }
-
+  
   @Test
   public void testRunningToFailed() {
     Container amContainer = allocateApplicationAttempt();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Wed Dec 19 04:21:18 2012
@@ -27,7 +27,6 @@ import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -40,8 +39,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -76,8 +73,7 @@ public class TestCapacityScheduler {
   
   @Before
   public void setUp() throws Exception {
-    RMStateStore store = StoreFactory.getStore(new Configuration());
-    resourceManager = new ResourceManager(store);
+    resourceManager = new ResourceManager();
     CapacitySchedulerConfiguration csConf 
        = new CapacitySchedulerConfiguration();
     setupQueueConfiguration(csConf);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java Wed Dec 19 04:21:18 2012
@@ -29,8 +29,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.junit.Before;
@@ -47,8 +45,7 @@ public class TestFSLeafQueue {
     Configuration conf = createConfiguration();
     // All tests assume only one assignment per node update
     conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
-    RMStateStore store = StoreFactory.getStore(conf);
-    ResourceManager resourceManager = new ResourceManager(store);
+    ResourceManager resourceManager = new ResourceManager();
     resourceManager.init(conf);
     ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Wed Dec 19 04:21:18 2012
@@ -51,8 +51,6 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -102,8 +100,7 @@ public class TestFairScheduler {
     Configuration conf = createConfiguration();
     // All tests assume only one assignment per node update
     conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
-    RMStateStore store = StoreFactory.getStore(conf);
-    resourceManager = new ResourceManager(store);
+    resourceManager = new ResourceManager();
     resourceManager.init(conf);
     ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java Wed Dec 19 04:21:18 2012
@@ -27,8 +27,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.junit.After;
 import org.junit.Before;
@@ -50,8 +48,7 @@ public class TestFairSchedulerEventLog {
 
     // All tests assume only one assignment per node update
     conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
-    RMStateStore store = StoreFactory.getStore(conf);
-    resourceManager = new ResourceManager(store);
+    resourceManager = new ResourceManager();
     resourceManager.init(conf);
     ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Wed Dec 19 04:21:18 2012
@@ -38,8 +38,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -59,8 +57,7 @@ public class TestFifoScheduler {
   
   @Before
   public void setUp() throws Exception {
-    RMStateStore store = StoreFactory.getStore(new Configuration());
-    resourceManager = new ResourceManager(store);
+    resourceManager = new ResourceManager();
     Configuration conf = new Configuration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, 
         FifoScheduler.class, ResourceScheduler.class);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Wed Dec 19 04:21:18 2012
@@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 
@@ -154,8 +152,7 @@ public class MiniYARNCluster extends Com
           getConfig().set(YarnConfiguration.RM_WEBAPP_ADDRESS,
               MiniYARNCluster.getHostname() + ":0");
         }
-        RMStateStore store = StoreFactory.getStore(getConfig());
-        resourceManager = new ResourceManager(store) {
+        resourceManager = new ResourceManager() {
           @Override
           protected void doSecureLogin() throws IOException {
             // Don't try to login using keytab in the testcase.

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java?rev=1423758&r1=1423757&r2=1423758&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestRMNMSecretKeys.java Wed Dec 19 04:21:18 2012
@@ -47,7 +47,7 @@ public class TestRMNMSecretKeys {
     // intervene
 
     final DrainDispatcher dispatcher = new DrainDispatcher();
-    ResourceManager rm = new ResourceManager(null) {
+    ResourceManager rm = new ResourceManager() {
       @Override
       protected void doSecureLogin() throws IOException {
         // Do nothing.