You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [12/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Aug 19 23:49:39 2014
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -64,6 +66,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -115,6 +118,7 @@ public class RMAppImpl implements RMApp,
   private EventHandler handler;
   private static final AppFinishedTransition FINISHED_TRANSITION =
       new AppFinishedTransition();
+  private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
 
   // These states stored are only valid when app is at killing or final_saving.
   private RMAppState stateBeforeKilling;
@@ -162,6 +166,8 @@ public class RMAppImpl implements RMApp,
         RMAppEventType.APP_REJECTED,
           new FinalSavingTransition(new AppRejectedTransition(),
             RMAppState.FAILED))
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+        RMAppEventType.MOVE, new RMAppMoveTransition())
 
      // Transitions from SUBMITTED state
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
@@ -179,7 +185,6 @@ public class RMAppImpl implements RMApp,
         new FinalSavingTransition(
           new AppKilledTransition(), RMAppState.KILLED))
 
-
      // Transitions from ACCEPTED state
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
@@ -199,12 +204,9 @@ public class RMAppImpl implements RMApp,
         new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
     .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
         RMAppEventType.KILL, new KillAttemptTransition())
-    // ACCECPTED state can once again receive APP_ACCEPTED event, because on
-    // recovery the app returns ACCEPTED state and the app once again go
-    // through the scheduler and triggers one more APP_ACCEPTED event at
-    // ACCEPTED state.
-    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
-        RMAppEventType.APP_ACCEPTED)
+    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -219,6 +221,9 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
       // UnManagedAM directly jumps to finished
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     .addTransition(RMAppState.RUNNING,
         EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
         RMAppEventType.ATTEMPT_FAILED,
@@ -234,22 +239,31 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_FINISHED,
         new AttemptFinishedAtFinalSavingTransition())
+    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
         EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
-          RMAppEventType.APP_NEW_SAVED))
+          RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE))
 
      // Transitions from FINISHING state
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
       EnumSet.of(RMAppEventType.NODE_UPDATE,
-        // ignore Kill as we have already saved the final Finished state in
-        // state store.
-        RMAppEventType.KILL))
+        // ignore Kill/Move as we have already saved the final Finished state
+        // in the state store.
+        RMAppEventType.KILL, RMAppEventType.MOVE))
 
      // Transitions from KILLING state
+    .addTransition(RMAppState.KILLING, RMAppState.KILLING, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_KILLED,
         new FinalSavingTransition(
@@ -262,42 +276,47 @@ public class RMAppImpl implements RMApp,
             RMAppEventType.ATTEMPT_FINISHED,
             RMAppEventType.ATTEMPT_FAILED,
             RMAppEventType.APP_UPDATE_SAVED,
-            RMAppEventType.KILL))
+            RMAppEventType.KILL, RMAppEventType.MOVE))
 
      // Transitions from FINISHED state
      // ignorable transitions
+    .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
         EnumSet.of(
             RMAppEventType.NODE_UPDATE,
             RMAppEventType.ATTEMPT_UNREGISTERED,
             RMAppEventType.ATTEMPT_FINISHED,
-            RMAppEventType.KILL))
+            RMAppEventType.KILL, RMAppEventType.MOVE))
 
      // Transitions from FAILED state
      // ignorable transitions
+    .addTransition(RMAppState.FAILED, RMAppState.FAILED, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     .addTransition(RMAppState.FAILED, RMAppState.FAILED,
-        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
+        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
+            RMAppEventType.MOVE))
 
      // Transitions from KILLED state
      // ignorable transitions
+    .addTransition(RMAppState.KILLED, RMAppState.KILLED, 
+        RMAppEventType.APP_RUNNING_ON_NODE,
+        new AppRunningOnNodeTransition())
     .addTransition(
         RMAppState.KILLED,
         RMAppState.KILLED,
         EnumSet.of(RMAppEventType.APP_ACCEPTED,
             RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
             RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.NODE_UPDATE))
+            RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE))
 
      .installTopology();
 
   private final StateMachine<RMAppState, RMAppEventType, RMAppEvent>
                                                                  stateMachine;
 
-  private static final ApplicationResourceUsageReport
-    DUMMY_APPLICATION_RESOURCE_USAGE_REPORT =
-      BuilderUtils.newApplicationResourceUsageReport(-1, -1,
-          Resources.createResource(-1, -1), Resources.createResource(-1, -1),
-          Resources.createResource(-1, -1));
   private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
   
   public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
@@ -498,7 +517,7 @@ public class RMAppImpl implements RMApp,
       String origTrackingUrl = UNAVAILABLE;
       int rpcPort = -1;
       ApplicationResourceUsageReport appUsageReport =
-          DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
+          RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
       FinalApplicationStatus finishState = getFinalApplicationStatus();
       String diags = UNAVAILABLE;
       float progress = 0.0f;
@@ -666,7 +685,12 @@ public class RMAppImpl implements RMApp,
         ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
     RMAppAttempt attempt =
         new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
-          submissionContext, conf, maxAppAttempts == attempts.size());
+          submissionContext, conf,
+          // The newly created attempt may be the last attempt if (the number
+          // of previously failed attempts (which should not include preempted,
+          // hardware error and NM resync ones) + 1) equals the max-attempt
+          // limit.
+          maxAppAttempts == (getNumFailedAppAttempts() + 1));
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
   }
@@ -699,6 +723,23 @@ public class RMAppImpl implements RMApp,
           nodeUpdateEvent.getNode());
     };
   }
