You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by su...@apache.org on 2013/01/11 20:40:30 UTC

svn commit: r1432246 [2/3] - in /hadoop/common/branches/HDFS-2802/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop...

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Fri Jan 11 19:40:23 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,10 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -48,7 +53,8 @@ import org.apache.hadoop.yarn.server.sec
 /**
  * This class manages the list of applications for the resource manager. 
  */
-public class RMAppManager implements EventHandler<RMAppManagerEvent> {
+public class RMAppManager implements EventHandler<RMAppManagerEvent>, 
+                                        Recoverable {
 
   private static final Log LOG = LogFactory.getLog(RMAppManager.class);
 
@@ -173,6 +179,10 @@ public class RMAppManager implements Eve
       
       completedApps.add(applicationId);  
       writeAuditLog(applicationId);
+      
+      // application completely done. Remove from state
+      RMStateStore store = rmContext.getStateStore();
+      store.removeApplication(rmContext.getRMApps().get(applicationId));
     }
   }
 
@@ -306,6 +316,37 @@ public class RMAppManager implements Eve
     }
     return credentials;
   }
+  
+  @Override
+  public void recover(RMState state) throws Exception {
+    RMStateStore store = rmContext.getStateStore();
+    assert store != null;
+    // recover applications
+    Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
+    LOG.info("Recovering " + appStates.size() + " applications");
+    for(ApplicationState appState : appStates.values()) {
+      // re-submit the application
+      // this is going to send an app start event but since the async dispatcher 
+      // has not started that event will be queued until we have completed re
+      // populating the state
+      if(appState.getApplicationSubmissionContext().getUnmanagedAM()) {
+        // do not recover unmanaged applications since current recovery 
+        // mechanism of restarting attempts does not work for them.
+        // This will need to be changed in work preserving recovery in which 
+        // RM will re-connect with the running AM's instead of restarting them
+        LOG.info("Not recovering unmanaged application " + appState.getAppId());
+        store.removeApplication(appState);
+      } else {
+        LOG.info("Recovering application " + appState.getAppId());
+        submitApplication(appState.getApplicationSubmissionContext(), 
+                          appState.getSubmitTime());
+        // re-populate attempt information in application
+        RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get(
+                                                          appState.getAppId());
+        appImpl.recover(state);
+      }
+    }
+  }
 
   @Override
   public void handle(RMAppManagerEvent event) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Fri Jan 11 19:40:23 2013
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMa
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.res
 public interface RMContext {
 
   Dispatcher getDispatcher();
+  
+  RMStateStore getStateStore();
 
   ConcurrentMap<ApplicationId, RMApp> getRMApps();
   

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Fri Jan 11 19:40:23 2013
@@ -23,7 +23,10 @@ import java.util.concurrent.ConcurrentMa
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -33,6 +36,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class RMContextImpl implements RMContext {
 
   private final Dispatcher rmDispatcher;
@@ -48,6 +53,7 @@ public class RMContextImpl implements RM
 
   private AMLivelinessMonitor amLivelinessMonitor;
   private AMLivelinessMonitor amFinishingMonitor;
+  private RMStateStore stateStore = null;
   private ContainerAllocationExpirer containerAllocationExpirer;
   private final DelegationTokenRenewer tokenRenewer;
   private final ApplicationTokenSecretManager appTokenSecretManager;
@@ -55,6 +61,7 @@ public class RMContextImpl implements RM
   private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
 
   public RMContextImpl(Dispatcher rmDispatcher,
+      RMStateStore store,
       ContainerAllocationExpirer containerAllocationExpirer,
       AMLivelinessMonitor amLivelinessMonitor,
       AMLivelinessMonitor amFinishingMonitor,
@@ -63,6 +70,7 @@ public class RMContextImpl implements RM
       RMContainerTokenSecretManager containerTokenSecretManager,
       ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
     this.rmDispatcher = rmDispatcher;
+    this.stateStore = store;
     this.containerAllocationExpirer = containerAllocationExpirer;
     this.amLivelinessMonitor = amLivelinessMonitor;
     this.amFinishingMonitor = amFinishingMonitor;
@@ -71,11 +79,39 @@ public class RMContextImpl implements RM
     this.containerTokenSecretManager = containerTokenSecretManager;
     this.clientToAMTokenSecretManager = clientTokenSecretManager;
   }
+
+  @VisibleForTesting
+  // helper constructor for tests
+  public RMContextImpl(Dispatcher rmDispatcher,
+      ContainerAllocationExpirer containerAllocationExpirer,
+      AMLivelinessMonitor amLivelinessMonitor,
+      AMLivelinessMonitor amFinishingMonitor,
+      DelegationTokenRenewer tokenRenewer,
+      ApplicationTokenSecretManager appTokenSecretManager,
+      RMContainerTokenSecretManager containerTokenSecretManager,
+      ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
+    this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor, 
+          amFinishingMonitor, tokenRenewer, appTokenSecretManager, 
+          containerTokenSecretManager, clientTokenSecretManager);
+    RMStateStore nullStore = new NullRMStateStore();
+    nullStore.setDispatcher(rmDispatcher);
+    try {
+      nullStore.init(new YarnConfiguration());
+      setStateStore(nullStore);
+    } catch (Exception e) {
+      assert false;
+    }
+  }
   
   @Override
   public Dispatcher getDispatcher() {
     return this.rmDispatcher;
   }
+  
+  @Override 
+  public RMStateStore getStateStore() {
+    return stateStore;
+  }
 
   @Override
   public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
@@ -126,4 +162,9 @@ public class RMContextImpl implements RM
   public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
     return this.clientToAMTokenSecretManager;
   }
