You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/11/10 21:09:16 UTC

svn commit: r1540535 [3/5] - in /hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/ha...

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Sun Nov 10 20:09:09 2013
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -40,7 +39,6 @@ import org.apache.commons.lang.StringUti
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -55,7 +53,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -82,11 +79,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@@ -98,7 +94,6 @@ import org.apache.hadoop.yarn.state.Mult
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
@@ -142,7 +137,7 @@ public class RMAppAttemptImpl implements
   private float progress = 0;
   private String host = "N/A";
   private int rpcPort;
-  private String origTrackingUrl = "N/A";
+  private String originalTrackingUrl = "N/A";
   private String proxiedTrackingUrl = "N/A";
   private long startTime = 0;
 
@@ -157,6 +152,11 @@ public class RMAppAttemptImpl implements
   private static final ExpiredTransition EXPIRED_TRANSITION =
       new ExpiredTransition();
 
+  private RMAppAttemptEvent eventCausingFinalSaving;
+  private RMAppAttemptState targetedFinalState;
+  private RMAppAttemptState recoveredFinalState;
+  private Object transitionTodo;
+
   private static final StateMachineFactory<RMAppAttemptImpl,
                                            RMAppAttemptState,
                                            RMAppAttemptEventType,
@@ -169,68 +169,80 @@ public class RMAppAttemptImpl implements
        // Transitions from NEW State
       .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
           RMAppAttemptEventType.START, new AttemptStartedTransition())
-      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.KILLED,
+      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new BaseFinalTransition(RMAppAttemptState.KILLED))
-      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
+          new FinalSavingTransition(new BaseFinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.REGISTERED,
-          new UnexpectedAMRegisteredTransition())
-      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.RECOVERED, 
-          RMAppAttemptEventType.RECOVER)
+          new FinalSavingTransition(
+            new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
+      .addTransition( RMAppAttemptState.NEW,
+          EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED,
+            RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED),
+          RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
           
       // Transitions from SUBMITTED state
-      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
-          RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
+      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.APP_REJECTED,
+          new FinalSavingTransition(new AppRejectedTransition(),
+            RMAppAttemptState.FAILED))
       .addTransition(RMAppAttemptState.SUBMITTED, 
           EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                      RMAppAttemptState.SCHEDULED),
           RMAppAttemptEventType.APP_ACCEPTED, 
           new ScheduleTransition())
-      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
+      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new BaseFinalTransition(RMAppAttemptState.KILLED))
-      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
+          new FinalSavingTransition(new BaseFinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.REGISTERED,
-          new UnexpectedAMRegisteredTransition())
+          new FinalSavingTransition(
+            new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
           
        // Transitions from SCHEDULED State
       .addTransition(RMAppAttemptState.SCHEDULED,
                      RMAppAttemptState.ALLOCATED_SAVING,
           RMAppAttemptEventType.CONTAINER_ALLOCATED,
           new AMContainerAllocatedTransition())
-      .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED,
+      .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new BaseFinalTransition(RMAppAttemptState.KILLED))
+          new FinalSavingTransition(new BaseFinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
           
        // Transitions from ALLOCATED_SAVING State
       .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
           RMAppAttemptState.ALLOCATED,
-          RMAppAttemptEventType.ATTEMPT_SAVED, new AttemptStoredTransition())
+          RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
       .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
           RMAppAttemptState.ALLOCATED_SAVING,
           RMAppAttemptEventType.CONTAINER_ACQUIRED, 
           new ContainerAcquiredTransition())
        // App could be killed by the client. So need to handle this. 
       .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
-          RMAppAttemptState.KILLED,
+          RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new BaseFinalTransition(RMAppAttemptState.KILLED))
-      
+          new FinalSavingTransition(new BaseFinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+
        // Transitions from LAUNCHED_UNMANAGED_SAVING State
       .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
           RMAppAttemptState.LAUNCHED,
-          RMAppAttemptEventType.ATTEMPT_SAVED, 
+          RMAppAttemptEventType.ATTEMPT_NEW_SAVED, 
           new UnmanagedAMAttemptSavedTransition())
       // attempt should not try to register in this state
       .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
-          RMAppAttemptState.FAILED,
+          RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.REGISTERED,
-          new UnexpectedAMRegisteredTransition())
+          new FinalSavingTransition(
+            new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
       // App could be killed by the client. So need to handle this. 
       .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
-          RMAppAttemptState.KILLED,
+          RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new BaseFinalTransition(RMAppAttemptState.KILLED))
+          new FinalSavingTransition(new BaseFinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
 
        // Transitions from ALLOCATED State
       .addTransition(RMAppAttemptState.ALLOCATED,
@@ -239,32 +251,40 @@ public class RMAppAttemptImpl implements
           new ContainerAcquiredTransition())
       .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
           RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
-      .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FAILED,
-          RMAppAttemptEventType.LAUNCH_FAILED, new LaunchFailedTransition())
-      .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.KILLED,
-          RMAppAttemptEventType.KILL, new KillAllocatedAMTransition())
+      .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.LAUNCH_FAILED,
+          new FinalSavingTransition(new LaunchFailedTransition(),
+            RMAppAttemptState.FAILED))
+      .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.KILL,
+          new FinalSavingTransition(
+            new KillAllocatedAMTransition(), RMAppAttemptState.KILLED))
           