+  
+  private static final class AppRunningOnNodeTransition extends RMAppTransition {
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event;
+      
+      // if the app is already in a final state, ask the node to clean it up
+      if (isAppInFinalState(app)) {
+        app.handler.handle(
+            new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent
+                .getApplicationId()));
+        return;
+      }
+      
+      // otherwise, record the node in ranNodes so it is cleaned up later
+      app.ranNodes.add(nodeAddedEvent.getNodeId());
+    };
+  }
 
   /**
    * Move an app to a new queue.
@@ -723,46 +764,54 @@ public class RMAppImpl implements RMApp,
     }
   }
 
+  // Synchronously recover each attempt so that any incoming external events
+  // are processed only after the attempt has handled the RECOVER event.
+  private void recoverAppAttempts() {
+    for (RMAppAttempt attempt : getAppAttempts().values()) {
+      attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(),
+        RMAppAttemptEventType.RECOVER));
+    }
+  }
+
   private static final class RMAppRecoveredTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
 
-      for (RMAppAttempt attempt : app.getAppAttempts().values()) {
-        // synchronously recover attempt to ensure any incoming external events
-        // to be processed after the attempt processes the recover event.
-        attempt.handle(
-          new RMAppAttemptEvent(attempt.getAppAttemptId(),
-            RMAppAttemptEventType.RECOVER));
-      }
-
       // The app has completed.
       if (app.recoveredFinalState != null) {
+        app.recoverAppAttempts();
         new FinalTransition(app.recoveredFinalState).transition(app, event);
         return app.recoveredFinalState;
       }
 
-      // Last attempt is in final state, do not add to scheduler and just return
-      // ACCEPTED waiting for last RMAppAttempt to send finished or failed event
-      // back.
+      // No existing attempts means the attempt associated with this app was
+      // either not started, or started but not yet saved.
+      if (app.attempts.isEmpty()) {
+        app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+          app.submissionContext.getQueue(), app.user));
+        return RMAppState.SUBMITTED;
+      }
+
+      // Add the application to the scheduler synchronously to guarantee the
+      // scheduler knows about it before the AM or NM re-registers.
+      app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+        app.submissionContext.getQueue(), app.user, true));
+
+      // recover attempts
+      app.recoverAppAttempts();
+
+      // If the last attempt is in a final state, return ACCEPTED and wait for
+      // that RMAppAttempt to send the finished or failed event back.
       if (app.currentAttempt != null
           && (app.currentAttempt.getState() == RMAppAttemptState.KILLED
               || app.currentAttempt.getState() == RMAppAttemptState.FINISHED
               || (app.currentAttempt.getState() == RMAppAttemptState.FAILED
-                  && app.attempts.size() == app.maxAppAttempts))) {
+                  && app.getNumFailedAppAttempts() == app.maxAppAttempts))) {
         return RMAppState.ACCEPTED;
       }
 
-      // Notify scheduler about the app on recovery
-      new AddApplicationToSchedulerTransition().transition(app, event);
-
-      // No existent attempts means the attempt associated with this app was not
-      // started or started but not yet saved.
-      if (app.attempts.isEmpty()) {
-        return RMAppState.SUBMITTED;
-      }
-
       // YARN-1507 is saving the application state after the application is
       // accepted. So after YARN-1507, an app is saved meaning it is accepted.
       // Thus we return ACCECPTED state on recovery.
@@ -774,17 +823,6 @@ public class RMAppImpl implements RMApp,
       RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      if (event instanceof RMAppNewSavedEvent) {
-        RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
-        // For HA this exception needs to be handled by giving up
-        // master status if we got fenced
-        if (((RMAppNewSavedEvent) event).getStoredException() != null) {
-          LOG.error(
-            "Failed to store application: " + storeEvent.getApplicationId(),
-            storeEvent.getStoredException());
-          ExitUtil.terminate(1, storeEvent.getStoredException());
-        }
-      }
       app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
         app.submissionContext.getQueue(), app.user));
     }
@@ -802,13 +840,6 @@ public class RMAppImpl implements RMApp,
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
-      RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
-      if (storeEvent.getUpdatedException() != null) {
-        LOG.error("Failed to update the final state of application"
-              + storeEvent.getApplicationId(), storeEvent.getUpdatedException());
-        ExitUtil.terminate(1, storeEvent.getUpdatedException());
-      }
-
       if (app.transitionTodo instanceof SingleArcTransition) {
         ((SingleArcTransition) app.transitionTodo).transition(app,
           app.eventCausingFinalSaving);
@@ -844,7 +875,7 @@ public class RMAppImpl implements RMApp,
       msg = "Unmanaged application " + this.getApplicationId()
               + " failed due to " + failedEvent.getDiagnostics()
               + ". Failing the application.";
-    } else if (this.attempts.size() >= this.maxAppAttempts) {
+    } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
       msg = "Application " + this.getApplicationId() + " failed "
               + this.maxAppAttempts + " times due to "
               + failedEvent.getDiagnostics() + ". Failing the application.";
@@ -1037,17 +1068,8 @@ public class RMAppImpl implements RMApp,
       this.finalState = finalState;
     }
 
-    private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) {
-      Set<NodeId> nodes = new HashSet<NodeId>();
-      for (RMAppAttempt attempt : app.attempts.values()) {
-        nodes.addAll(attempt.getRanNodes());
-      }
-      return nodes;
-    }
-
     public void transition(RMAppImpl app, RMAppEvent event) {
-      Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
-      for (NodeId nodeId : nodes) {
+      for (NodeId nodeId : app.getRanNodes()) {
         app.handler.handle(
             new RMNodeCleanAppEvent(nodeId, app.applicationId));
       }
@@ -1055,8 +1077,12 @@ public class RMAppImpl implements RMApp,
       if (app.finishTime == 0 ) {
         app.finishTime = System.currentTimeMillis();
       }
-      app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
-        finalState));
+      // Recovered apps that had already completed were never added to the
+      // scheduler, so there is no need to remove them from it.
+      if (app.recoveredFinalState == null) {
+        app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
+          finalState));
+      }
       app.handler.handle(
           new RMAppManagerEvent(app.applicationId,
           RMAppManagerEventType.APP_COMPLETED));
@@ -1066,6 +1092,18 @@ public class RMAppImpl implements RMApp,
     };
   }
 
+  private int getNumFailedAppAttempts() {
+    int completedAttempts = 0;
+    // Do not count AM preemption, hardware failures or NM resync
+    // as attempt failure.
+    for (RMAppAttempt attempt : attempts.values()) {
+      if (attempt.shouldCountTowardsMaxAttemptRetry()) {
+        completedAttempts++;
+      }
+    }
+    return completedAttempts;
+  }
+
   private static final class AttemptFailedTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
 
@@ -1077,8 +1115,9 @@ public class RMAppImpl implements RMApp,
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+
       if (!app.submissionContext.getUnmanagedAM()
-          && app.attempts.size() < app.maxAppAttempts) {
+          && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
         boolean transferStateFromPreviousAttempt = false;
         RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
         transferStateFromPreviousAttempt =
@@ -1137,6 +1176,9 @@ public class RMAppImpl implements RMApp,
   
   public static boolean isAppInFinalState(RMApp rmApp) {
     RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState();
+    if (appState == null) {
+      appState = rmApp.getState();
+    }
     return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
         || appState == RMAppState.KILLED;
   }
@@ -1144,4 +1186,30 @@ public class RMAppImpl implements RMApp,
   private RMAppState getRecoveredFinalState() {
     return this.recoveredFinalState;
   }
+
+  @Override
+  public Set<NodeId> getRanNodes() {
+    return ranNodes;
+  }
+  
+  @Override
+  public RMAppMetrics getRMAppMetrics() {
+    Resource resourcePreempted = Resource.newInstance(0, 0);
+    int numAMContainerPreempted = 0;
+    int numNonAMContainerPreempted = 0;
+    for (RMAppAttempt attempt : attempts.values()) {
+      if (null != attempt) {
+        RMAppAttemptMetrics attemptMetrics =
+            attempt.getRMAppAttemptMetrics();
+        Resources.addTo(resourcePreempted,
+            attemptMetrics.getResourcePreempted());
+        numAMContainerPreempted += attemptMetrics.getIsPreempted() ? 1 : 0;
+        numNonAMContainerPreempted +=
+            attemptMetrics.getNumNonAMContainersPreempted();
+      }
+    }
+
+    return new RMAppMetrics(resourcePreempted,
+        numNonAMContainerPreempted, numAMContainerPreempted);
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Tue Aug 19 23:49:39 2014
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 import java.util.List;
-import java.util.Set;
 
 import javax.crypto.SecretKey;
 
@@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -115,12 +113,6 @@ public interface RMAppAttempt extends Ev
   FinalApplicationStatus getFinalApplicationStatus();
 
   /**
-   * Nodes on which the containers for this {@link RMAppAttempt} ran.
-   * @return the set of nodes that ran any containers from this {@link RMAppAttempt}
-   */
-  Set<NodeId> getRanNodes();
-
-  /**
    * Return a list of the last set of finished containers, resetting the
    * finished containers to empty.
    * @return the list of just finished containers, re setting the finished containers.
@@ -204,4 +196,21 @@ public interface RMAppAttempt extends Ev
    */
   ApplicationAttemptReport createApplicationAttemptReport();
 