+  
+  @VisibleForTesting
+  public void setStateStore(RMStateStore store) {
+    stateStore = store;
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Fri Jan 11 19:40:23 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
@@ -45,10 +46,11 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -80,6 +82,8 @@ import org.apache.hadoop.yarn.webapp.Web
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The ResourceManager is the main class that is a set of components.
  * "I am the ResourceManager. All your resources are belong to us..."
@@ -119,14 +123,13 @@ public class ResourceManager extends Com
   protected RMDelegationTokenSecretManager rmDTSecretManager;
   private WebApp webApp;
   protected RMContext rmContext;
-  private final RMStateStore store;
   protected ResourceTrackerService resourceTracker;
+  private boolean recoveryEnabled;
 
   private Configuration conf;
-
-  public ResourceManager(RMStateStore store) {
+  
+  public ResourceManager() {
     super("ResourceManager");
-    this.store = store;
   }
   
   public RMContext getRMContext() {
@@ -160,12 +163,34 @@ public class ResourceManager extends Com
 
     this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
     
+    boolean isRecoveryEnabled = conf.getBoolean(
+        YarnConfiguration.RECOVERY_ENABLED,
+        YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
+    
+    RMStateStore rmStore = null;
+    if(isRecoveryEnabled) {
+      recoveryEnabled = true;
+      rmStore =  RMStateStoreFactory.getStore(conf);
+    } else {
+      recoveryEnabled = false;
+      rmStore = new NullRMStateStore();
+    }
+    try {
+      rmStore.init(conf);
+      rmStore.setDispatcher(rmDispatcher);
+    } catch (Exception e) {
+      // the Exception from stateStore.init() needs to be handled for 
+      // HA and we need to give up master status if we got fenced
+      LOG.error("Failed to init state store", e);
+      ExitUtil.terminate(1, e);
+    }
+    
     this.rmContext =
-        new RMContextImpl(this.rmDispatcher,
+        new RMContextImpl(this.rmDispatcher, rmStore,
           this.containerAllocationExpirer, amLivelinessMonitor,
           amFinishingMonitor, tokenRenewer, this.appTokenSecretManager,
           this.containerTokenSecretManager, this.clientToAMSecretManager);
-
+    
     // Register event handler for NodesListManager
     this.nodesListManager = new NodesListManager(this.rmContext);
     this.rmDispatcher.register(NodesListManagerEventType.class, 
@@ -226,9 +251,15 @@ public class ResourceManager extends Com
     addService(applicationMasterLauncher);
 
     new RMNMInfo(this.rmContext, this.scheduler);
-
+    
     super.init(conf);
   }
+  
+  @VisibleForTesting
+  protected void setRMStateStore(RMStateStore rmStore) {
+    rmStore.setDispatcher(rmDispatcher);
+    ((RMContextImpl) rmContext).setStateStore(rmStore);
+  }
 
   protected RMContainerTokenSecretManager createContainerTokenSecretManager(
       Configuration conf) {
@@ -502,6 +533,19 @@ public class ResourceManager extends Com
     this.appTokenSecretManager.start();
     this.containerTokenSecretManager.start();
 
+    if(recoveryEnabled) {
+      try {
+        RMStateStore rmStore = rmContext.getStateStore();
+        RMState state = rmStore.loadState();
+        recover(state);
+      } catch (Exception e) {
+        // the Exception from loadState() needs to be handled for 
+        // HA and we need to give up master status if we got fenced
+        LOG.error("Failed to load/recover state", e);
+        ExitUtil.terminate(1, e);
+      }
+    }
+
     startWepApp();
     DefaultMetricsSystem.initialize("ResourceManager");
     JvmMetrics.initSingleton("ResourceManager", null);
@@ -555,6 +599,13 @@ public class ResourceManager extends Com
 
     DefaultMetricsSystem.shutdown();
 
+    RMStateStore store = rmContext.getStateStore();
+    try {
+      store.close();
+    } catch (Exception e) {
+      LOG.error("Error closing store.", e);
+    }
+      
     super.stop();
   }
   
@@ -643,6 +694,8 @@ public class ResourceManager extends Com
 
   @Override
   public void recover(RMState state) throws Exception {
+    // recover applications
+    rmAppManager.recover(state);
   }
   
   public static void main(String argv[]) {
@@ -650,13 +703,11 @@ public class ResourceManager extends Com
     StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
     try {
       Configuration conf = new YarnConfiguration();
-      RMStateStore store =  StoreFactory.getStore(conf);
-      ResourceManager resourceManager = new ResourceManager(store);
+      ResourceManager resourceManager = new ResourceManager();
       ShutdownHookManager.get().addShutdownHook(
         new CompositeServiceShutdownHook(resourceManager),
         SHUTDOWN_HOOK_PRIORITY);
       resourceManager.init(conf);
-      //resourceManager.recover(store.restore());
       resourceManager.start();
     } catch (Throwable t) {
       LOG.fatal("Error starting ResourceManager", t);

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Fri Jan 11 19:40:23 2013
@@ -15,10 +15,313 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
+
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
-public interface RMStateStore {
-  public interface RMState {
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+
+@Private
+@Unstable
+/**
+ * Base class to implement storage of ResourceManager state.
+ * Takes care of asynchronous notifications and interfacing with YARN objects.
+ * Real store implementations need to derive from it and implement blocking
+ * store and load methods to actually store and load the state.
+ */
+public abstract class RMStateStore {
+  public static final Log LOG = LogFactory.getLog(RMStateStore.class);
+  
+  /**
+   * State of an application attempt
+   */
+  public static class ApplicationAttemptState {
+    final ApplicationAttemptId attemptId;
+    final Container masterContainer;
+    
+    public ApplicationAttemptState(ApplicationAttemptId attemptId,
+                                   Container masterContainer) {
+      this.attemptId = attemptId;
+      this.masterContainer = masterContainer;
+    }
+    
+    public Container getMasterContainer() {
+      return masterContainer;
+    }
+    public ApplicationAttemptId getAttemptId() {
+      return attemptId;
+    }
+  }
+  
+  /**
+   * State of an application application
+   */
+  public static class ApplicationState {
+    final ApplicationSubmissionContext context;
+    final long submitTime;
+    Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
+                  new HashMap<ApplicationAttemptId, ApplicationAttemptState>();
+    
+    ApplicationState(long submitTime, ApplicationSubmissionContext context) {
+      this.submitTime = submitTime;
+      this.context = context;
+    }
+
+    public ApplicationId getAppId() {
+      return context.getApplicationId();
+    }
+    public long getSubmitTime() {
+      return submitTime;
+    }
+    public int getAttemptCount() {
+      return attempts.size();
+    }
+    public ApplicationSubmissionContext getApplicationSubmissionContext() {
+      return context;
+    }
+    public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) {
+      return attempts.get(attemptId);
+    }
+  }
+  
+  /**
+   * State of the ResourceManager
+   */
+  public static class RMState {
+    Map<ApplicationId, ApplicationState> appState = 
+                                new HashMap<ApplicationId, ApplicationState>();
+    
+    public Map<ApplicationId, ApplicationState> getApplicationState() {
+      return appState;
+    }
+  }
+    
+  private Dispatcher rmDispatcher;
+
+  /**
+   * Dispatcher used to send state operation completion events to 
+   * ResourceManager services
+   */
+  public void setDispatcher(Dispatcher dispatcher) {
+    this.rmDispatcher = dispatcher;
+  }
+  
+  AsyncDispatcher dispatcher;
+  
+  public synchronized void init(Configuration conf) throws Exception{    
+    // create async handler
+    dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.register(RMStateStoreEventType.class, 
+                        new ForwardingEventHandler());
+    dispatcher.start();
+    
+    initInternal(conf);
+  }
+
+  /**
+   * Derived classes initialize themselves using this method.
+   * The base class is initialized and the event dispatcher is ready to use at
+   * this point
+   */
+  protected abstract void initInternal(Configuration conf) throws Exception;
+  
+  public synchronized void close() throws Exception {
+    closeInternal();
+    dispatcher.stop();
+  }
+  
+  /**
+   * Derived classes close themselves using this method.
+   * The base class will be closed and the event dispatcher will be shutdown 
+   * after this
+   */
+  protected abstract void closeInternal() throws Exception;
+  
+  /**
+   * Blocking API
+   * The derived class must recover state from the store and return a new 
+   * RMState object populated with that state
+   * This must not be called on the dispatcher thread
+   */
+  public abstract RMState loadState() throws Exception;
+  
+  /**
+   * Blocking API
+   * ResourceManager services use this to store the application's state
+   * This must not be called on the dispatcher thread
+   */
+  public synchronized void storeApplication(RMApp app) throws Exception {
+    ApplicationSubmissionContext context = app
+                                            .getApplicationSubmissionContext();
+    assert context instanceof ApplicationSubmissionContextPBImpl;
+
+    ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl();
+    appStateData.setSubmitTime(app.getSubmitTime());
+    appStateData.setApplicationSubmissionContext(context);
+
+    LOG.info("Storing info for app: " + context.getApplicationId());
+    storeApplicationState(app.getApplicationId().toString(), appStateData);
+  }
+    
+  /**
+   * Blocking API
+   * Derived classes must implement this method to store the state of an 
+   * application.
+   */
+  protected abstract void storeApplicationState(String appId,
+                                      ApplicationStateDataPBImpl appStateData) 
+                                      throws Exception;
+  
+  @SuppressWarnings("unchecked")
+  /**
+   * Non-blocking API
+   * ResourceManager services call this to store state on an application attempt
+   * This does not block the dispatcher threads
+   * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt
+   */
+  public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) {
+    ApplicationAttemptState attemptState = new ApplicationAttemptState(
+                appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
+    dispatcher.getEventHandler().handle(
+                                new RMStateStoreAppAttemptEvent(attemptState));
+  }
+  
+  /**
+   * Blocking API
+   * Derived classes must implement this method to store the state of an 
+   * application attempt
+   */
+  protected abstract void storeApplicationAttemptState(String attemptId,
+                            ApplicationAttemptStateDataPBImpl attemptStateData) 
+                            throws Exception;
+  
+  
+  /**
+   * Non-blocking API
+   * ResourceManager services call this to remove an application from the state
+   * store
+   * This does not block the dispatcher threads
+   * There is no notification of completion for this operation.
+   */
+  public synchronized void removeApplication(RMApp app) {
+    ApplicationState appState = new ApplicationState(
+        app.getSubmitTime(), app.getApplicationSubmissionContext());
+    for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
+      ApplicationAttemptState attemptState = new ApplicationAttemptState(
+                appAttempt.getAppAttemptId(), appAttempt.getMasterContainer());
+      appState.attempts.put(attemptState.getAttemptId(), attemptState);
+    }
+    
+    removeApplication(appState);
+  }
+  
+  @SuppressWarnings("unchecked")
+  /**
+   * Non-Blocking API
+   */
+  public synchronized void removeApplication(ApplicationState appState) {
+    dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
+  }
+
+  /**
+   * Blocking API
+   * Derived classes must implement this method to remove the state of an 
+   * application and its attempts
+   */
+  protected abstract void removeApplicationState(ApplicationState appState) 
+                                                             throws Exception;
+  
+  // Dispatcher related code
+  
+  private synchronized void handleStoreEvent(RMStateStoreEvent event) {
+    switch(event.getType()) {
+      case STORE_APP_ATTEMPT:
+        {
+          ApplicationAttemptState attemptState = 
+                    ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
+          Exception storedException = null;
+          ApplicationAttemptStateDataPBImpl attemptStateData = 
+                                        new ApplicationAttemptStateDataPBImpl();
+          attemptStateData.setAttemptId(attemptState.getAttemptId());
+          attemptStateData.setMasterContainer(attemptState.getMasterContainer());
+
+          LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
+          try {
+            storeApplicationAttemptState(attemptState.getAttemptId().toString(), 
+                                         attemptStateData);
+          } catch (Exception e) {
+            LOG.error("Error storing appAttempt: " 
+                      + attemptState.getAttemptId(), e);
+            storedException = e;
+          } finally {
+            notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), 
+                                                storedException);            
+          }
+        }
+        break;
+      case REMOVE_APP:
+        {
+          ApplicationState appState = 
+                          ((RMStateStoreRemoveAppEvent) event).getAppState();
+          ApplicationId appId = appState.getAppId();
+          
+          LOG.info("Removing info for app: " + appId);
+          try {
+            removeApplicationState(appState);
+          } catch (Exception e) {
+            LOG.error("Error removing app: " + appId, e);
+          }
+        }
+        break;
+      default:
+        LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  /**
+   * In (@link storeApplicationAttempt}, derived class can call this method to
+   * notify the application attempt about operation completion 
+   * @param appAttempt attempt that has been saved
+   */
+  private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
+                                                  Exception storedException) {
+    rmDispatcher.getEventHandler().handle(
+        new RMAppAttemptStoredEvent(attemptId, storedException));
+  }
+  
+  /**
+   * EventHandler implementation which forward events to the FSRMStateStore
+   * This hides the EventHandle methods of the store from its public interface 
+   */
+  private final class ForwardingEventHandler 
+                                  implements EventHandler<RMStateStoreEvent> {
+    
+    @Override
+    public void handle(RMStateStoreEvent event) {
+      handleStoreEvent(event);
+    }
     
   }
+
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java Fri Jan 11 19:40:23 2013
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Fri Jan 11 19:40:23 2013
@@ -44,6 +44,12 @@ public interface RMApp extends EventHand
    * @return the {@link ApplicationId} for this {@link RMApp}.
    */
   ApplicationId getApplicationId();
+  
+  /**
+   * The application submission context for this {@link RMApp}
+   * @return the {@link ApplicationSubmissionContext} for this {@link RMApp}
+   */
+  ApplicationSubmissionContext getApplicationSubmissionContext();
 
   /**
    * The current state of the {@link RMApp}.

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Jan 11 19:40:23 2013
@@ -50,6 +50,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -66,7 +69,7 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 
-public class RMAppImpl implements RMApp {
+public class RMAppImpl implements RMApp, Recoverable {
 
   private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
   private static final String UNAVAILABLE = "N/A";
@@ -243,6 +246,11 @@ public class RMAppImpl implements RMApp 
   public ApplicationId getApplicationId() {
     return this.applicationId;
   }
+  
+  @Override
+  public ApplicationSubmissionContext getApplicationSubmissionContext() {
+    return this.submissionContext;
+  }
 
   @Override
   public FinalApplicationStatus getFinalApplicationStatus() {
@@ -512,9 +520,22 @@ public class RMAppImpl implements RMApp 
       this.writeLock.unlock();
     }
   }
+  
+  @Override
+  public void recover(RMState state) {
+    ApplicationState appState = state.getApplicationState().get(getApplicationId());
+    LOG.info("Recovering app: " + getApplicationId() + " with " + 
+            + appState.getAttemptCount() + " attempts");
+    for(int i=0; i<appState.getAttemptCount(); ++i) {
+      // create attempt
+      createNewAttempt(false);
+      // recover attempt
+      ((RMAppAttemptImpl) currentAttempt).recover(state);
+    }
+  }
 
   @SuppressWarnings("unchecked")
-  private void createNewAttempt() {
+  private void createNewAttempt(boolean startAttempt) {
     ApplicationAttemptId appAttemptId = Records
         .newRecord(ApplicationAttemptId.class);
     appAttemptId.setApplicationId(applicationId);
@@ -525,8 +546,10 @@ public class RMAppImpl implements RMApp 
         submissionContext, conf);
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
-    handler.handle(
-        new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+    if(startAttempt) {
+      handler.handle(
+          new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+    }
   }
   
   private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
@@ -553,7 +576,7 @@ public class RMAppImpl implements RMApp 
   
   private static final class StartAppAttemptTransition extends RMAppTransition {
     public void transition(RMAppImpl app, RMAppEvent event) {
-      app.createNewAttempt();
+      app.createNewAttempt(true);
     };
   }
 
@@ -647,7 +670,7 @@ public class RMAppImpl implements RMApp 
         msg = "Unmanaged application " + app.getApplicationId()
             + " failed due to " + failedEvent.getDiagnostics()
             + ". Failing the application.";
-      } else if (app.attempts.size() == app.maxRetries) {
+      } else if (app.attempts.size() >= app.maxRetries) {
         retryApp = false;
         msg = "Application " + app.getApplicationId() + " failed "
             + app.maxRetries + " times due to " + failedEvent.getDiagnostics()
@@ -655,7 +678,7 @@ public class RMAppImpl implements RMApp 
       }
 
       if (retryApp) {
-        app.createNewAttempt();
+        app.createNewAttempt(true);
         return initialState;
       } else {
         LOG.info(msg);

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Fri Jan 11 19:40:23 2013
@@ -39,9 +39,15 @@ public enum RMAppAttemptEventType {
   CONTAINER_ACQUIRED,
   CONTAINER_ALLOCATED,
   CONTAINER_FINISHED,
+  
+  // Source: RMStateStore
+  ATTEMPT_SAVED,
 
   // Source: Scheduler
   APP_REJECTED,
   APP_ACCEPTED,
+  
+  // Source: RMAttemptImpl.recover
+  RECOVER
 
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Fri Jan 11 19:40:23 2013
@@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -57,6 +58,11 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -69,6 +75,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -85,7 +92,7 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class RMAppAttemptImpl implements RMAppAttempt {
+public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
 
   private static final Log LOG = LogFactory.getLog(RMAppAttemptImpl.class);
 
@@ -153,12 +160,15 @@ public class RMAppAttemptImpl implements
       .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
           RMAppAttemptEventType.REGISTERED,
           new UnexpectedAMRegisteredTransition())
+      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.RECOVERED, 
+          RMAppAttemptEventType.RECOVER)
           
       // Transitions from SUBMITTED state
       .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
           RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
       .addTransition(RMAppAttemptState.SUBMITTED, 
-          EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.SCHEDULED),
+          EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
+                     RMAppAttemptState.SCHEDULED),
           RMAppAttemptEventType.APP_ACCEPTED, 
           new ScheduleTransition())
       .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
@@ -170,12 +180,42 @@ public class RMAppAttemptImpl implements
           
        // Transitions from SCHEDULED State
       .addTransition(RMAppAttemptState.SCHEDULED,
-          RMAppAttemptState.ALLOCATED,
+                     RMAppAttemptState.ALLOCATED_SAVING,
           RMAppAttemptEventType.CONTAINER_ALLOCATED,
           new AMContainerAllocatedTransition())
       .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED,
           RMAppAttemptEventType.KILL,
           new BaseFinalTransition(RMAppAttemptState.KILLED))
+          
+       // Transitions from ALLOCATED_SAVING State
+      .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
+          RMAppAttemptState.ALLOCATED,
+          RMAppAttemptEventType.ATTEMPT_SAVED, new AttemptStoredTransition())
+      .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
+          RMAppAttemptState.ALLOCATED_SAVING,
+          RMAppAttemptEventType.CONTAINER_ACQUIRED, 
+          new ContainerAcquiredTransition())
+       // App could be killed by the client. So need to handle this. 
+      .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
+          RMAppAttemptState.KILLED,
+          RMAppAttemptEventType.KILL,
+          new BaseFinalTransition(RMAppAttemptState.KILLED))
+      
+       // Transitions from LAUNCHED_UNMANAGED_SAVING State
+      .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
+          RMAppAttemptState.LAUNCHED,
+          RMAppAttemptEventType.ATTEMPT_SAVED, 
+          new UnmanagedAMAttemptSavedTransition())
+      // attempt should not try to register in this state
+      .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
+          RMAppAttemptState.FAILED,
+          RMAppAttemptEventType.REGISTERED,
+          new UnexpectedAMRegisteredTransition())
+      // App could be killed by the client. So need to handle this. 
+      .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
+          RMAppAttemptState.KILLED,
+          RMAppAttemptEventType.KILL,
+          new BaseFinalTransition(RMAppAttemptState.KILLED))
 
        // Transitions from ALLOCATED State
       .addTransition(RMAppAttemptState.ALLOCATED,
@@ -279,11 +319,30 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.REGISTERED,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
+              RMAppAttemptEventType.ATTEMPT_SAVED,
+              RMAppAttemptEventType.CONTAINER_FINISHED,
+              RMAppAttemptEventType.UNREGISTERED,
+              RMAppAttemptEventType.KILL,
+              RMAppAttemptEventType.STATUS_UPDATE))
+              
+      // Transitions from RECOVERED State
+      .addTransition(
+          RMAppAttemptState.RECOVERED,
+          RMAppAttemptState.RECOVERED,
+          EnumSet.of(RMAppAttemptEventType.START,
+              RMAppAttemptEventType.APP_ACCEPTED,
+              RMAppAttemptEventType.APP_REJECTED,
+              RMAppAttemptEventType.EXPIRE,
+              RMAppAttemptEventType.LAUNCHED,
+              RMAppAttemptEventType.LAUNCH_FAILED,
+              RMAppAttemptEventType.REGISTERED,
+              RMAppAttemptEventType.CONTAINER_ALLOCATED,
+              RMAppAttemptEventType.CONTAINER_ACQUIRED,
+              RMAppAttemptEventType.ATTEMPT_SAVED,
               RMAppAttemptEventType.CONTAINER_FINISHED,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.KILL,
               RMAppAttemptEventType.STATUS_UPDATE))
-
     .installTopology();
 
   public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
@@ -318,7 +377,7 @@ public class RMAppAttemptImpl implements
   @Override
   public ApplicationSubmissionContext getSubmissionContext() {
     return this.submissionContext;
-  }
+  } 
 
   @Override
   public FinalApplicationStatus getFinalApplicationStatus() {
@@ -494,6 +553,10 @@ public class RMAppAttemptImpl implements
     }
   }
 
+  private void setMasterContainer(Container container) {
+    masterContainer = container;
+  }
+
   @Override
   public void handle(RMAppAttemptEvent event) {
 
@@ -561,6 +624,21 @@ public class RMAppAttemptImpl implements
     }
   }
 
+  @Override
+  public void recover(RMState state) {
+    ApplicationState appState = 
+        state.getApplicationState().get(getAppAttemptId().getApplicationId());
+    ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
+    assert attemptState != null;
+    setMasterContainer(attemptState.getMasterContainer());
+    LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId() 
+             + " AttemptId: " + getAppAttemptId()
+             + " MasterContainer: " + masterContainer);
+    setDiagnostics("Attempt recovered after RM restart");
+    handle(new RMAppAttemptEvent(getAppAttemptId(), 
+                                 RMAppAttemptEventType.RECOVER));
+  }
+  
   private static class BaseTransition implements
       SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
 
@@ -625,13 +703,12 @@ public class RMAppAttemptImpl implements
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-
-      // Send the acceptance to the app
-      appAttempt.eventHandler.handle(new RMAppEvent(event
-          .getApplicationAttemptId().getApplicationId(),
-          RMAppEventType.APP_ACCEPTED));
-
       if (!appAttempt.submissionContext.getUnmanagedAM()) {
+        // Send the acceptance to the app
+        appAttempt.eventHandler.handle(new RMAppEvent(event
+            .getApplicationAttemptId().getApplicationId(),
+            RMAppEventType.APP_ACCEPTED));
+
         // Request a container for the AM.
         ResourceRequest request = BuilderUtils.newResourceRequest(
             AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
@@ -647,35 +724,42 @@ public class RMAppAttemptImpl implements
         return RMAppAttemptState.SCHEDULED;
       } else {
         // RM not allocating container. AM is self launched. 
-        // Directly go to LAUNCHED state
-        // Register with AMLivelinessMonitor
-        appAttempt.rmContext.getAMLivelinessMonitor().register(
-            appAttempt.applicationAttemptId);
-        return RMAppAttemptState.LAUNCHED;
+        RMStateStore store = appAttempt.rmContext.getStateStore();
+        // save state and then go to LAUNCHED state
+        appAttempt.storeAttempt(store);
+        return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
       }
     }
   }
 
-  private static final class AMContainerAllocatedTransition extends BaseTransition {
+  private static final class AMContainerAllocatedTransition 
+                                                      extends BaseTransition {
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
-        RMAppAttemptEvent event) {
-
+                                                     RMAppAttemptEvent event) {
       // Acquire the AM container from the scheduler.
       Allocation amContainerAllocation = appAttempt.scheduler.allocate(
           appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
           EMPTY_CONTAINER_RELEASE_LIST);
 
       // Set the masterContainer
-      appAttempt.masterContainer = amContainerAllocation.getContainers().get(
-          0);
+      appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(
+                                                                           0));
 
-      // Send event to launch the AM Container
-      appAttempt.eventHandler.handle(new AMLauncherEvent(
-          AMLauncherEventType.LAUNCH, appAttempt));
+      RMStateStore store = appAttempt.rmContext.getStateStore();
+      appAttempt.storeAttempt(store);
     }
   }
-
+  
+  private static final class AttemptStoredTransition extends BaseTransition {
+    @Override
+    public void transition(RMAppAttemptImpl appAttempt,
+                                                    RMAppAttemptEvent event) {
+      appAttempt.checkAttemptStoreError(event);
+      appAttempt.launchAttempt();
+    }
+  }
+  
   private static class BaseFinalTransition extends BaseTransition {
 
     private final RMAppAttemptState finalAttemptState;
@@ -736,17 +820,34 @@ public class RMAppAttemptImpl implements
     }
   }
 
-  private static final class AMLaunchedTransition extends BaseTransition {
+  private static class AMLaunchedTransition extends BaseTransition {
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
-        RMAppAttemptEvent event) {
-
+                            RMAppAttemptEvent event) {
       // Register with AMLivelinessMonitor
-      appAttempt.rmContext.getAMLivelinessMonitor().register(
-          appAttempt.applicationAttemptId);
-
+      appAttempt.attemptLaunched();
     }
   }
+  
+  private static final class UnmanagedAMAttemptSavedTransition 
+                                                extends AMLaunchedTransition {
+    @Override
+    public void transition(RMAppAttemptImpl appAttempt,
+                            RMAppAttemptEvent event) {
+      appAttempt.checkAttemptStoreError(event);
+      // Send the acceptance to the app
+      // Ideally this should have been done when the scheduler accepted the app.
+      // But its here because until the attempt is saved the client should not
+      // launch the unmanaged AM. Client waits for the app status to be accepted
+      // before doing so. So we have to delay the accepted state until we have 
+      // completed storing the attempt
+      appAttempt.eventHandler.handle(new RMAppEvent(event
+          .getApplicationAttemptId().getApplicationId(),
+          RMAppEventType.APP_ACCEPTED));
+      
+      super.transition(appAttempt, event);
+    }    
+  }
 
   private static final class LaunchFailedTransition extends BaseFinalTransition {
 
@@ -1040,4 +1141,37 @@ public class RMAppAttemptImpl implements
       this.readLock.unlock();
     }
   }
+  
+  private void launchAttempt(){
+    // Send event to launch the AM Container
+    eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
+  }
+  
+  private void attemptLaunched() {
+    // Register with AMLivelinessMonitor
+    rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
+  }
+  
+  private void checkAttemptStoreError(RMAppAttemptEvent event) {
+    RMAppAttemptStoredEvent storeEvent = (RMAppAttemptStoredEvent) event;
+    if(storeEvent.getStoredException() != null)
+    {
+      // This needs to be handled for HA and give up master status if we got
+      // fenced
+      LOG.error("Failed to store attempt: " + getAppAttemptId(),
+                storeEvent.getStoredException());
+      ExitUtil.terminate(1, storeEvent.getStoredException());
+    }
+  }
+  
+  private void storeAttempt(RMStateStore store) {
+    // store attempt data in a non-blocking manner to prevent dispatcher
+    // thread starvation and wait for state to be saved
+    LOG.info("Storing attempt: AppId: " + 
+              getAppAttemptId().getApplicationId() 
+              + " AttemptId: " + 
+              getAppAttemptId()
+              + " MasterContainer: " + masterContainer);
+    store.storeApplicationAttempt(this);
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Fri Jan 11 19:40:23 2013
@@ -19,6 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 public enum RMAppAttemptState {
-  NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING,
-  FINISHING, FINISHED, KILLED,
+  NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, 
+  FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Fri Jan 11 19:40:23 2013
@@ -199,6 +199,11 @@ public class RMContainerImpl implements 
   }
   
   @Override
+  public String toString() {
+    return containerId.toString();
+  }
+  
+  @Override
   public void handle(RMContainerEvent event) {
     LOG.debug("Processing " + event.getContainerId() + " of type " + event.getType());
     try {
@@ -221,7 +226,7 @@ public class RMContainerImpl implements 
       writeLock.unlock();
     }
   }
-
+  
   private static class BaseTransition implements
       SingleArcTransition<RMContainerImpl, RMContainerEvent> {
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Fri Jan 11 19:40:23 2013
@@ -177,7 +177,9 @@ public class FSLeafQueue extends FSQueue
 
       Collections.sort(appScheds, comparator);
       for (AppSchedulable sched: appScheds) {
-        return sched.assignContainer(node, reserved);
+        if (sched.getRunnable()) {
+          return sched.assignContainer(node, reserved);
+        }
       }
 
       return Resources.none();

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Jan 11 19:40:23 2013
@@ -514,7 +514,6 @@ public class FairScheduler implements Re
 
     queue.addApp(schedulerApp);
     queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
-    rootMetrics.submitApp(user, applicationAttemptId.getAttemptId());
 
     applications.put(applicationAttemptId, schedulerApp);
 
@@ -777,7 +776,8 @@ public class FairScheduler implements Re
         boolean assignedContainer = false;
         for (FSLeafQueue sched : scheds) {
           Resource assigned = sched.assignContainer(node, false);
-          if (Resources.greaterThan(assigned, Resources.none())) {
+          if (Resources.greaterThan(assigned, Resources.none()) ||
+              node.getReservedContainer() != null) {
             eventLog.log("ASSIGN", nm.getHostName(), assigned);
             assignedContainers++;
             assignedContainer = true;

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Jan 11 19:40:23 2013
@@ -227,6 +227,9 @@ public class QueueManager {
    * Return whether a queue exists already.
    */
   public boolean exists(String name) {
+    if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
+      name = ROOT_QUEUE + "." + name;
+    }
     synchronized (queues) {
       return queues.containsKey(name);
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Fri Jan 11 19:40:23 2013
@@ -276,21 +276,26 @@ public class DelegationTokenRenewer exte
     Collection <Token<?>> tokens = ts.getAllTokens();
     long now = System.currentTimeMillis();
     
+    // find tokens for renewal, but don't add timers until we know
+    // all renewable tokens are valid
+    Set<DelegationTokenToRenew> dtrs = new HashSet<DelegationTokenToRenew>();
     for(Token<?> token : tokens) {
       // first renew happens immediately
       if (token.isManaged()) {
         DelegationTokenToRenew dtr = 
           new DelegationTokenToRenew(applicationId, token, getConfig(), now, 
               shouldCancelAtEnd); 
-
-        addTokenToList(dtr);
-      
-        setTimerForTokenRenewal(dtr, true);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Registering token for renewal for:" +
-              " service = " + token.getService() + 
-              " for appId = " + applicationId);
-        }
+        renewToken(dtr);
+        dtrs.add(dtr);
+      }
+    }
+    for (DelegationTokenToRenew dtr : dtrs) {
+      addTokenToList(dtr);
+      setTimerForTokenRenewal(dtr);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registering token for renewal for:" +
+            " service = " + dtr.token.getService() +
+            " for appId = " + applicationId);
       }
     }
   }
@@ -301,54 +306,49 @@ public class DelegationTokenRenewer exte
    */
   private class RenewalTimerTask extends TimerTask {
     private DelegationTokenToRenew dttr;
+    private boolean cancelled = false;
     
     RenewalTimerTask(DelegationTokenToRenew t) {  
       dttr = t;  
     }
     
     @Override
-    public void run() {
+    public synchronized void run() {
+      if (cancelled) {
+        return;
+      }
+
       Token<?> token = dttr.token;
       try {
-        // need to use doAs so that http can find the kerberos tgt
-        dttr.expirationDate = UserGroupInformation.getLoginUser()
-          .doAs(new PrivilegedExceptionAction<Long>(){
-
-          @Override
-          public Long run() throws Exception {
-            return dttr.token.renew(dttr.conf);
-          }
-        });
-
+        renewToken(dttr);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Renewing delegation-token for:" + token.getService() + 
               "; new expiration;" + dttr.expirationDate);
         }
         
-        setTimerForTokenRenewal(dttr, false);// set the next one
+        setTimerForTokenRenewal(dttr);// set the next one
       } catch (Exception e) {
         LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
         removeFailedDelegationToken(dttr);
       }
     }
+
+    @Override
+    public synchronized boolean cancel() {
+      cancelled = true;
+      return super.cancel();
+    }
   }
   
   /**
    * set task to renew the token
    */
-  private 
-  void setTimerForTokenRenewal(DelegationTokenToRenew token, 
-                               boolean firstTime) throws IOException {
+  private void setTimerForTokenRenewal(DelegationTokenToRenew token)
+      throws IOException {
       
     // calculate timer time
-    long now = System.currentTimeMillis();
-    long renewIn;
-    if(firstTime) {
-      renewIn = now;
-    } else {
-      long expiresIn = (token.expirationDate - now); 
-      renewIn = now + expiresIn - expiresIn/10; // little bit before the expiration
-    }
+    long expiresIn = token.expirationDate - System.currentTimeMillis();
+    long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
     
     // need to create new task every time
     TimerTask tTask = new RenewalTimerTask(token);
@@ -357,6 +357,24 @@ public class DelegationTokenRenewer exte
     renewalTimer.schedule(token.timerTask, new Date(renewIn));
   }
 
+  // renew a token
+  private void renewToken(final DelegationTokenToRenew dttr)
+      throws IOException {
+    // need to use doAs so that http can find the kerberos tgt
+    // NOTE: token renewers should be responsible for the correct UGI!
+    try {
+      dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
+          new PrivilegedExceptionAction<Long>(){          
+            @Override
+            public Long run() throws Exception {
+              return dttr.token.renew(dttr.conf);
+            }
+          });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
   // cancel a token
   private void cancelToken(DelegationTokenToRenew t) {
     if(t.shouldCancelAtEnd) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java Fri Jan 11 19:40:23 2013
@@ -84,12 +84,12 @@ class AppsBlock extends HtmlBlock {
       appsTableData.append("[\"<a href='")
       .append(url("app", appInfo.getAppId())).append("'>")
       .append(appInfo.getAppId()).append("</a>\",\"")
-      .append(StringEscapeUtils.escapeHtml(appInfo.getUser()))
-      .append("\",\"")
-      .append(StringEscapeUtils.escapeHtml(appInfo.getName()))
-      .append("\",\"")
-      .append(StringEscapeUtils.escapeHtml(appInfo.getQueue()))
-      .append("\",\"")
+      .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+        appInfo.getUser()))).append("\",\"")
+      .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+        appInfo.getName()))).append("\",\"")
+      .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+        appInfo.getQueue()))).append("\",\"")
       .append(appInfo.getStartTime()).append("\",\"")
       .append(appInfo.getFinishTime()).append("\",\"")
       .append(appInfo.getState()).append("\",\"")

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java Fri Jan 11 19:40:23 2013
@@ -20,13 +20,14 @@ package org.apache.hadoop.yarn.server.re
 
 import static org.apache.hadoop.yarn.util.StringHelper.join;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_STATE;
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR;
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR_VALUE;
 
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -36,7 +37,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
-import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
@@ -86,44 +86,52 @@ public class FairSchedulerAppsBlock exte
         reqAppStates.add(RMAppState.valueOf(stateString));
       }
     }