-      .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FAILED,
+      .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.CONTAINER_FINISHED,
-          new AMContainerCrashedTransition())
+          new FinalSavingTransition(
+            new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
 
        // Transitions from LAUNCHED State
       .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
-      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FAILED,
+      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.CONTAINER_FINISHED,
-          new AMContainerCrashedTransition())
+          new FinalSavingTransition(
+            new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
       .addTransition(
-          RMAppAttemptState.LAUNCHED, RMAppAttemptState.FAILED,
+          RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.EXPIRE,
-          EXPIRED_TRANSITION)
-      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.KILLED,
+          new FinalSavingTransition(EXPIRED_TRANSITION,
+            RMAppAttemptState.FAILED))
+      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new FinalTransition(RMAppAttemptState.KILLED))
+          new FinalSavingTransition(new FinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
 
        // Transitions from RUNNING State
       .addTransition(RMAppAttemptState.RUNNING,
-          EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED),
+          EnumSet.of(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINISHED),
           RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
@@ -276,17 +296,41 @@ public class RMAppAttemptImpl implements
                 new ContainerAcquiredTransition())
       .addTransition(
           RMAppAttemptState.RUNNING,
-          EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED),
+          EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
           RMAppAttemptEventType.CONTAINER_FINISHED,
           new ContainerFinishedTransition())
       .addTransition(
-          RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED,
+          RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.EXPIRE,
-          EXPIRED_TRANSITION)
+          new FinalSavingTransition(EXPIRED_TRANSITION,
+            RMAppAttemptState.FAILED))
       .addTransition(
-          RMAppAttemptState.RUNNING, RMAppAttemptState.KILLED,
+          RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
-          new FinalTransition(RMAppAttemptState.KILLED))
+          new FinalSavingTransition(new FinalTransition(
+            RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+
+       // Transitions from FINAL_SAVING State
+      .addTransition(RMAppAttemptState.FINAL_SAVING,
+          EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FAILED,
+            RMAppAttemptState.KILLED, RMAppAttemptState.FINISHED),
+            RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED,
+            new FinalStateSavedTransition())
+      .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.CONTAINER_FINISHED,
+          new ContainerFinishedAtFinalSavingTransition())
+      .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.EXPIRE,
+          new AMExpiredAtFinalSavingTransition())
+      .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
+          EnumSet.of(
+              RMAppAttemptEventType.UNREGISTERED,
+              RMAppAttemptEventType.STATUS_UPDATE,
+            // should be fixed to reject container allocate request at Final
+            // Saving in scheduler
+              RMAppAttemptEventType.CONTAINER_ALLOCATED,
+              RMAppAttemptEventType.CONTAINER_ACQUIRED,
+              RMAppAttemptEventType.KILL))
 
       // Transitions from FAILED State
       .addTransition(
@@ -338,7 +382,6 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.REGISTERED,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
-              RMAppAttemptEventType.ATTEMPT_SAVED,
               RMAppAttemptEventType.CONTAINER_FINISHED,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.KILL,
@@ -357,7 +400,7 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.REGISTERED,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
               RMAppAttemptEventType.CONTAINER_ACQUIRED,
-              RMAppAttemptEventType.ATTEMPT_SAVED,
+              RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
               RMAppAttemptEventType.CONTAINER_FINISHED,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.KILL,
@@ -411,7 +454,7 @@ public class RMAppAttemptImpl implements
   public RMAppAttemptState getAppAttemptState() {
     this.readLock.lock();
     try {
-      return this.stateMachine.getCurrentState();
+        return this.stateMachine.getCurrentState();
     } finally {
       this.readLock.unlock();
     }
@@ -444,7 +487,7 @@ public class RMAppAttemptImpl implements
     this.readLock.lock();
     try {
       return (getSubmissionContext().getUnmanagedAM()) ? 
-              this.origTrackingUrl : this.proxiedTrackingUrl;
+              this.originalTrackingUrl : this.proxiedTrackingUrl;
     } finally {
       this.readLock.unlock();
     }
@@ -454,7 +497,7 @@ public class RMAppAttemptImpl implements
   public String getOriginalTrackingUrl() {
     this.readLock.lock();
     try {
-      return this.origTrackingUrl;
+      return this.originalTrackingUrl;
     } finally {
       this.readLock.unlock();
     }    
@@ -490,10 +533,10 @@ public class RMAppAttemptImpl implements
   }
 
   private void setTrackingUrlToRMAppPage() {
-    origTrackingUrl = pjoin(
+    originalTrackingUrl = pjoin(
         WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf),
         "cluster", "app", getAppAttemptId().getApplicationId());
-    proxiedTrackingUrl = origTrackingUrl;
+    proxiedTrackingUrl = originalTrackingUrl;
   }
 
   // This is only used for RMStateStore. Normal operation must invoke the secret
@@ -539,16 +582,6 @@ public class RMAppAttemptImpl implements
     }
   }
 
-  public void setDiagnostics(String message) {
-    this.writeLock.lock();
-
-    try {
-      this.diagnostics.append(message);
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-
   @Override
   public float getProgress() {
     this.readLock.lock();
@@ -635,57 +668,34 @@ public class RMAppAttemptImpl implements
   @Override
   public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
     this.readLock.lock();
-    
     try {
-      int numUsedContainers = 0;
-      int numReservedContainers = 0;
-      Resource currentConsumption = Resources.createResource(0, 0);
-      Resource reservedResources = Resources.createResource(0, 0);
-      
-      SchedulerAppReport schedApp = 
-          scheduler.getSchedulerAppInfo(this.getAppAttemptId());
-      Collection<RMContainer> liveContainers;
-      Collection<RMContainer> reservedContainers;
-      if (schedApp != null) {
-        liveContainers = schedApp.getLiveContainers();
-        reservedContainers = schedApp.getReservedContainers();
-        if (liveContainers != null) {
-          numUsedContainers = liveContainers.size();
-          for (RMContainer lc : liveContainers) {
-            Resources.addTo(currentConsumption, lc.getContainer().getResource());
-          }
-        }
-        if (reservedContainers != null) {
-          numReservedContainers = reservedContainers.size();
-          for (RMContainer rc : reservedContainers) {
-            Resources.addTo(reservedResources, rc.getContainer().getResource());
-          }
-        }
-      }
-
-      return BuilderUtils.newApplicationResourceUsageReport(
-          numUsedContainers, numReservedContainers,
-          currentConsumption, reservedResources,
-          Resources.add(currentConsumption, reservedResources));
+      return scheduler.getAppResourceUsageReport(this.getAppAttemptId());
     } finally {
       this.readLock.unlock();
     }
   }
 
   @Override
-  public void recover(RMState state) throws Exception{
-    ApplicationState appState = 
+  public void recover(RMState state) throws Exception {
+    ApplicationState appState =
         state.getApplicationState().get(getAppAttemptId().getApplicationId());
-    ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
+    ApplicationAttemptState attemptState =
+        appState.getAttempt(getAppAttemptId());
     assert attemptState != null;
+    LOG.info("Recovered attempt: AppId: "
+        + getAppAttemptId().getApplicationId() + " AttemptId: "
+        + getAppAttemptId() + " MasterContainer: " + masterContainer);
+    diagnostics.append("Attempt recovered after RM restart");
+    diagnostics.append(attemptState.getDiagnostics());
     setMasterContainer(attemptState.getMasterContainer());
     recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
-    LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId()
-             + " AttemptId: " + getAppAttemptId()
-             + " MasterContainer: " + masterContainer);
-    setDiagnostics("Attempt recovered after RM restart");
-    handle(new RMAppAttemptEvent(getAppAttemptId(), 
-                                 RMAppAttemptEventType.RECOVER));
+    this.recoveredFinalState = attemptState.getState();
+    this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
+    this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
+    this.finalStatus = attemptState.getFinalApplicationStatus();
+    this.startTime = attemptState.getStartTime();
+    handle(new RMAppAttemptEvent(getAppAttemptId(),
+      RMAppAttemptEventType.RECOVER));
   }
 
   private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
@@ -763,7 +773,7 @@ public class RMAppAttemptImpl implements
       
       // Save the diagnostic message
       String message = rejectedEvent.getMessage();
-      appAttempt.setDiagnostics(message);
+      appAttempt.diagnostics.append(message);
 
       // Send the rejection event to app
       appAttempt.eventHandler.handle(
@@ -810,10 +820,8 @@ public class RMAppAttemptImpl implements
         }
         return RMAppAttemptState.SCHEDULED;
       } else {
-        // RM not allocating container. AM is self launched. 
-        RMStateStore store = appAttempt.rmContext.getStateStore();
         // save state and then go to LAUNCHED state
-        appAttempt.storeAttempt(store);
+        appAttempt.storeAttempt();
         return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
       }
     }
@@ -838,8 +846,7 @@ public class RMAppAttemptImpl implements
                                                                            0));
       appAttempt.getSubmissionContext().setResource(
           appAttempt.getMasterContainer().getResource());
-      RMStateStore store = appAttempt.rmContext.getStateStore();
-      appAttempt.storeAttempt(store);
+      appAttempt.storeAttempt();
     }
   }
   