+  /**
+   * Return whether this attempt's failure should be counted towards the
+   * attempt retry count. These failure types should not be counted:
+   * <ul>
+   * <li>preempted by the scheduler.</li>
+   * <li>hardware failures, such as NM failing, lost NM and NM disk errors.</li>
+   * <li>killed by RM because of RM restart or failover.</li>
+   * </ul>
+   * @return true if the failure counts towards the max-attempt limit
+   */
+  boolean shouldCountTowardsMaxAttemptRetry();
+  
+  /**
+   * Get metrics from the {@link RMAppAttempt}
+   * @return metrics
+   */
+  RMAppAttemptMetrics getRMAppAttemptMetrics();
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Tue Aug 19 23:49:39 2014
@@ -36,7 +36,6 @@ public enum RMAppAttemptEventType {
   UNREGISTERED,
 
   // Source: Containers
-  CONTAINER_ACQUIRED,
   CONTAINER_ALLOCATED,
   CONTAINER_FINISHED,
   

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Tue Aug 19 23:49:39 2014
@@ -26,20 +26,18 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import javax.crypto.SecretKey;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -51,12 +49,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -80,15 +77,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -103,6 +98,8 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
 
@@ -133,10 +130,7 @@ public class RMAppAttemptImpl implements
   private final ApplicationSubmissionContext submissionContext;
   private Token<AMRMTokenIdentifier> amrmToken = null;
   private SecretKey clientTokenMasterKey = null;
-
-  //nodes on while this attempt's containers ran
-  private Set<NodeId> ranNodes =
-    new HashSet<NodeId>();
+  
   private List<ContainerStatus> justFinishedContainers =
     new ArrayList<ContainerStatus>();
   private Container masterContainer;
@@ -152,9 +146,15 @@ public class RMAppAttemptImpl implements
   // if an RMAppAttemptUnregistrationEvent occurs
   private FinalApplicationStatus finalStatus = null;
   private final StringBuilder diagnostics = new StringBuilder();
+  private int amContainerExitStatus = ContainerExitStatus.INVALID;
 
   private Configuration conf;
-  private final boolean isLastAttempt;
+  // Since AM preemption, hardware errors and NM resync are not counted towards
+  // the AM failure count, a new attempt can still be created even if this flag
+  // is true, if this attempt eventually fails because of preemption, a
+  // hardware error or NM resync. So this flag only indicates that this may be
+  // the last attempt.
+  private final boolean maybeLastAttempt;
   private static final ExpiredTransition EXPIRED_TRANSITION =
       new ExpiredTransition();
 
@@ -163,6 +163,8 @@ public class RMAppAttemptImpl implements
   private RMAppAttemptState recoveredFinalState;
   private RMAppAttemptState stateBeforeFinalSaving;
   private Object transitionTodo;
+  
+  private RMAppAttemptMetrics attemptMetrics = null;
 
   private static final StateMachineFactory<RMAppAttemptImpl,
                                            RMAppAttemptState,
@@ -214,21 +216,30 @@ public class RMAppAttemptImpl implements
           RMAppAttemptEventType.KILL,
           new FinalSavingTransition(new BaseFinalTransition(
             RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
-          
+      .addTransition(RMAppAttemptState.SCHEDULED,
+          RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.CONTAINER_FINISHED,
+          new FinalSavingTransition(
+            new AMContainerCrashedBeforeRunningTransition(),
+            RMAppAttemptState.FAILED))
+
        // Transitions from ALLOCATED_SAVING State
       .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
           RMAppAttemptState.ALLOCATED,
           RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
-      .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
-          RMAppAttemptState.ALLOCATED_SAVING,
-          RMAppAttemptEventType.CONTAINER_ACQUIRED, 
-          new ContainerAcquiredTransition())
+          
        // App could be killed by the client. So need to handle this. 
       .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
           RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
           new FinalSavingTransition(new BaseFinalTransition(
             RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
+      .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
+          RMAppAttemptState.FINAL_SAVING,
+          RMAppAttemptEventType.CONTAINER_FINISHED,
+          new FinalSavingTransition(
+            new AMContainerCrashedBeforeRunningTransition(), 
+            RMAppAttemptState.FAILED))
 
        // Transitions from LAUNCHED_UNMANAGED_SAVING State
       .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
@@ -249,10 +260,6 @@ public class RMAppAttemptImpl implements
             RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
 
        // Transitions from ALLOCATED State
-      .addTransition(RMAppAttemptState.ALLOCATED,
-          RMAppAttemptState.ALLOCATED,
-          RMAppAttemptEventType.CONTAINER_ACQUIRED,
-          new ContainerAcquiredTransition())
       .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
           RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
       .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
@@ -267,15 +274,17 @@ public class RMAppAttemptImpl implements
       .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.CONTAINER_FINISHED,
           new FinalSavingTransition(
-            new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
+            new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED))
 
        // Transitions from LAUNCHED State
       .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
-      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
+      .addTransition(RMAppAttemptState.LAUNCHED,
+          EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING),
           RMAppAttemptEventType.CONTAINER_FINISHED,
-          new FinalSavingTransition(
-            new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
+          new ContainerFinishedTransition(
+            new AMContainerCrashedBeforeRunningTransition(),
+            RMAppAttemptState.LAUNCHED))
       .addTransition(
           RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.EXPIRE,
@@ -295,14 +304,12 @@ public class RMAppAttemptImpl implements
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.CONTAINER_ALLOCATED)
       .addTransition(
-                RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
-                RMAppAttemptEventType.CONTAINER_ACQUIRED,
-                new ContainerAcquiredTransition())
-      .addTransition(
           RMAppAttemptState.RUNNING,
           EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
           RMAppAttemptEventType.CONTAINER_FINISHED,
-          new ContainerFinishedTransition())
+          new ContainerFinishedTransition(
+            new AMContainerCrashedAtRunningTransition(),
+            RMAppAttemptState.RUNNING))
       .addTransition(
           RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.EXPIRE,
@@ -333,7 +340,6 @@ public class RMAppAttemptImpl implements
             // should be fixed to reject container allocate request at Final
             // Saving in scheduler
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
-              RMAppAttemptEventType.CONTAINER_ACQUIRED,
               RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
               RMAppAttemptEventType.KILL))
 