+    StringBuilder appsTableData = new StringBuilder("[\n");
     for (RMApp app : apps.values()) {
       if (reqAppStates != null && !reqAppStates.contains(app.getState())) {
         continue;
       }
       AppInfo appInfo = new AppInfo(app, true);
       String percent = String.format("%.1f", appInfo.getProgress());
-      String startTime = Times.format(appInfo.getStartTime());
-      String finishTime = Times.format(appInfo.getFinishTime());
       ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId();
       int fairShare = fsinfo.getAppFairShare(attemptId);
+      //AppID numerical value parsed by parseHadoopID in yarn.dt.plugins.js
+      appsTableData.append("[\"<a href='")
+      .append(url("app", appInfo.getAppId())).append("'>")
+      .append(appInfo.getAppId()).append("</a>\",\"")
+      .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+        appInfo.getUser()))).append("\",\"")
+      .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+        appInfo.getName()))).append("\",\"")
+      .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+        appInfo.getQueue()))).append("\",\"")
+      .append(fairShare).append("\",\"")
+      .append(appInfo.getStartTime()).append("\",\"")
+      .append(appInfo.getFinishTime()).append("\",\"")
+      .append(appInfo.getState()).append("\",\"")
+      .append(appInfo.getFinalStatus()).append("\",\"")
+      // Progress bar
+      .append("<br title='").append(percent)
+      .append("'> <div class='").append(C_PROGRESSBAR).append("' title='")
+      .append(join(percent, '%')).append("'> ").append("<div class='")
+      .append(C_PROGRESSBAR_VALUE).append("' style='")
+      .append(join("width:", percent, '%')).append("'> </div> </div>")
+      .append("\",\"<a href='");
+
+      String trackingURL =
+        !appInfo.isTrackingUrlReady()? "#" : appInfo.getTrackingUrlPretty();
+      
+      appsTableData.append(trackingURL).append("'>")
+      .append(appInfo.getTrackingUI()).append("</a>\"],\n");
 