@@ -851,6 +858,134 @@ public class RMAppAttemptImpl implements
       appAttempt.launchAttempt();
     }
   }
+
+  private static class AttemptRecoveredTransition
+      implements
+      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
+    @Override
+    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
+        RMAppAttemptEvent event) {
+      if (appAttempt.recoveredFinalState != null) {
+        appAttempt.progress = 1.0f;
+        return appAttempt.recoveredFinalState;
+      } else {
+        return RMAppAttemptState.RECOVERED;
+      }
+    }
+  }
+
+  private void rememberTargetTransitions(RMAppAttemptEvent event,
+      Object transitionToDo, RMAppAttemptState targetFinalState) {
+    transitionTodo = transitionToDo;
+    targetedFinalState = targetFinalState;
+    eventCausingFinalSaving = event;
+  }
+
+  private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
+      Object transitionToDo, RMAppAttemptState targetFinalState,
+      RMAppAttemptState stateToBeStored) {
+
+    rememberTargetTransitions(event, transitionToDo, targetFinalState);
+
+    // As of today, finalState, diagnostics, final-tracking-url and
+    // finalAppStatus are the only things that we store into the StateStore
+    // AFTER the initial saving on app-attempt-start
+    // These fields can be visible from outside only after they are saved in
+    // StateStore
+    String diags = null;
+    String finalTrackingUrl = null;
+    FinalApplicationStatus finalStatus = null;
+
+    switch (event.getType()) {
+    case APP_REJECTED:
+      RMAppAttemptRejectedEvent rejectedEvent =
+          (RMAppAttemptRejectedEvent) event;
+      diags = rejectedEvent.getMessage();
+      break;
+    case LAUNCH_FAILED:
+      RMAppAttemptLaunchFailedEvent launchFaileEvent =
+          (RMAppAttemptLaunchFailedEvent) event;
+      diags = launchFaileEvent.getMessage();
+      break;
+    case REGISTERED:
+      diags = getUnexpectedAMRegisteredDiagnostics();
+      break;
+    case UNREGISTERED:
+      RMAppAttemptUnregistrationEvent unregisterEvent =
+          (RMAppAttemptUnregistrationEvent) event;
+      diags = unregisterEvent.getDiagnostics();
+      finalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl());
+      finalStatus = unregisterEvent.getFinalApplicationStatus();
+      break;
+    case CONTAINER_FINISHED:
+      RMAppAttemptContainerFinishedEvent finishEvent =
+          (RMAppAttemptContainerFinishedEvent) event;
+      diags = getAMContainerCrashedDiagnostics(finishEvent);
+      break;
+    case KILL:
+      break;
+    case EXPIRE:
+      diags = getAMExpiredDiagnostics(event);
+      break;
+    default:
+      break;
+    }
+
+    RMStateStore rmStore = rmContext.getStateStore();
+    ApplicationAttemptState attemptState =
+        new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
+          rmStore.getCredentialsFromAppAttempt(this), startTime,
+          stateToBeStored, finalTrackingUrl, diags, finalStatus);
+    LOG.info("Updating application attempt " + applicationAttemptId
+        + " with final state: " + targetedFinalState);
+    rmStore.updateApplicationAttemptState(attemptState);
+  }
+
+  private static class FinalSavingTransition extends BaseTransition {
+
+    Object transitionToDo;
+    RMAppAttemptState targetedFinalState;
+
+    public FinalSavingTransition(Object transitionToDo,
+        RMAppAttemptState targetedFinalState) {
+      this.transitionToDo = transitionToDo;
+      this.targetedFinalState = targetedFinalState;
+    }
+
+    @Override
+    public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+      // For cases Killed/Failed, targetedFinalState is the same as the state to
+      // be stored
+      appAttempt.rememberTargetTransitionsAndStoreState(event, transitionToDo,
+        targetedFinalState, targetedFinalState);
+    }
+  }
+
+  private static class FinalStateSavedTransition implements
+      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
+    @Override
+    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
+        RMAppAttemptEvent event) {
+      RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event;
+      if (storeEvent.getUpdatedException() != null) {
+        LOG.error("Failed to update the final state of application attempt: "
+            + storeEvent.getApplicationAttemptId(),
+          storeEvent.getUpdatedException());
+        ExitUtil.terminate(1, storeEvent.getUpdatedException());
+      }
+
+      RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
+
+      if (appAttempt.transitionTodo instanceof SingleArcTransition) {
+        ((SingleArcTransition) appAttempt.transitionTodo).transition(
+          appAttempt, causeEvent);
+      } else if (appAttempt.transitionTodo instanceof MultipleArcTransition) {
+        ((MultipleArcTransition) appAttempt.transitionTodo).transition(
+          appAttempt, causeEvent);
+      }
+      return appAttempt.targetedFinalState;
+    }
+  }
   
   private static class BaseFinalTransition extends BaseTransition {
 
@@ -998,15 +1133,20 @@ public class RMAppAttemptImpl implements
           = (RMAppAttemptRegistrationEvent) event;
       appAttempt.host = registrationEvent.getHost();
       appAttempt.rpcPort = registrationEvent.getRpcport();
-      appAttempt.origTrackingUrl =
+      appAttempt.originalTrackingUrl =
           sanitizeTrackingUrl(registrationEvent.getTrackingurl());
       appAttempt.proxiedTrackingUrl = 
-        appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl);
+        appAttempt.generateProxyUriWithScheme(appAttempt.originalTrackingUrl);
 
       // Let the app know
       appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
           .getAppAttemptId().getApplicationId(),
           RMAppEventType.ATTEMPT_REGISTERED));
+
+      // TODO:FIXME: Note for future. Unfortunately we only do a state-store
+      // write at AM launch time, so we don't save the AM's tracking URL anywhere
+      // as that would mean an extra state-store write. For now, we hope that in
+      // work-preserving restart, AMs are forced to reregister.
     }
   }
 
@@ -1029,17 +1169,24 @@ public class RMAppAttemptImpl implements
           appAttempt.getAppAttemptId());
 
       // Setup diagnostic message
-      ContainerStatus status = finishEvent.getContainerStatus();
-      appAttempt.diagnostics.append("AM Container for " +
-          appAttempt.getAppAttemptId() + " exited with " +
-          " exitCode: " + status.getExitStatus() +
-          " due to: " +  status.getDiagnostics() + "." +
-          "Failing this attempt.");
+      appAttempt.diagnostics
+        .append(getAMContainerCrashedDiagnostics(finishEvent));
       // Tell the app, scheduler
       super.transition(appAttempt, finishEvent);
     }
   }
 