@@ -343,7 +349,7 @@ public class RMAppAttemptImpl implements
       // use by the next new attempt.
       .addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED,
           RMAppAttemptEventType.CONTAINER_FINISHED,
-          new ContainerFinishedAtFailedTransition())
+          new ContainerFinishedAtFinalStateTransition())
       .addTransition(
           RMAppAttemptState.FAILED,
           RMAppAttemptState.FAILED,
@@ -379,31 +385,36 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
-              RMAppAttemptEventType.CONTAINER_FINISHED,
               RMAppAttemptEventType.KILL))
+      .addTransition(RMAppAttemptState.FINISHED, 
+          RMAppAttemptState.FINISHED, 
+          RMAppAttemptEventType.CONTAINER_FINISHED, 
+          new ContainerFinishedAtFinalStateTransition())
 
       // Transitions from KILLED State
       .addTransition(
           RMAppAttemptState.KILLED,
           RMAppAttemptState.KILLED,
           EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED,
-              RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.LAUNCHED,
               RMAppAttemptEventType.LAUNCH_FAILED,
               RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.REGISTERED,
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
-              RMAppAttemptEventType.CONTAINER_FINISHED,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.KILL,
               RMAppAttemptEventType.STATUS_UPDATE))
+      .addTransition(RMAppAttemptState.KILLED, 
+          RMAppAttemptState.KILLED, 
+          RMAppAttemptEventType.CONTAINER_FINISHED, 
+          new ContainerFinishedAtFinalStateTransition())
     .installTopology();
 
   public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
       RMContext rmContext, YarnScheduler scheduler,
       ApplicationMasterService masterService,
       ApplicationSubmissionContext submissionContext,
-      Configuration conf, boolean isLastAttempt) {
+      Configuration conf, boolean maybeLastAttempt) {
     this.conf = conf;
     this.applicationAttemptId = appAttemptId;
     this.rmContext = rmContext;
@@ -417,8 +428,9 @@ public class RMAppAttemptImpl implements
     this.writeLock = lock.writeLock();
 
     this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
-    this.isLastAttempt = isLastAttempt;
+    this.maybeLastAttempt = maybeLastAttempt;
     this.stateMachine = stateMachineFactory.make(this);
+    this.attemptMetrics = new RMAppAttemptMetrics(applicationAttemptId);
   }
 
   @Override
@@ -526,7 +538,7 @@ public class RMAppAttemptImpl implements
 
   private void setTrackingUrlToRMAppPage() {
     originalTrackingUrl = pjoin(
-        WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf),
+        WebAppUtils.getResolvedRMWebAppURLWithScheme(conf),
         "cluster", "app", getAppAttemptId().getApplicationId());
     proxiedTrackingUrl = originalTrackingUrl;
   }
@@ -545,7 +557,22 @@ public class RMAppAttemptImpl implements
 
   @Override
   public Token<AMRMTokenIdentifier> getAMRMToken() {
-    return this.amrmToken;
+    this.readLock.lock();
+    try {
+      return this.amrmToken;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Private
+  public void setAMRMToken(Token<AMRMTokenIdentifier> lastToken) {
+    this.writeLock.lock();
+    try {
+      this.amrmToken = lastToken;
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
   @Override
@@ -579,6 +606,15 @@ public class RMAppAttemptImpl implements
     }
   }
 
+  public int getAMContainerExitStatus() {
+    this.readLock.lock();
+    try {
+      return this.amContainerExitStatus;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   @Override
   public float getProgress() {
     this.readLock.lock();
@@ -616,11 +652,6 @@ public class RMAppAttemptImpl implements
   }
 
   @Override
-  public Set<NodeId> getRanNodes() {
-    return ranNodes;
-  }
-
-  @Override
   public Container getMasterContainer() {
     this.readLock.lock();
 
@@ -671,9 +702,7 @@ public class RMAppAttemptImpl implements
       ApplicationResourceUsageReport report =
           scheduler.getAppResourceUsageReport(this.getAppAttemptId());
       if (report == null) {
-        Resource none = Resource.newInstance(0, 0);
-        report = ApplicationResourceUsageReport.newInstance(0, 0, none, none,
-            none);
+        report = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
       }
       return report;
     } finally {
@@ -692,8 +721,13 @@ public class RMAppAttemptImpl implements
         + attemptState.getState());
     diagnostics.append("Attempt recovered after RM restart");
     diagnostics.append(attemptState.getDiagnostics());
+    this.amContainerExitStatus = attemptState.getAMContainerExitStatus();
+    if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) {
+      this.attemptMetrics.setIsPreempted();
+    }
     setMasterContainer(attemptState.getMasterContainer());
-    recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials());
+    recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
+      attemptState.getState());
     this.recoveredFinalState = attemptState.getState();
     this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
     this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@@ -703,12 +737,13 @@ public class RMAppAttemptImpl implements
 
   public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
     this.justFinishedContainers = attempt.getJustFinishedContainers();
-    this.ranNodes = attempt.getRanNodes();
   }
 
-  private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
-      throws IOException {
-    if (appAttemptTokens == null) {
+  private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
+      RMAppAttemptState state) throws IOException {
+    if (appAttemptTokens == null || state == RMAppAttemptState.FAILED
+        || state == RMAppAttemptState.FINISHED
+        || state == RMAppAttemptState.KILLED) {
       return;
     }
 
@@ -719,12 +754,9 @@ public class RMAppAttemptImpl implements
           .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes);
     }
 
-    // Only one AMRMToken is stored per-attempt, so this should be fine. Can't
-    // use TokenSelector as service may change - think fail-over.
     this.amrmToken =
-        (Token<AMRMTokenIdentifier>) appAttemptTokens
-          .getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
-    rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken);
+        rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+          applicationAttemptId);
   }
 
   private static class BaseTransition implements
@@ -760,13 +792,6 @@ public class RMAppAttemptImpl implements
               .createMasterKey(appAttempt.applicationAttemptId);
       }
 