-      tbody.
-        tr().
-          td().
-            br().$title(appInfo.getAppIdNum())._(). // for sorting
-            a(url("app", appInfo.getAppId()), appInfo.getAppId())._().
-          td(appInfo.getUser()).
-          td(appInfo.getName()).
-          td(appInfo.getQueue()).
-          td("" + fairShare).
-          td().
-            br().$title(String.valueOf(appInfo.getStartTime()))._().
-            _(startTime)._().
-          td().
-            br().$title(String.valueOf(appInfo.getFinishTime()))._().
-            _(finishTime)._().
-          td(appInfo.getState()).
-          td(appInfo.getFinalStatus()).
-          td().
-            br().$title(percent)._(). // for sorting
-            div(_PROGRESSBAR).
-              $title(join(percent, '%')). // tooltip
-              div(_PROGRESSBAR_VALUE).
-                $style(join("width:", percent, '%'))._()._()._().
-          td().
-            a(!appInfo.isTrackingUrlReady()?
-              "#" : appInfo.getTrackingUrlPretty(), appInfo.getTrackingUI())._()._();
     }
+    if(appsTableData.charAt(appsTableData.length() - 2) == ',') {
+      appsTableData.delete(appsTableData.length()-2, appsTableData.length()-1);
+    }
+    appsTableData.append("]");
+    html.script().$type("text/javascript").
+    _("var appsTableData=" + appsTableData)._();
+
     tbody._()._();
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java Fri Jan 11 19:40:23 2013
@@ -20,16 +20,18 @@ package org.apache.hadoop.yarn.server.re
 
 import static org.apache.hadoop.yarn.util.StringHelper.join;
 