+  private static String getAMContainerCrashedDiagnostics(
+      RMAppAttemptContainerFinishedEvent finishEvent) {
+    ContainerStatus status = finishEvent.getContainerStatus();
+    String diagnostics =
+        "AM Container for " + finishEvent.getApplicationAttemptId()
+            + " exited with " + " exitCode: " + status.getExitStatus()
+            + " due to: " + status.getDiagnostics() + "."
+            + "Failing this attempt.";
+    return diagnostics;
+  }
+
   private static class FinalTransition extends BaseFinalTransition {
 
     public FinalTransition(RMAppAttemptState finalAttemptState) {
@@ -1055,7 +1202,8 @@ public class RMAppAttemptImpl implements
       // Tell the app and the scheduler
       super.transition(appAttempt, event);
 
-      // UnRegister from AMLivelinessMonitor
+      // UnRegister from AMLivelinessMonitor. Perhaps for
+      // FAILING/KILLED/UnManaged AMs
       appAttempt.rmContext.getAMLivelinessMonitor().unregister(
           appAttempt.getAppAttemptId());
       appAttempt.rmContext.getAMFinishingMonitor().unregister(
@@ -1078,12 +1226,18 @@ public class RMAppAttemptImpl implements
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-      appAttempt.diagnostics.append("ApplicationMaster for attempt " +
-        appAttempt.getAppAttemptId() + " timed out");
+      appAttempt.diagnostics.append(getAMExpiredDiagnostics(event));
       super.transition(appAttempt, event);
     }
   }
 
+  /**
+   * Builds the diagnostic text used when an attempt's ApplicationMaster has
+   * timed out.
+   *
+   * @param event event carrying the id of the timed-out attempt
+   * @return the timeout diagnostic message
+   */
+  private static String getAMExpiredDiagnostics(RMAppAttemptEvent event) {
+    return "ApplicationMaster for attempt " + event.getApplicationAttemptId()
+        + " timed out";
+  }
+
   private static class UnexpectedAMRegisteredTransition extends
       BaseFinalTransition {
 
@@ -1094,13 +1248,16 @@ public class RMAppAttemptImpl implements
     @Override
     public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
       assert appAttempt.submissionContext.getUnmanagedAM();
-      appAttempt
-          .setDiagnostics("Unmanaged AM must register after AM attempt reaches LAUNCHED state.");
+      appAttempt.diagnostics.append(getUnexpectedAMRegisteredDiagnostics());
       super.transition(appAttempt, event);
     }
 
   }
 
+  private static String getUnexpectedAMRegisteredDiagnostics() {
+    return "Unmanaged AM must register after AM attempt reaches LAUNCHED state.";
+  }
+
   private static final class StatusUpdateTransition extends
       BaseTransition {
     @Override
@@ -1125,38 +1282,62 @@ public class RMAppAttemptImpl implements
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-      ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId();
-
-      appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId);
-
-      appAttempt.progress = 1.0f;
-
-      RMAppAttemptUnregistrationEvent unregisterEvent
-        = (RMAppAttemptUnregistrationEvent) event;
-      appAttempt.diagnostics.append(unregisterEvent.getDiagnostics());
-      appAttempt.origTrackingUrl =
-          sanitizeTrackingUrl(unregisterEvent.getTrackingUrl());
-      appAttempt.proxiedTrackingUrl = 
-        appAttempt.generateProxyUriWithScheme(appAttempt.origTrackingUrl);
-      appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
-
       // Tell the app
       if (appAttempt.getSubmissionContext().getUnmanagedAM()) {
         // Unmanaged AMs have no container to wait for, so they skip
         // the FINISHING state and go straight to FINISHED.
+        appAttempt.updateInfoOnAMUnregister(event);
         new FinalTransition(RMAppAttemptState.FINISHED).transition(
             appAttempt, event);
         return RMAppAttemptState.FINISHED;
       }
-      appAttempt.rmContext.getAMFinishingMonitor().register(appAttemptId);
+      // Saving the attempt final state
+      appAttempt.rememberTargetTransitionsAndStoreState(event,
+        new FinalStateSavedAfterAMUnregisterTransition(),
+        RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED);
       ApplicationId applicationId =
           appAttempt.getAppAttemptId().getApplicationId();
-      appAttempt.eventHandler.handle(
-          new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED));
-      return RMAppAttemptState.FINISHING;
+
+      // Tell the app immediately that AM is unregistering so that app itself
+      // can save its state as soon as possible. Whether we do it like this, or
+      // we wait till AppAttempt is saved, it doesn't make any difference on the
+      // app side w.r.t failure conditions. The only event going out of
+      // AppAttempt to App after this point of time is AM/AppAttempt Finished.
+      appAttempt.eventHandler.handle(new RMAppEvent(applicationId,
+        RMAppEventType.ATTEMPT_UNREGISTERED));
+      return RMAppAttemptState.FINAL_SAVING;
     }
   }
 
+  /**
+   * Follow-up transition run once the attempt's final state has been saved
+   * after the AM unregistered. It switches the attempt from the liveliness
+   * monitor to the finishing monitor, then records the unregistration info.
+   */
+  private static class FinalStateSavedAfterAMUnregisterTransition extends
+      BaseTransition {
+    @Override
+    public void transition(RMAppAttemptImpl appAttempt,
+        RMAppAttemptEvent event) {
+      // Stop liveliness tracking first, then start finishing tracking.
+      appAttempt.rmContext.getAMLivelinessMonitor()
+          .unregister(appAttempt.applicationAttemptId);
+      appAttempt.rmContext.getAMFinishingMonitor()
+          .register(appAttempt.applicationAttemptId);
+
+      // Keep this transition minimal. Shared unregistration bookkeeping
+      // belongs in updateInfoOnAMUnregister, which other code paths also
+      // call directly.
+      appAttempt.updateInfoOnAMUnregister(event);
+    }
+  }
+
+  /**
+   * Records what an AM reports when it unregisters. It forces progress to
+   * 100%, appends the AM's diagnostics, stores the sanitized final tracking
+   * URL and its proxied form, and keeps the final application status.
+   *
+   * @param event the unregistration event; it is cast to
+   *        {@link RMAppAttemptUnregistrationEvent}
+   */
+  private void updateInfoOnAMUnregister(RMAppAttemptEvent event) {
+    this.progress = 1.0f;
+    RMAppAttemptUnregistrationEvent unregistration =
+        (RMAppAttemptUnregistrationEvent) event;
+    this.diagnostics.append(unregistration.getDiagnostics());
+    this.originalTrackingUrl =
+        sanitizeTrackingUrl(unregistration.getFinalTrackingUrl());
+    this.proxiedTrackingUrl =
+        generateProxyUriWithScheme(this.originalTrackingUrl);
+    this.finalStatus = unregistration.getFinalApplicationStatus();
+  }
+
   private static final class ContainerAcquiredTransition extends
       BaseTransition {
     @Override
@@ -1185,29 +1366,37 @@ public class RMAppAttemptImpl implements
       // the AMContainer, AppAttempt fails
       if (appAttempt.masterContainer != null
           && appAttempt.masterContainer.getId().equals(
-              containerStatus.getContainerId())) {
-        // container associated with AM. must not be unmanaged 
-        assert appAttempt.submissionContext.getUnmanagedAM() == false;
-        // Setup diagnostic message
-        appAttempt.diagnostics.append("AM Container for " +
-            appAttempt.getAppAttemptId() + " exited with " +
-            " exitCode: " + containerStatus.getExitStatus() +
-            " due to: " +  containerStatus.getDiagnostics() + "." +
-            "Failing this attempt.");
-
-        new FinalTransition(RMAppAttemptState.FAILED).transition(
-            appAttempt, containerFinishedEvent);
-        return RMAppAttemptState.FAILED;
+            containerStatus.getContainerId())) {
+        // Remember the follow up transition and save the final attempt state.
+        appAttempt.rememberTargetTransitionsAndStoreState(event,
+          new ContainerFinishedFinalStateSavedTransition(),
+          RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
+        return RMAppAttemptState.FINAL_SAVING;
       }
 
-      // Normal container.
-
-      // Put it in completedcontainers list
+      // Normal container.Put it in completedcontainers list
       appAttempt.justFinishedContainers.add(containerStatus);
       return RMAppAttemptState.RUNNING;
     }
   }
 