-      // create AMRMToken
-      AMRMTokenIdentifier id =
-          new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
-      appAttempt.amrmToken =
-          new Token<AMRMTokenIdentifier>(id,
-            appAttempt.rmContext.getAMRMTokenSecretManager());
-
       // Add the applicationAttempt to the scheduler and inform the scheduler
       // whether to transfer the state from previous attempt.
       appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
@@ -838,7 +863,10 @@ public class RMAppAttemptImpl implements
 
       // Set the masterContainer
       appAttempt.setMasterContainer(amContainerAllocation.getContainers()
-        .get(0));
+          .get(0));
+      RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
+          .getRMContainer(appAttempt.getMasterContainer().getId());
+      rmMasterContainer.setAMContainer(true);
       // The node set in NMTokenSecrentManager is used for marking whether the
       // NMToken has been issued for this node to the AM.
       // When AM container was allocated to RM itself, the node which allocates
@@ -875,7 +903,6 @@ public class RMAppAttemptImpl implements
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
                                                     RMAppAttemptEvent event) {
-      appAttempt.checkAttemptStoreError(event);
       appAttempt.launchAttempt();
     }
   }
@@ -904,6 +931,18 @@ public class RMAppAttemptImpl implements
         }
         return appAttempt.recoveredFinalState;
       } else {
+        // Add the current attempt to the scheduler.
+        if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
+          // Need to register an app attempt before AM can register
+          appAttempt.masterService
+              .registerAppAttempt(appAttempt.applicationAttemptId);
+
+          // Add attempt to scheduler synchronously to guarantee scheduler
+          // knows attempts before AM or NM re-registers.
+          appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
+            appAttempt.getAppAttemptId(), false, true));
+        }
+
         /*
          * Since the application attempt's final state is not saved that means
          * for AM container (previous attempt) state must be one of these.
@@ -947,7 +986,7 @@ public class RMAppAttemptImpl implements
     String diags = null;
     String finalTrackingUrl = null;
     FinalApplicationStatus finalStatus = null;
-
+    int exitStatus = ContainerExitStatus.INVALID;
     switch (event.getType()) {
     case LAUNCH_FAILED:
       RMAppAttemptLaunchFailedEvent launchFaileEvent =
@@ -968,6 +1007,7 @@ public class RMAppAttemptImpl implements
       RMAppAttemptContainerFinishedEvent finishEvent =
           (RMAppAttemptContainerFinishedEvent) event;
       diags = getAMContainerCrashedDiagnostics(finishEvent);
+      exitStatus = finishEvent.getContainerStatus().getExitStatus();
       break;
     case KILL:
       break;
@@ -982,9 +1022,10 @@ public class RMAppAttemptImpl implements
     ApplicationAttemptState attemptState =
         new ApplicationAttemptState(applicationAttemptId, getMasterContainer(),
           rmStore.getCredentialsFromAppAttempt(this), startTime,
-          stateToBeStored, finalTrackingUrl, diags, finalStatus);
+          stateToBeStored, finalTrackingUrl, diags, finalStatus, exitStatus);
     LOG.info("Updating application attempt " + applicationAttemptId
-        + " with final state: " + targetedFinalState);
+        + " with final state: " + targetedFinalState + ", and exit status: "
+        + exitStatus);
     rmStore.updateApplicationAttemptState(attemptState);
   }
 
@@ -1013,14 +1054,6 @@ public class RMAppAttemptImpl implements
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-      RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event;
-      if (storeEvent.getUpdatedException() != null) {
-        LOG.error("Failed to update the final state of application attempt: "
-            + storeEvent.getApplicationAttemptId(),
-          storeEvent.getUpdatedException());
-        ExitUtil.terminate(1, storeEvent.getUpdatedException());
-      }
-
       RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving;
 
       if (appAttempt.transitionTodo instanceof SingleArcTransition) {
@@ -1077,11 +1110,20 @@ public class RMAppAttemptImpl implements
           // don't leave the tracking URL pointing to a non-existent AM
           appAttempt.setTrackingUrlToRMAppPage();
           appAttempt.invalidateAMHostAndPort();
+
           if (appAttempt.submissionContext
             .getKeepContainersAcrossApplicationAttempts()
-              && !appAttempt.isLastAttempt
               && !appAttempt.submissionContext.getUnmanagedAM()) {
-            keepContainersAcrossAppAttempts = true;
+            // See if we should retain containers for non-unmanaged applications
+            if (!appAttempt.shouldCountTowardsMaxAttemptRetry()) {
+              // Preemption, hardware failures and NM resync don't count towards
+              // app failures, so we should retain containers.
+              keepContainersAcrossAppAttempts = true;
+            } else if (!appAttempt.maybeLastAttempt) {
+              // Not preemption, hardware failures or NM resync.
+              // Not last-attempt too - keep containers.
+              keepContainersAcrossAppAttempts = true;
+            }
           }
           appEvent =
               new RMAppFailedAttemptEvent(applicationId,
@@ -1121,18 +1163,31 @@ public class RMAppAttemptImpl implements
         appAttempt.getClientTokenMasterKey());
     }
   }
-  
+
+  @Override
+  public boolean shouldCountTowardsMaxAttemptRetry() {
+    this.readLock.lock(); // outside try so finally only unlocks a held lock
+    try {
+      int exitStatus = getAMContainerExitStatus();
+      return !(exitStatus == ContainerExitStatus.PREEMPTED
+          || exitStatus == ContainerExitStatus.ABORTED
+          || exitStatus == ContainerExitStatus.DISKS_FAILED
+          || exitStatus == ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   private static final class UnmanagedAMAttemptSavedTransition 
                                                 extends AMLaunchedTransition {
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
                             RMAppAttemptEvent event) {
-      appAttempt.checkAttemptStoreError(event);
-      // TODO Today unmanaged AM client is waiting for app state to be Accepted to
-      // launch the AM. This is broken since we changed to start the attempt
-      // after the application is Accepted. We may need to introduce an attempt
-      // report that client can rely on to query the attempt state and choose to
-      // launch the unmanaged AM.
+      // create AMRMToken
+      appAttempt.amrmToken =
+          appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+            appAttempt.applicationAttemptId);
+
       super.transition(appAttempt, event);
     }    
   }
@@ -1207,17 +1262,16 @@ public class RMAppAttemptImpl implements
     }
   }
 
-  private static final class AMContainerCrashedTransition extends
+  private static final class AMContainerCrashedBeforeRunningTransition extends
       BaseFinalTransition {
 
-    public AMContainerCrashedTransition() {
+    public AMContainerCrashedBeforeRunningTransition() {
       super(RMAppAttemptState.FAILED);
     }
 
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-
       RMAppAttemptContainerFinishedEvent finishEvent =
           ((RMAppAttemptContainerFinishedEvent)event);
 
@@ -1225,23 +1279,39 @@ public class RMAppAttemptImpl implements
       appAttempt.rmContext.getAMLivelinessMonitor().unregister(
           appAttempt.getAppAttemptId());
 
-      // Setup diagnostic message
-      appAttempt.diagnostics
-        .append(getAMContainerCrashedDiagnostics(finishEvent));
+      // Setup diagnostic message and exit status
+      appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent);
+
       // Tell the app, scheduler
       super.transition(appAttempt, finishEvent);
     }
   }
 
-  private static String getAMContainerCrashedDiagnostics(
+  private void setAMContainerCrashedDiagnosticsAndExitStatus(
+      RMAppAttemptContainerFinishedEvent finishEvent) {
+    ContainerStatus status = finishEvent.getContainerStatus();
+    String diagnostics = getAMContainerCrashedDiagnostics(finishEvent);
+    this.diagnostics.append(diagnostics);
+    this.amContainerExitStatus = status.getExitStatus();
+  }
+
+  private String getAMContainerCrashedDiagnostics(
       RMAppAttemptContainerFinishedEvent finishEvent) {
     ContainerStatus status = finishEvent.getContainerStatus();
-    String diagnostics =
-        "AM Container for " + finishEvent.getApplicationAttemptId()
-            + " exited with " + " exitCode: " + status.getExitStatus()
-            + " due to: " + status.getDiagnostics() + "."
-            + "Failing this attempt.";
-    return diagnostics;
+    StringBuilder diagnosticsBuilder = new StringBuilder();
+    diagnosticsBuilder.append("AM Container for ").append(
+      finishEvent.getApplicationAttemptId()).append(
+      " exited with  exitCode: ").append(status.getExitStatus()).append("\n");
+    // Explicit separators keep the URL and the text around it from running together.
+    if (this.getTrackingUrl() != null) {
+      diagnosticsBuilder.append("For more detailed output,").append(
+        " check application tracking page: ").append(
+        this.getTrackingUrl()).append(
+        " Then, click on links to logs of each attempt.\n");
+    }
+    diagnosticsBuilder.append("Diagnostics: ").append(status.getDiagnostics())
+        .append("\nFailing this attempt.");
+    return diagnosticsBuilder.toString();
   }
 
   private static class FinalTransition extends BaseFinalTransition {
@@ -1395,27 +1465,26 @@ public class RMAppAttemptImpl implements
     finalStatus = unregisterEvent.getFinalApplicationStatus();
   }
 
-  private static final class ContainerAcquiredTransition extends
-      BaseTransition {
-    @Override
-    public void transition(RMAppAttemptImpl appAttempt,
-        RMAppAttemptEvent event) {
-      RMAppAttemptContainerAcquiredEvent acquiredEvent
-        = (RMAppAttemptContainerAcquiredEvent) event;
-      appAttempt.ranNodes.add(acquiredEvent.getContainer().getNodeId());
-    }
-  }
-
   private static final class ContainerFinishedTransition
       implements
       MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
 
+    // The transition To Do after attempt final state is saved.
+    private BaseTransition transitionToDo;
+    private RMAppAttemptState currentState;
+
+    public ContainerFinishedTransition(BaseTransition transitionToDo,
+        RMAppAttemptState currentState) {
+      this.transitionToDo = transitionToDo;
+      this.currentState = currentState;
+    }
+
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
 
-      RMAppAttemptContainerFinishedEvent containerFinishedEvent
-        = (RMAppAttemptContainerFinishedEvent) event;
+      RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+          (RMAppAttemptContainerFinishedEvent) event;
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
 
@@ -1423,27 +1492,28 @@ public class RMAppAttemptImpl implements
       // the AMContainer, AppAttempt fails
       if (appAttempt.masterContainer != null
           && appAttempt.masterContainer.getId().equals(
-            containerStatus.getContainerId())) {
+              containerStatus.getContainerId())) {
+
         // Remember the follow up transition and save the final attempt state.
         appAttempt.rememberTargetTransitionsAndStoreState(event,
-          new ContainerFinishedFinalStateSavedTransition(),
-          RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
+            transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
         return RMAppAttemptState.FINAL_SAVING;
       }
 
-      // Normal container.Put it in completedcontainers list
+      // Normal container.Put it in completed containers list
       appAttempt.justFinishedContainers.add(containerStatus);
-      return RMAppAttemptState.RUNNING;
+      return this.currentState;
     }
   }
 
-  private static final class ContainerFinishedAtFailedTransition
+  private static final class ContainerFinishedAtFinalStateTransition
       extends BaseTransition {
     @Override
     public void
         transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
       RMAppAttemptContainerFinishedEvent containerFinishedEvent =
           (RMAppAttemptContainerFinishedEvent) event;
+      
       ContainerStatus containerStatus =
           containerFinishedEvent.getContainerStatus();
       // Normal container. Add it in completed containers list
@@ -1451,18 +1521,17 @@ public class RMAppAttemptImpl implements
     }
   }
 
-  private static class ContainerFinishedFinalStateSavedTransition extends
+  private static class AMContainerCrashedAtRunningTransition extends
       BaseTransition {
     @Override
     public void
         transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
-      RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+      RMAppAttemptContainerFinishedEvent finishEvent =
           (RMAppAttemptContainerFinishedEvent) event;
       // container associated with AM. must not be unmanaged
       assert appAttempt.submissionContext.getUnmanagedAM() == false;
-      // Setup diagnostic message
-      appAttempt.diagnostics
-        .append(getAMContainerCrashedDiagnostics(containerFinishedEvent));
+      // Setup diagnostic message and exit status
+      appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent);
       new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt,
         event);
     }
@@ -1606,18 +1675,6 @@ public class RMAppAttemptImpl implements
     rmContext.getAMLivelinessMonitor().register(getAppAttemptId());
   }
   
-  private void checkAttemptStoreError(RMAppAttemptEvent event) {
-    RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event;
-    if(storeEvent.getStoredException() != null)
-    {
-      // This needs to be handled for HA and give up master status if we got
-      // fenced
-      LOG.error("Failed to store attempt: " + getAppAttemptId(),
-                storeEvent.getStoredException());
-      ExitUtil.terminate(1, storeEvent.getStoredException());
-    }
-  }
-
   private void storeAttempt() {
     // store attempt data in a non-blocking manner to prevent dispatcher
     // thread starvation and wait for state to be saved
@@ -1663,4 +1720,16 @@ public class RMAppAttemptImpl implements
     }
     return attemptReport;
   }
+
+  // for testing
+  public boolean mayBeLastAttempt() {
+    return maybeLastAttempt;
+  }
+
+  @Override
+  public RMAppAttemptMetrics getRMAppAttemptMetrics() {
+    // didn't use read/write lock here because RMAppAttemptMetrics has its own
+    // lock
+    return attemptMetrics;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
+import java.util.List;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 /**
@@ -58,7 +61,7 @@ public interface RMContainer extends Eve
 
   Priority getAllocatedPriority();
 
-  long getStartTime();
+  long getCreationTime();
 
   long getFinishTime();
 
@@ -71,5 +74,9 @@ public interface RMContainer extends Eve
   ContainerState getContainerState();
   
   ContainerReport createContainerReport();
+  
+  boolean isAMContainer();
+  
+  List<ResourceRequest> getResourceRequests();
 
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java Tue Aug 19 23:49:39 2014
@@ -33,5 +33,7 @@ public enum RMContainerEventType {
   RELEASED,
 
   // Source: ContainerAllocationExpirer  
-  EXPIRE
+  EXPIRE,
+
+  RECOVER
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Tue Aug 19 23:49:39 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
 import java.util.EnumSet;
+import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -27,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -34,13 +36,17 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -65,6 +71,9 @@ public class RMContainerImpl implements 
         RMContainerEventType.KILL)
     .addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
         RMContainerEventType.RESERVED, new ContainerReservedTransition())
+    .addTransition(RMContainerState.NEW,
+        EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
+        RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
 
     // Transitions from RESERVED state
     .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, 
@@ -147,27 +156,35 @@ public class RMContainerImpl implements 
   private Resource reservedResource;
   private NodeId reservedNode;
   private Priority reservedPriority;
-  private long startTime;
+  private long creationTime;
   private long finishTime;
   private ContainerStatus finishedStatus;
+  private boolean isAMContainer;
+  private List<ResourceRequest> resourceRequests;
 
-
-
+  public RMContainerImpl(Container container,
+      ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+      RMContext rmContext) {
+    this(container, appAttemptId, nodeId, user, rmContext, System
+      .currentTimeMillis());
+  }
 
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId,
-      String user, RMContext rmContext) {
+      String user, RMContext rmContext, long creationTime) {
     this.stateMachine = stateMachineFactory.make(this);
     this.containerId = container.getId();
     this.nodeId = nodeId;
     this.container = container;
     this.appAttemptId = appAttemptId;
     this.user = user;
-    this.startTime = System.currentTimeMillis();
+    this.creationTime = creationTime;
     this.rmContext = rmContext;
     this.eventHandler = rmContext.getDispatcher().getEventHandler();
     this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
-    
+    this.isAMContainer = false;
+    this.resourceRequests = null;
+
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -232,8 +249,8 @@ public class RMContainerImpl implements 
   }
 
   @Override
-  public long getStartTime() {
-    return startTime;
+  public long getCreationTime() {
+    return creationTime;
   }
 
   @Override
@@ -298,6 +315,25 @@ public class RMContainerImpl implements 
       readLock.unlock();
     }
   }
+  
+  @Override
+  public List<ResourceRequest> getResourceRequests() {
+    try {
+      readLock.lock();
+      return resourceRequests;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  public void setResourceRequests(List<ResourceRequest> requests) {
+    try {
+      writeLock.lock();
+      this.resourceRequests = requests;
+    } finally {
+      writeLock.unlock();
+    }
+  }
 
   @Override
   public String toString() {
@@ -305,6 +341,25 @@ public class RMContainerImpl implements 
   }
   
   @Override
+  public boolean isAMContainer() {
+    try {
+      readLock.lock();
+      return isAMContainer;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public void setAMContainer(boolean isAMContainer) {
+    try {
+      writeLock.lock();
+      this.isAMContainer = isAMContainer;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
+  @Override
   public void handle(RMContainerEvent event) {
     LOG.debug("Processing " + event.getContainerId() + " of type " + event.getType());
     try {
@@ -341,6 +396,38 @@ public class RMContainerImpl implements 
     }
   }
 
+  private static final class ContainerRecoveredTransition
+      implements
+      MultipleArcTransition<RMContainerImpl, RMContainerEvent, RMContainerState> {
+    @Override
+    public RMContainerState transition(RMContainerImpl container,
+        RMContainerEvent event) {
+      NMContainerStatus report =
+          ((RMContainerRecoverEvent) event).getContainerReport();
+      if (report.getContainerState().equals(ContainerState.COMPLETE)) {
+        ContainerStatus status =
+            ContainerStatus.newInstance(report.getContainerId(),
+              report.getContainerState(), report.getDiagnostics(),
+              report.getContainerExitStatus());
+
+        new FinishedTransition().transition(container,
+          new RMContainerFinishedEvent(container.containerId, status,
+            RMContainerEventType.FINISHED));
+        return RMContainerState.COMPLETED;
+      } else if (report.getContainerState().equals(ContainerState.RUNNING)) {
+        // Tell the app
+        container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
+            .getApplicationAttemptId().getApplicationId(), container.nodeId));
+        return RMContainerState.RUNNING;
+      } else {
+        // This can never happen.
+        LOG.warn("RMContainer received unexpected recover event with container"
+            + " state " + report.getContainerState() + " while recovering.");
+        return RMContainerState.RUNNING;
+      }
+    }
+  }
+
   private static final class ContainerReservedTransition extends
   BaseTransition {
 
@@ -368,12 +455,15 @@ public class RMContainerImpl implements 
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // Clear ResourceRequest stored in RMContainer
+      container.setResourceRequests(null);
+      
       // Register with containerAllocationExpirer.
       container.containerAllocationExpirer.register(container.getContainerId());
 
-      // Tell the appAttempt
-      container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
-          container.getApplicationAttemptId(), container.getContainer()));
+      // Tell the app
+      container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
+          .getApplicationAttemptId().getApplicationId(), container.nodeId));
     }
   }
 
@@ -396,11 +486,30 @@ public class RMContainerImpl implements 
       container.finishTime = System.currentTimeMillis();
       container.finishedStatus = finishedEvent.getRemoteContainerStatus();
       // Inform AppAttempt
+      // container.getContainer() can return null when a RMContainer is a
+      // reserved container
+      updateMetricsIfPreempted(container);
+
       container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
-          container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
+        container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
 
-      container.rmContext.getRMApplicationHistoryWriter()
-          .containerFinished(container);
+      container.rmContext.getRMApplicationHistoryWriter().containerFinished(
+        container);
+    }
+
+    private static void updateMetricsIfPreempted(RMContainerImpl container) {
+      // If this is a preempted container, update preemption metrics
+      if (ContainerExitStatus.PREEMPTED == container.finishedStatus
+        .getExitStatus()) {
+
+        Resource resource = container.getContainer().getResource();
+        RMAppAttempt rmAttempt =
+            container.rmContext.getRMApps()
+              .get(container.getApplicationAttemptId().getApplicationId())
+              .getCurrentAppAttempt();
+        rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource,
+          container);
+      }
     }
   }
 
@@ -442,7 +551,7 @@ public class RMContainerImpl implements 
     try {
       containerReport = ContainerReport.newInstance(this.getContainerId(),
           this.getAllocatedResource(), this.getAllocatedNode(),
-          this.getAllocatedPriority(), this.getStartTime(),
+          this.getAllocatedPriority(), this.getCreationTime(),
           this.getFinishTime(), this.getDiagnosticsInfo(), this.getLogURL(),
           this.getContainerExitStatus(), this.getContainerState());
     } finally {
@@ -450,5 +559,4 @@ public class RMContainerImpl implements 
     }
     return containerReport;
   }
-
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Tue Aug 19 23:49:39 2014
@@ -20,9 +20,8 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.ArrayList;
 import java.util.EnumSet;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -48,12 +47,15 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -91,9 +93,9 @@ public class RMNodeImpl implements RMNod
   private final RMContext context;
   private final String hostName;
   private final int commandPort;
-  private final int httpPort;
+  private int httpPort;
   private final String nodeAddress; // The containerManager address
-  private final String httpAddress;
+  private String httpAddress;
   private volatile ResourceOption resourceOption;
   private final Node node;
 
@@ -102,8 +104,8 @@ public class RMNodeImpl implements RMNod
   private String nodeManagerVersion;
 
   /* set of containers that have just launched */