-import java.util.List;
+import java.util.Collection;
 
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerLeafQueueInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerQueueInfo;
 import org.apache.hadoop.yarn.webapp.ResponseInfo;
 import org.apache.hadoop.yarn.webapp.SubView;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.LI;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.UL;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
 import org.apache.hadoop.yarn.webapp.view.InfoBlock;
@@ -48,16 +50,15 @@ public class FairSchedulerPage extends R
   
   @RequestScoped
   static class FSQInfo {
-    FairSchedulerInfo fsinfo;
     FairSchedulerQueueInfo qinfo;
   }
   
-  static class QueueInfoBlock extends HtmlBlock {
-    final FairSchedulerQueueInfo qinfo;
+  static class LeafQueueBlock extends HtmlBlock {
+    final FairSchedulerLeafQueueInfo qinfo;
 
-    @Inject QueueInfoBlock(ViewContext ctx, FSQInfo info) {
+    @Inject LeafQueueBlock(ViewContext ctx, FSQInfo info) {
       super(ctx);
-      qinfo = (FairSchedulerQueueInfo) info.qinfo;
+      qinfo = (FairSchedulerLeafQueueInfo)info.qinfo;
     }
 
     @Override
@@ -81,6 +82,47 @@ public class FairSchedulerPage extends R
     }
   }
   
+  static class QueueBlock extends HtmlBlock {
+    final FSQInfo fsqinfo;
+
+    @Inject QueueBlock(FSQInfo info) {
+      fsqinfo = info;
+    }
+
+    @Override
+    public void render(Block html) {
+      Collection<FairSchedulerQueueInfo> subQueues = fsqinfo.qinfo.getChildQueues();
+      UL<Hamlet> ul = html.ul("#pq");
+      for (FairSchedulerQueueInfo info : subQueues) {
+        float capacity = info.getMaxResourcesFraction();
+        float fairShare = info.getFairShareFraction();
+        float used = info.getUsedFraction();
+        LI<UL<Hamlet>> li = ul.
+          li().
+            a(_Q).$style(width(capacity * Q_MAX_WIDTH)).
+              $title(join("Fair Share:", percent(fairShare))).
+              span().$style(join(Q_GIVEN, ";font-size:1px;", width(fairShare/capacity))).
+                _('.')._().
+              span().$style(join(width(used/capacity),
+                ";font-size:1px;left:0%;", used > fairShare ? Q_OVER : Q_UNDER)).
+                _('.')._().
+              span(".q", info.getQueueName())._().
+            span().$class("qstats").$style(left(Q_STATS_POS)).
+              _(join(percent(used), " used"))._();
+
+        fsqinfo.qinfo = info;
+        if (info instanceof FairSchedulerLeafQueueInfo) {
+          li.ul("#lq").li()._(LeafQueueBlock.class)._()._();
+        } else {
+          li._(QueueBlock.class);
+        }
+        li._();
+      }
+
+      ul._();
+    }
+  }
+  
   static class QueuesBlock extends HtmlBlock {
     final FairScheduler fs;
     final FSQInfo fsqinfo;
@@ -89,8 +131,9 @@ public class FairSchedulerPage extends R
       fs = (FairScheduler)rm.getResourceScheduler();
       fsqinfo = info;
     }
-    
-    @Override public void render(Block html) {
+
+    @Override
+    public void render(Block html) {
       html._(MetricsOverviewTable.class);
       UL<DIV<DIV<Hamlet>>> ul = html.
         div("#cs-wrapper.ui-widget").
@@ -106,8 +149,8 @@ public class FairSchedulerPage extends R
               span(".q", "default")._()._();
       } else {
         FairSchedulerInfo sinfo = new FairSchedulerInfo(fs);
-        fsqinfo.fsinfo = sinfo;
-        fsqinfo.qinfo = null;
+        fsqinfo.qinfo = sinfo.getRootQueueInfo();
+        float used = fsqinfo.qinfo.getUsedFraction();
 
         ul.
           li().$style("margin-bottom: 1em").
@@ -120,29 +163,15 @@ public class FairSchedulerPage extends R
               _("Used (over fair share)")._().
             span().$class("qlegend ui-corner-all ui-state-default").
               _("Max Capacity")._().
-          _();
-        
-        List<FairSchedulerQueueInfo> subQueues = fsqinfo.fsinfo.getQueueInfos();
-        for (FairSchedulerQueueInfo info : subQueues) {
-          fsqinfo.qinfo = info;
-          float capacity = info.getMaxResourcesFraction();
-          float fairShare = info.getFairShareFraction();
-          float used = info.getUsedFraction();
-          ul.
-              li().
-                a(_Q).$style(width(capacity * Q_MAX_WIDTH)).
-                  $title(join("Fair Share:", percent(fairShare))).
-                  span().$style(join(Q_GIVEN, ";font-size:1px;", width(fairShare/capacity))).
-                    _('.')._().
-                  span().$style(join(width(used/capacity),
-                    ";font-size:1px;left:0%;", used > fairShare ? Q_OVER : Q_UNDER)).
-                    _('.')._().
-                  span(".q", info.getQueueName())._().
-                span().$class("qstats").$style(left(Q_STATS_POS)).
-                  _(join(percent(used), " used"))._().
-                ul("#lq").li()._(QueueInfoBlock.class)._()._().
-              _();
-        }
+        _().
+          li().
+            a(_Q).$style(width(Q_MAX_WIDTH)).
+              span().$style(join(width(used), ";left:0%;",
+                  used > 1 ? Q_OVER : Q_UNDER))._(".")._().
+              span(".q", "root")._().
+            span().$class("qstats").$style(left(Q_STATS_POS)).
+              _(join(percent(used), " used"))._().
+            _(QueueBlock.class)._();
       }
       ul._()._().
       script().$type("text/javascript").
@@ -159,13 +188,16 @@ public class FairSchedulerPage extends R
           "#cs a { font-weight: normal; margin: 2px; position: relative }",
           "#cs a span { font-weight: normal; font-size: 80% }",
           "#cs-wrapper .ui-widget-header { padding: 0.2em 0.5em }",
+          ".qstats { font-weight: normal; font-size: 80%; position: absolute }",
+          ".qlegend { font-weight: normal; padding: 0 1em; margin: 1em }",
           "table.info tr th {width: 50%}")._(). // to center info table
       script("/static/jt/jquery.jstree.js").
       script().$type("text/javascript").
         _("$(function() {",
           "  $('#cs a span').addClass('ui-corner-all').css('position', 'absolute');",
           "  $('#cs').bind('loaded.jstree', function (e, data) {",
-          "    data.inst.open_all(); }).",
+          "    data.inst.open_node('#pq', true);",
+          "   }).",
           "    jstree({",
           "    core: { animation: 188, html_titles: true },",
           "    plugins: ['themeroller', 'html_data', 'ui'],",
@@ -175,8 +207,9 @@ public class FairSchedulerPage extends R
           "  });",
           "  $('#cs').bind('select_node.jstree', function(e, data) {",
           "    var q = $('.q', data.rslt.obj).first().text();",
-            "    if (q == 'root') q = '';",
-          "    $('#apps').dataTable().fnFilter(q, 3);",
+          "    if (q == 'root') q = '';",
+          "    else q = '^' + q.substr(q.lastIndexOf('.') + 1) + '$';",
+          "    $('#apps').dataTable().fnFilter(q, 3, true);",
           "  });",
           "  $('#cs').show();",
           "});")._();
@@ -197,4 +230,19 @@ public class FairSchedulerPage extends R
   static String left(float f) {
     return String.format("left:%.1f%%", f * 100);
   }
+  
+  @Override
+  protected String getAppsTableColumnDefs() {
+    StringBuilder sb = new StringBuilder();
+    return sb
+      .append("[\n")
+      .append("{'sType':'numeric', 'aTargets': [0]")
+      .append(", 'mRender': parseHadoopID }")
+
+      .append("\n, {'sType':'numeric', 'aTargets': [5, 6]")
+      .append(", 'mRender': renderHadoopDate }")
+
+      .append("\n, {'sType':'numeric', bSearchable:false, 'aTargets': [9]")
+      .append(", 'mRender': parseHadoopProgress }]").toString();
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java?rev=1432246&r1=1432245&r2=1432246&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java Fri Jan 11 19:40:23 2013
@@ -66,7 +66,17 @@ public class RmView extends TwoColumnLay
       .append(", bDeferRender: true")
       .append(", bProcessing: true")
 
-      .append("\n, aoColumnDefs: [\n")
+      .append("\n, aoColumnDefs: ")
+      .append(getAppsTableColumnDefs())
+
+      // Sort by id upon page load
+      .append(", aaSorting: [[0, 'desc']]}").toString();
+  }
+  
+  protected String getAppsTableColumnDefs() {
+    StringBuilder sb = new StringBuilder();
+    return sb
+      .append("[\n")
       .append("{'sType':'numeric', 'aTargets': [0]")
       .append(", 'mRender': parseHadoopID }")
 
@@ -74,9 +84,6 @@ public class RmView extends TwoColumnLay
       .append(", 'mRender': renderHadoopDate }")
 
       .append("\n, {'sType':'numeric', bSearchable:false, 'aTargets': [8]")
-      .append(", 'mRender': parseHadoopProgress }]")
-
-      // Sort by id upon page load
-      .append(", aaSorting: [[0, 'desc']]}").toString();
+      .append(", 'mRender': parseHadoopProgress }]").toString();
   }
 }