+  /**
+   * Follow-up transition run after the final state is saved because the AM
+   * container finished. It records the crash diagnostics and completes the
+   * attempt as FAILED.
+   */
+  private static class ContainerFinishedFinalStateSavedTransition extends
+      BaseTransition {
+    @Override
+    public void transition(RMAppAttemptImpl appAttempt,
+        RMAppAttemptEvent event) {
+      RMAppAttemptContainerFinishedEvent finished =
+          (RMAppAttemptContainerFinishedEvent) event;
+      // Only the AM container leads here, so the AM cannot be unmanaged.
+      assert !appAttempt.submissionContext.getUnmanagedAM();
+      appAttempt.diagnostics.append(getAMContainerCrashedDiagnostics(finished));
+      new FinalTransition(RMAppAttemptState.FAILED)
+          .transition(appAttempt, event);
+    }
+  }
+
   private static final class AMFinishingContainerFinishedTransition
       implements
       MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@@ -1228,13 +1417,83 @@ public class RMAppAttemptImpl implements
             appAttempt, containerFinishedEvent);
         return RMAppAttemptState.FINISHED;
       }
-
       // Normal container.
       appAttempt.justFinishedContainers.add(containerStatus);
       return RMAppAttemptState.FINISHING;
     }
   }
 
+  /**
+   * Handles a container-finished event that arrives while the attempt's final
+   * state is still being saved (FINAL_SAVING).
+   *
+   * - If it is the AM container and the attempt is headed to FAILED/KILLED,
+   *   the event is ignored.
+   * - If it is the AM container and the attempt is otherwise headed to
+   *   finish, the post-save transition is replaced so the attempt ends as
+   *   FINISHED.
+   * - Any other container is queued in justFinishedContainers.
+   */
+  private static class ContainerFinishedAtFinalSavingTransition extends
+      BaseTransition {
+    @Override
+    public void
+        transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+      RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+          (RMAppAttemptContainerFinishedEvent) event;
+      ContainerStatus containerStatus =
+          containerFinishedEvent.getContainerStatus();
+
+      // If this is the AM container, the AM has finished, but we have not
+      // yet been notified that the final state was saved, so the attempt
+      // stays in FINAL_SAVING. masterContainer is null-checked, matching the
+      // RUNNING-state handling, because unmanaged AMs have no container.
+      if (appAttempt.masterContainer != null
+          && appAttempt.masterContainer.getId().equals(
+            containerStatus.getContainerId())) {
+        if (appAttempt.targetedFinalState == RMAppAttemptState.FAILED
+            || appAttempt.targetedFinalState == RMAppAttemptState.KILLED) {
+          // Already headed to FAILED/KILLED; the AM container exiting does
+          // not change that outcome, so ignore this event.
+          return;
+        }
+
+        // Pass the earlier AM-unregistered event along too.
+        // AMFinishedAfterFinalSavingTransition needs it later to record the
+        // unregistration info.
+        appAttempt.rememberTargetTransitions(event,
+          new AMFinishedAfterFinalSavingTransition(
+            appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
+        return;
+      }
+      // Normal container.
+      appAttempt.justFinishedContainers.add(containerStatus);
+    }
+  }
+
+  /**
+   * Completes the attempt as FINISHED. Before doing so, it applies the
+   * unregistration info from an event captured when this transition was
+   * created.
+   */
+  private static class AMFinishedAfterFinalSavingTransition extends
+      BaseTransition {
+    // The event that started FINAL_SAVING; expected to be the AM-unregistered
+    // event, since updateInfoOnAMUnregister casts it to
+    // RMAppAttemptUnregistrationEvent.
+    private final RMAppAttemptEvent amUnregisteredEvent;
+
+    public AMFinishedAfterFinalSavingTransition(
+        RMAppAttemptEvent amUnregisteredEvent) {
+      this.amUnregisteredEvent = amUnregisteredEvent;
+    }
+
+    @Override
+    public void
+        transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+      appAttempt.updateInfoOnAMUnregister(amUnregisteredEvent);
+      new FinalTransition(RMAppAttemptState.FINISHED).transition(appAttempt,
+        event);
+    }
+  }
+
+  /**
+   * Handles an AM-expired event that arrives while the attempt's final state
+   * is still being saved (FINAL_SAVING).
+   *
+   * If the attempt is headed to FAILED/KILLED, the event is ignored.
+   * Otherwise the post-save transition is replaced so the attempt ends as
+   * FINISHED.
+   */
+  private static class AMExpiredAtFinalSavingTransition extends
+      BaseTransition {
+    @Override
+    public void
+        transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+      if (appAttempt.targetedFinalState == RMAppAttemptState.FAILED
+          || appAttempt.targetedFinalState == RMAppAttemptState.KILLED) {
+        // Ignore the AM-expired event if we were supposed to reach the
+        // FAILED/KILLED state.
+        return;
+      }
+
+      // Pass the earlier AM-unregistered event along too.
+      // AMFinishedAfterFinalSavingTransition needs it later to record the
+      // unregistration info.
+      appAttempt.rememberTargetTransitions(event,
+        new AMFinishedAfterFinalSavingTransition(
+          appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
+    }
+  }
+
   @Override
   public long getStartTime() {
     this.readLock.lock();
@@ -1256,7 +1515,7 @@ public class RMAppAttemptImpl implements
   }
   
   private void checkAttemptStoreError(RMAppAttemptEvent event) {
-    RMAppAttemptStoredEvent storeEvent = (RMAppAttemptStoredEvent) event;
+    RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
     if(storeEvent.getStoredException() != null)
     {
       // This needs to be handled for HA and give up master status if we got
@@ -1267,7 +1526,7 @@ public class RMAppAttemptImpl implements
     }
   }
   
-  private void storeAttempt(RMStateStore store) {
+  private void storeAttempt() {
     // store attempt data in a non-blocking manner to prevent dispatcher
     // thread starvation and wait for state to be saved
     LOG.info("Storing attempt: AppId: " + 
@@ -1275,7 +1534,7 @@ public class RMAppAttemptImpl implements
               + " AttemptId: " + 
               getAppAttemptId()
               + " MasterContainer: " + masterContainer);
-    store.storeApplicationAttempt(this);
+    rmContext.getStateStore().storeNewApplicationAttempt(this);
   }
 
   private void removeCredentials(RMAppAttemptImpl appAttempt) {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Sun Nov 10 20:09:09 2013
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.re
 
 public enum RMAppAttemptState {
   NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, 
-  FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED
+  FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED,
+  FINAL_SAVING
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptUnregistrationEvent.java Sun Nov 10 20:09:09 2013
@@ -25,20 +25,20 @@ import org.apache.hadoop.yarn.server.res
 
 public class RMAppAttemptUnregistrationEvent extends RMAppAttemptEvent {
 
-  private final String trackingUrl;
+  private final String finalTrackingUrl;
   private final FinalApplicationStatus finalStatus;
   private final String diagnostics;
 
   public RMAppAttemptUnregistrationEvent(ApplicationAttemptId appAttemptId,
       String trackingUrl, FinalApplicationStatus finalStatus, String diagnostics) {
     super(appAttemptId, RMAppAttemptEventType.UNREGISTERED);
-    this.trackingUrl = trackingUrl;
+    this.finalTrackingUrl = trackingUrl;
     this.finalStatus = finalStatus;
     this.diagnostics = diagnostics;
   }
 
-  public String getTrackingUrl() {
-    return this.trackingUrl;
+  public String getFinalTrackingUrl() {
+    return this.finalTrackingUrl;
   }
 
   public FinalApplicationStatus getFinalApplicationStatus() {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java Sun Nov 10 20:09:09 2013
@@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 
 /**
@@ -35,6 +37,9 @@ import org.apache.hadoop.yarn.server.api
  */
 public interface RMNode {
 
+  /** negative value means no timeout */
+  public static final int OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT = -1;
+  
   /**
    * the node id of of this node.
    * @return the node id of this node.
@@ -94,7 +99,19 @@ public interface RMNode {
    * the total available resource.
    * @return the total available resource.
    */
-  public org.apache.hadoop.yarn.api.records.Resource getTotalCapability();
+  public Resource getTotalCapability();
+  
+  /**
+   * Set the resource option of this node, i.e. its total available resource
+   * together with its overCommitTimeoutMillis (a negative timeout means no
+   * timeout, per OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT).
+   * @param resourceOption the new resource option for this node
+   */
+  public void setResourceOption(ResourceOption resourceOption);
+  
+  /**
+   * Get the resource option of this node, i.e. its total available resource
+   * together with its overCommitTimeoutMillis.
+   * @return the current ResourceOption of this node
+   */
+  public ResourceOption getResourceOption();
   
   /**
    * The rack name for this node manager.

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Sun Nov 10 20:09:09 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -92,7 +93,7 @@ public class RMNodeImpl implements RMNod
   private final int httpPort;
   private final String nodeAddress; // The containerManager address
   private final String httpAddress;
-  private final Resource totalCapability;
+  private volatile ResourceOption resourceOption;
   private final Node node;
 
   private String healthReport;
@@ -173,13 +174,13 @@ public class RMNodeImpl implements RMNod
                              RMNodeEvent> stateMachine;
 
   public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
-      int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) {
+      int cmPort, int httpPort, Node node, ResourceOption resourceOption, String nodeManagerVersion) {
     this.nodeId = nodeId;
     this.context = context;
     this.hostName = hostName;
     this.commandPort = cmPort;
     this.httpPort = httpPort;
-    this.totalCapability = capability; 
+    this.resourceOption = resourceOption; 
     this.nodeAddress = hostName + ":" + cmPort;
     this.httpAddress = hostName + ":" + httpPort;
     this.node = node;
@@ -235,14 +236,24 @@ public class RMNodeImpl implements RMNod
 
   @Override
   public Resource getTotalCapability() {
-   return this.totalCapability;
+    return this.resourceOption.getResource();
+  }
+  
+  @Override
+  public void setResourceOption(ResourceOption resourceOption) {
+    // The field is volatile, so readers see the replacement without locking.
+    this.resourceOption = resourceOption;
+  }
+
+  @Override
+  public ResourceOption getResourceOption() {
+    return resourceOption;
   }
 
   @Override
   public String getRackName() {
     return node.getNetworkLocation();
   }
-
+  
   @Override
   public Node getNode() {
     return this.node;
@@ -438,7 +449,10 @@ public class RMNodeImpl implements RMNod
 
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodeAddedSchedulerEvent(rmNode));
-      
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodesListManagerEvent(
+              NodesListManagerEventType.NODE_USABLE, rmNode));
+ 
       String host = rmNode.nodeId.getHost();
       if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
         // Old node rejoining
@@ -471,7 +485,7 @@ public class RMNodeImpl implements RMNod
           // Only add new node if old state is not UNHEALTHY
           rmNode.context.getDispatcher().getEventHandler().handle(
               new NodeAddedSchedulerEvent(rmNode));
-         }
+        }
       } else {
         // Reconnected node differs, so replace old node and start new node
         switch (rmNode.getState()) {
@@ -486,6 +500,9 @@ public class RMNodeImpl implements RMNod
         rmNode.context.getDispatcher().getEventHandler().handle(
             new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
       }
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodesListManagerEvent(
+              NodesListManagerEventType.NODE_USABLE, rmNode));
     }
   }
 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Sun Nov 10 20:09:09 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -397,5 +398,12 @@ public abstract class SchedulerApplicati
     lastScheduledContainer.put(priority, currentTimeMs);
     schedulingOpportunities.setCount(priority, 0);
   }
+  
+  public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
+    return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
+        reservedContainers.size(), Resources.clone(currentConsumption),
+        Resources.clone(currentReservation),
+        Resources.add(currentConsumption, currentReservation));
+  }
 
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Sun Nov 10 20:09:09 2013
@@ -67,6 +67,12 @@ public abstract class SchedulerNode {
    * @return number of active containers on the node
    */
   public abstract int getNumContainers();
+  
+  /**
+   * Adjust the node's available resource by the given delta.
+   * @param deltaResource the resource delta to apply to the node's available
+   *        resource; presumably added to it, so negative components shrink
+   *        it (TODO confirm against implementations)
+   */
+  public abstract void applyDeltaOnAvailableResource(Resource deltaResource);
 
   /**
    * Get total resources on the node.

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Sun Nov 10 20:09:09 2013
@@ -19,20 +19,19 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.List;
 
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -147,6 +146,37 @@ public class SchedulerUtils {
         maximumResource, minimumResource);
     ask.setCapability(normalized);
   }
+  
+  /**
+   * Update resource in SchedulerNode if any resource change in RMNode.
+   * The node's available resource is recomputed as the RMNode's total
+   * capability minus the resource already used on the node. If that differs
+   * from the SchedulerNode's current view, the difference is applied both to
+   * the node and to {@code clusterResource}.
+   * @param node SchedulerNode with old resource view
+   * @param rmNode RMNode with new resource view
+   * @param clusterResource the cluster's resource; modified in place
+   * @param log Scheduler's log for resource change
+   */
+  public static void updateResourceIfChanged(SchedulerNode node, 
+      RMNode rmNode, Resource clusterResource, Log log) {
+    Resource oldAvailableResource = node.getAvailableResource();
+    Resource newAvailableResource = Resources.subtract(
+        rmNode.getTotalCapability(), node.getUsedResource());
+    
+    if (!newAvailableResource.equals(oldAvailableResource)) {
+      Resource deltaResource = Resources.subtract(newAvailableResource,
+          oldAvailableResource);
+      // Reflect resource change to scheduler node.
+      node.applyDeltaOnAvailableResource(deltaResource);
+      // Reflect resource change to clusterResource.
+      Resources.addTo(clusterResource, deltaResource);
+      // TODO process resource over-commitment case (allocated containers
+      // > total capacity) in different option by getting value of
+      // overCommitTimeoutMillis.
+      
+      // Log resource change. The CPU delta must come from getVirtualCores(),
+      // not getMemory().
+      log.info("Resource change on node: " + rmNode.getNodeAddress()
+          + " with delta: CPU: " + deltaResource.getVirtualCores()
+          + "core, Memory: " + deltaResource.getMemory() + "MB");
+    }
+  }
 
   /**
    * Utility method to normalize a list of resource requests, by insuring that

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Sun Nov 10 20:09:09 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -130,6 +131,16 @@ public interface YarnScheduler extends E
   SchedulerAppReport getSchedulerAppInfo(ApplicationAttemptId appAttemptId);
 
   /**
+   * Get a resource usage report from a given app attempt ID.
+   * @param appAttemptId the id of the application attempt
+   * @return resource usage report for this given attempt
+   */
+  @LimitedPrivate("yarn")
+  @Evolving
+  ApplicationResourceUsageReport getAppResourceUsageReport(
+      ApplicationAttemptId appAttemptId);
+  
+  /**
    * Get the root queue for the scheduler.
    * @return the root queue for the scheduler.
    */

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sun Nov 10 20:09:09 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -628,6 +629,10 @@ public class CapacityScheduler
     }
 
     FiCaSchedulerNode node = getNode(nm.getNodeID());
+    
+    // Update resource if any change
+    SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
+    
     List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
     List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
@@ -694,7 +699,7 @@ public class CapacityScheduler
           node.getReservedContainer().getContainerId().getApplicationAttemptId()
           );
     }
-
+  
   }
 
   private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
@@ -856,6 +861,13 @@ public class CapacityScheduler
     return app == null ? null : new SchedulerAppReport(app);
   }
   
+  /**
+   * Looks up the scheduler-side state of the given attempt and returns its
+   * resource usage report.
+   *
+   * @param applicationAttemptId attempt to report on
+   * @return the attempt's usage report, or {@code null} if this scheduler
+   *         has no application registered under that attempt id
+   */
+  @Override
+  public ApplicationResourceUsageReport getAppResourceUsageReport(
+      ApplicationAttemptId applicationAttemptId) {
+    FiCaSchedulerApp schedulerApp = getApplication(applicationAttemptId);
+    if (schedulerApp == null) {
+      return null;
+    }
+    return schedulerApp.getResourceUsageReport();
+  }
+  
   @Lock(Lock.NoLock.class)
   FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Sun Nov 10 20:09:09 2013
@@ -268,4 +268,10 @@ public class FiCaSchedulerNode extends S
     return reservedContainer;
   }
 
+  /**
+   * Adds {@code deltaResource} component-wise to this node's available
+   * resource. Intended only for adjusting availability after the node's
+   * total resource has changed.
+   *
+   * @param deltaResource amount to add to the available resource
+   */
+  @Override
+  public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
+    Resources.addTo(availableResource, deltaResource);
+  }
+
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Sun Nov 10 20:09:09 2013
@@ -269,4 +269,11 @@ public class FSSchedulerNode extends Sch
   public synchronized AppSchedulable getReservedAppSchedulable() {
     return reservedAppSchedulable;
   }
+  
+  /**
+   * Adds {@code deltaResource} component-wise to this node's available
+   * resource. Intended only for adjusting availability after the node's
+   * total resource has changed.
+   *
+   * @param deltaResource amount to add to the available resource
+   */
+  @Override
+  public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
+    Resources.addTo(availableResource, deltaResource);
+  }
+  
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Sun Nov 10 20:09:09 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -181,6 +182,8 @@ public class FairScheduler implements Re
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
   protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