-  private final Map<ContainerId, ContainerStatus> justLaunchedContainers = 
-    new HashMap<ContainerId, ContainerStatus>();
+  private final Set<ContainerId> launchedContainers =
+    new HashSet<ContainerId>();
 
   /* set of containers that need to be cleaned */
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
@@ -454,19 +456,33 @@ public class RMNodeImpl implements RMNod
     }
   }
 
+  private static void handleRunningAppOnNode(RMNodeImpl rmNode,
+      RMContext context, ApplicationId appId, NodeId nodeId) {
+    RMApp app = context.getRMApps().get(appId);
+
+    // if we failed getting app by appId, maybe something wrong happened, just
+    // add the app to the finishedApplications list so that the app can be
+    // cleaned up on the NM
+    if (null == app) {
+      LOG.warn("Cannot get RMApp by appId=" + appId
+          + ", just added it to finishedApplications list for cleanup");
+      rmNode.finishedApplications.add(appId);
+      return;
+    }
+
+    context.getDispatcher().getEventHandler()
+        .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
+  }
+
   public static class AddNodeTransition implements
       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       // Inform the scheduler
+      RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
+      List<NMContainerStatus> containers = null;
 
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodeAddedSchedulerEvent(rmNode));
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodesListManagerEvent(
-              NodesListManagerEventType.NODE_USABLE, rmNode));
- 
       String host = rmNode.nodeId.getHost();
       if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
         // Old node rejoining