+  private Comparator<NodeId> nodeAvailableResourceComparator =
+          new NodeAvailableResourceComparator(); // Orders node ids by descending available resource
   protected double nodeLocalityThreshold; // Cluster threshold for node locality
   protected double rackLocalityThreshold; // Cluster threshold for rack locality
   protected long nodeLocalityDelayMs; // Delay for node locality
@@ -917,6 +920,9 @@ public class FairScheduler implements Re
     eventLog.log("HEARTBEAT", nm.getHostName());
     FSSchedulerNode node = nodes.get(nm.getNodeID());
 
+    // Update resource if any change
+    SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG);
+    
     List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
     List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
@@ -948,14 +954,22 @@ public class FairScheduler implements Re
 
   private void continuousScheduling() {
     while (true) {
-      for (FSSchedulerNode node : nodes.values()) {
-        try {
-          if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
-            attemptScheduling(node);
+      List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
+      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+
+      // iterate all nodes
+      for (NodeId nodeId : nodeIdList) {
+        if (nodes.containsKey(nodeId)) {
+          FSSchedulerNode node = nodes.get(nodeId);
+          try {
+            if (Resources.fitsIn(minimumAllocation,
+                    node.getAvailableResource())) {
+              attemptScheduling(node);
+            }
+          } catch (Throwable ex) {
+            LOG.warn("Error while attempting scheduling for node " + node +
+                    ": " + ex.toString(), ex);
           }
-        } catch (Throwable ex) {
-          LOG.warn("Error while attempting scheduling for node " + node + ": " +
-                  ex.toString(), ex);
         }
       }
       try {
@@ -966,6 +980,17 @@ public class FairScheduler implements Re
       }
     }
   }
+
+  /** Sort nodes by available resource */
+  private class NodeAvailableResourceComparator implements Comparator<NodeId> {
+
+    @Override
+    public int compare(NodeId n1, NodeId n2) {
+      return RESOURCE_CALCULATOR.compare(clusterCapacity,
+              nodes.get(n2).getAvailableResource(),
+              nodes.get(n1).getAvailableResource());
+    }
+  }
   
   private synchronized void attemptScheduling(FSSchedulerNode node) {
     // Assign new containers...
@@ -1030,6 +1055,17 @@ public class FairScheduler implements Re
     return new SchedulerAppReport(applications.get(appAttemptId));
   }
   
+  /**
+   * Returns the resource usage report of the given attempt.
+   *
+   * @param appAttemptId attempt to report on
+   * @return the attempt's usage report, or {@code null} (after logging an
+   *         error) if no application is registered under that attempt id
+   */
+  @Override
+  public ApplicationResourceUsageReport getAppResourceUsageReport(
+      ApplicationAttemptId appAttemptId) {
+    FSSchedulerApp app = applications.get(appAttemptId);
+    if (app == null) {
+      // Separate the message from the id so the log line is readable.
+      LOG.error("Request for appInfo of unknown attempt " + appAttemptId);
+      return null;
+    }
+    return app.getResourceUsageReport();
+  }
+  
   /**
    * Subqueue metrics might be a little out of date because fair shares are
    * recalculated at the update interval, but the root queue metrics needs to

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Sun Nov 10 20:09:09 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -99,7 +100,7 @@ public class FifoScheduler implements Re
   private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
   private RMContext rmContext;
 
-  private Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
+  protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
 
   private boolean initialized;
   private Resource minimumAllocation;
@@ -327,6 +328,13 @@ public class FifoScheduler implements Re
     return app == null ? null : new SchedulerAppReport(app);
   }
   
+  /**
+   * Looks up the scheduler-side state of the given attempt and returns its
+   * resource usage report.
+   *
+   * @param applicationAttemptId attempt to report on
+   * @return the attempt's usage report, or {@code null} if this scheduler
+   *         has no application registered under that attempt id
+   */
+  @Override
+  public ApplicationResourceUsageReport getAppResourceUsageReport(
+      ApplicationAttemptId applicationAttemptId) {
+    FiCaSchedulerApp schedulerApp = getApplication(applicationAttemptId);
+    if (schedulerApp == null) {
+      return null;
+    }
+    return schedulerApp.getResourceUsageReport();
+  }
+  
   private FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
@@ -620,6 +628,9 @@ public class FifoScheduler implements Re
   private synchronized void nodeUpdate(RMNode rmNode) {
     FiCaSchedulerNode node = getNode(rmNode.getNodeID());
     
+    // Update resource if any change
+    SchedulerUtils.updateResourceIfChanged(node, rmNode, clusterResource, LOG);
+    
     List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
     List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
@@ -653,7 +664,7 @@ public class FifoScheduler implements Re
     
     metrics.setAvailableResourcesToQueue(
         Resources.subtract(clusterResource, usedResource));
-  }  
+  }
 
   @Override
   public void handle(SchedulerEvent event) {

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java Sun Nov 10 20:09:09 2013
@@ -77,7 +77,8 @@ public class FairSchedulerPage extends R
       if (maxApps < Integer.MAX_VALUE) {
           ri._("Max Running Applications:", qinfo.getMaxApplications());
       }
-      ri._("Fair Share:", qinfo.getFairShare().toString());
+      ri._("Fair Share:", StringEscapeUtils.escapeHtml(
+        qinfo.getFairShare().toString()));
 
       html._(InfoBlock.class);
 

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java Sun Nov 10 20:09:09 2013
@@ -26,8 +26,10 @@ import javax.xml.bind.annotation.XmlTran
 
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -71,6 +73,9 @@ public class AppInfo {
   protected long elapsedTime;
   protected String amContainerLogs;
   protected String amHostHttpAddress;
+  protected int allocatedMB;
+  protected int allocatedVCores;
+  protected int runningContainers;
 
   public AppInfo() {
   } // JAXB needs this
@@ -132,6 +137,15 @@ public class AppInfo {
             this.amContainerLogs = url;
             this.amHostHttpAddress = masterContainer.getNodeHttpAddress();
           }
+          
+          ApplicationResourceUsageReport resourceReport = attempt
+              .getApplicationResourceUsageReport();
+          if (resourceReport != null) {
+            Resource usedResources = resourceReport.getUsedResources();
+            allocatedMB = usedResources.getMemory();
+            allocatedVCores = usedResources.getVirtualCores();
+            runningContainers = resourceReport.getNumUsedContainers();
+          }
         }
       }
     }
@@ -224,5 +238,17 @@ public class AppInfo {
   public String getApplicationType() {
     return this.applicationType;
   }
-
+  
+  /**
+   * @return number of used containers taken from the attempt's resource usage
+   *         report, or 0 if no report was available when this object was built
+   */
+  public int getRunningContainers() {
+    return runningContainers;
+  }
+  
+  /**
+   * @return memory of the attempt's used resources, or 0 if no resource usage
+   *         report was available when this object was built
+   */
+  public int getAllocatedMB() {
+    return allocatedMB;
+  }
+  
+  /**
+   * @return virtual cores of the attempt's used resources, or 0 if no resource
+   *         usage report was available when this object was built
+   */
+  public int getAllocatedVCores() {
+    return allocatedVCores;
+  }
+  
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Sun Nov 10 20:09:09 2013
@@ -202,6 +202,12 @@ public class MockAM {
     final FinishApplicationMasterRequest req =
         FinishApplicationMasterRequest.newInstance(
           FinalApplicationStatus.SUCCEEDED, "", "");
+    unregisterAppAttempt(req);
+  }
+
+  public void unregisterAppAttempt(final FinishApplicationMasterRequest req)
+      throws Exception {
+    waitForState(RMAppAttemptState.RUNNING);
     UserGroupInformation ugi =
         UserGroupInformation.createRemoteUser(attemptId.toString());
     Token<AMRMTokenIdentifier> token =
@@ -216,4 +222,8 @@ public class MockAM {
       }
     });
   }