@@ -476,44 +492,51 @@ public class RMNodeImpl implements RMNod
       } else {
         // Increment activeNodes explicitly because this is a new node.
         ClusterMetrics.getMetrics().incrNumActiveNodes();
+        containers = startEvent.getNMContainerStatuses();
+        if (containers != null && !containers.isEmpty()) {
+          for (NMContainerStatus container : containers) {
+            if (container.getContainerState() == ContainerState.RUNNING) {
+              rmNode.launchedContainers.add(container.getContainerId());
+            }
+          }
+        }
       }
+      
+      if (null != startEvent.getRunningApplications()) {
+        for (ApplicationId appId : startEvent.getRunningApplications()) {
+          handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
+        }
+      }
+
+      rmNode.context.getDispatcher().getEventHandler()
+        .handle(new NodeAddedSchedulerEvent(rmNode, containers));
+      rmNode.context.getDispatcher().getEventHandler().handle(
+        new NodesListManagerEvent(
+            NodesListManagerEventType.NODE_USABLE, rmNode));
     }
   }
-  
+
   public static class ReconnectNodeTransition implements
       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-      // Kill containers since node is rejoining.
-      rmNode.nodeUpdateQueue.clear();
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodeRemovedSchedulerEvent(rmNode));
-
-      RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode();
-      if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
-          && rmNode.getHttpPort() == newNode.getHttpPort()) {
-        // Reset heartbeat ID since node just restarted.
-        rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
-        if (rmNode.getState() != NodeState.UNHEALTHY) {
-          // Only add new node if old state is not UNHEALTHY
-          rmNode.context.getDispatcher().getEventHandler().handle(
-              new NodeAddedSchedulerEvent(rmNode));
+      RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
+      RMNode newNode = reconnectEvent.getReconnectedNode();
+      rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
+      rmNode.httpPort = newNode.getHttpPort();
+      rmNode.httpAddress = newNode.getHttpAddress();
+      rmNode.resourceOption = newNode.getResourceOption();
+
+      // Reset heartbeat ID since node just restarted.
+      rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
+
+      if (null != reconnectEvent.getRunningApplications()) {
+        for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
+          handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
         }
-      } else {
-        // Reconnected node differs, so replace old node and start new node
-        switch (rmNode.getState()) {
-        case RUNNING:
-          ClusterMetrics.getMetrics().decrNumActiveNodes();
-          break;
-        case UNHEALTHY:
-          ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
-          break;
-        }
-        rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
-        rmNode.context.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
       }
+
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodesListManagerEvent(
               NodesListManagerEventType.NODE_USABLE, rmNode));
@@ -633,14 +656,14 @@ public class RMNodeImpl implements RMNod
 
         // Process running containers
         if (remoteContainer.getState() == ContainerState.RUNNING) {
-          if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
+          if (!rmNode.launchedContainers.contains(containerId)) {
             // Just launched container. RM knows about it the first time.
-            rmNode.justLaunchedContainers.put(containerId, remoteContainer);
+            rmNode.launchedContainers.add(containerId);
             newlyLaunchedContainers.add(remoteContainer);
           }
         } else {
           // A finished container
-          rmNode.justLaunchedContainers.remove(containerId);
+          rmNode.launchedContainers.remove(containerId);
           completedContainers.add(remoteContainer);
         }
       }
@@ -717,4 +740,10 @@ public class RMNodeImpl implements RMNod
   public int getQueueSize() {
     return nodeUpdateQueue.size();
   }
+
+  // For test only.
+  @VisibleForTesting
+  public Set<ContainerId> getLaunchedContainers() {
+    return this.launchedContainers;
+  }
  }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java Tue Aug 19 23:49:39 2014
@@ -18,17 +18,27 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 
 public class RMNodeReconnectEvent extends RMNodeEvent {
   private RMNode reconnectedNode;
+  private List<ApplicationId> runningApplications;
 
-  public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) {
+  public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode,
+      List<ApplicationId> runningApps) {
     super(nodeId, RMNodeEventType.RECONNECTED);
     reconnectedNode = newNode;
+    runningApplications = runningApps;
   }
 
   public RMNode getReconnectedNode() {
     return reconnectedNode;
   }
+
+  public List<ApplicationId> getRunningApplications() {
+    return runningApplications;
+  }
 }