+
+  /** @return the application attempt id this mock AM was created for. */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return attemptId;
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Sun Nov 10 20:09:09 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -93,14 +94,14 @@ public class MockNodes {
     private String nodeAddr;
     private String httpAddress;
     private int cmdPort;
-    private Resource perNode;
+    private ResourceOption perNode;
     private String rackName;
     private String healthReport;
     private long lastHealthReportTime;
     private NodeState state;
 
     public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
-        Resource perNode, String rackName, String healthReport,
+        ResourceOption perNode, String rackName, String healthReport,
         long lastHealthReportTime, int cmdPort, String hostName, NodeState state) {
       this.nodeId = nodeId;
       this.nodeAddr = nodeAddr;
@@ -146,7 +147,7 @@ public class MockNodes {
 
     @Override
     public Resource getTotalCapability() {
-      return this.perNode;
+      // The node's total capability is the Resource wrapped by its ResourceOption.
+      return this.perNode.getResource();
     }
 
     @Override
@@ -202,6 +203,17 @@ public class MockNodes {
     public long getLastHealthReportTime() {
       return lastHealthReportTime;
     }
+
+    /** Replaces this mock node's resource option (and thus its total capability). */
+    @Override
+    public void setResourceOption(ResourceOption resourceOption) {
+      perNode = resourceOption;
+    }
+    
+    /** @return the resource option currently configured for this mock node. */
+    @Override
+    public ResourceOption getResourceOption() {
+      return perNode;
+    }
+    
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) {
@@ -220,8 +232,9 @@ public class MockNodes {
 
     final String httpAddress = httpAddr;
     String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
-    return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName,
-        healthReport, 0, nid, hostName, state); 
+    return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress,
+        ResourceOption.newInstance(perNode, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT),
+        rackName, healthReport, 0, nid, hostName, state); 
   }
 
   public static RMNode nodeInfo(int rack, final Resource perNode,

Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1540535&r1=1540534&r2=1540535&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Sun Nov 10 20:09:09 2013
@@ -163,6 +163,14 @@ public class MockRM extends ResourceMana
   public RMApp submitApp(int masterMemory, String name, String user,
       Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
       int maxAppAttempts, Credentials ts, String appType) throws Exception {
+    // Preserve the previous behaviour for existing callers: block until the
+    // submitted app reaches ACCEPTED (waitForAccepted = true).
+    return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+      maxAppAttempts, ts, appType, true);
+  }
+
+  public RMApp submitApp(int masterMemory, String name, String user,
+      Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+      int maxAppAttempts, Credentials ts, String appType,
+      boolean waitForAccepted) throws Exception {
     ApplicationClientProtocol client = getClientRMService();
     GetNewApplicationResponse resp = client.getNewApplication(Records
         .newRecord(GetNewApplicationRequest.class));
@@ -222,7 +230,9 @@ public class MockRM extends ResourceMana
     }.setClientReq(client, req);
     fakeUser.doAs(action);
     // make sure app is immediately available after submit
-    waitForState(appId, RMAppState.ACCEPTED);
+    if (waitForAccepted) {
+      waitForState(appId, RMAppState.ACCEPTED);
+    }
     return getRMContext().getRMApps().get(appId);
   }