You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2014/01/15 07:06:35 UTC

svn commit: r1558303 [2/5] - in /hadoop/common/branches/HDFS-5535/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/ma...

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Wed Jan 15 06:06:31 2014
@@ -34,9 +34,155 @@
 
   <dependencies>
     <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>servlet-api</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject.extensions</groupId>
+      <artifactId>guice-servlet</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey.jersey-test-framework</groupId>
+      <artifactId>jersey-test-framework-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-json</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey.contribs</groupId>
+      <artifactId>jersey-guice</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>javax.xml.bind</groupId>
+      <artifactId>jaxb-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jettison</groupId>
+      <artifactId>jettison</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-common</artifactId>
     </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-web-proxy</artifactId>
@@ -51,12 +197,20 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>com.sun.jersey.jersey-test-framework</groupId>
+      <artifactId>jersey-test-framework-grizzly2</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Jan 15 06:06:31 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionContainer;
@@ -78,6 +79,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -271,6 +273,11 @@ public class ApplicationMasterService ex
             .getClientToAMTokenSecretManager()
             .getMasterKey(applicationAttemptId).getEncoded()));        
       }
+
+      List<Container> containerList =
+          ((AbstractYarnScheduler) rScheduler)
+            .getTransferredContainers(applicationAttemptId);
+      response.setContainersFromPreviousAttempt(containerList);
       return response;
     }
   }
@@ -421,21 +428,26 @@ public class ApplicationMasterService ex
         LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
         throw e;
       }
-      
-      try {
-        RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
-      } catch (InvalidContainerReleaseException e) {
-        LOG.warn("Invalid container release by application " + appAttemptId, e);
-        throw e;
+
+      RMApp app =
+          this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
+      // In the case of work-preserving AM restart, it's possible for the
+      // AM to release containers from the earlier attempt.
+      if (!app.getApplicationSubmissionContext()
+        .getKeepContainersAcrossApplicationAttempts()) {
+        try {
+          RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
+        } catch (InvalidContainerReleaseException e) {
+          LOG.warn("Invalid container release by application " + appAttemptId, e);
+          throw e;
+        }
       }
-      
+
       // Send new requests to appAttempt.
       Allocation allocation =
           this.rScheduler.allocate(appAttemptId, ask, release, 
               blacklistAdditions, blacklistRemovals);
 
-      RMApp app = this.rmContext.getRMApps().get(
-          appAttemptId.getApplicationId());
       RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
       
       AllocateResponse allocateResponse =
@@ -591,4 +603,4 @@ public class ApplicationMasterService ex
       this.response = response;
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Jan 15 06:06:31 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
@@ -180,13 +181,11 @@ public class ResourceManager extends Com
     this.conf = conf;
     this.rmContext = new RMContextImpl();
 
-    rmDispatcher = createDispatcher();
+    // register the handlers for all AlwaysOn services using setupDispatcher().
+    rmDispatcher = setupDispatcher();
     addIfService(rmDispatcher);
     rmContext.setDispatcher(rmDispatcher);
 
-    rmDispatcher.register(RMFatalEventType.class,
-        new ResourceManager.RMFatalEventDispatcher(this.rmContext, this));
-
     adminService = createAdminService();
     addService(adminService);
     rmContext.setRMAdminService(adminService);
@@ -832,6 +831,7 @@ public class ResourceManager extends Com
         HAServiceProtocol.HAServiceState.ACTIVE) {
       stopActiveServices();
       if (initialize) {
+        resetDispatcher();
         createAndInitActiveServices();
       }
     }
@@ -994,4 +994,24 @@ public class ResourceManager extends Com
       YarnConfiguration.YARN_HTTP_POLICY_KEY,
       YarnConfiguration.YARN_HTTP_POLICY_DEFAULT)));
   }
+
+  /**
+   * Register the handlers for alwaysOn services
+   */
+  private Dispatcher setupDispatcher() {
+    Dispatcher dispatcher = createDispatcher();
+    dispatcher.register(RMFatalEventType.class,
+        new ResourceManager.RMFatalEventDispatcher(this.rmContext, this));
+    return dispatcher;
+  }
+
+  private void resetDispatcher() {
+    Dispatcher dispatcher = setupDispatcher();
+    ((Service)dispatcher).init(this.conf);
+    ((Service)dispatcher).start();
+    removeService((Service)rmDispatcher);
+    rmDispatcher = dispatcher;
+    addIfService(rmDispatcher);
+    rmContext.setDispatcher(rmDispatcher);
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppFailedAttemptEvent.java Wed Jan 15 06:06:31 2014
@@ -23,14 +23,20 @@ import org.apache.hadoop.yarn.api.record
 public class RMAppFailedAttemptEvent extends RMAppEvent {
 
   private final String diagnostics;
+  private final boolean transferStateFromPreviousAttempt;
 
   public RMAppFailedAttemptEvent(ApplicationId appId, RMAppEventType event, 
-      String diagnostics) {
+      String diagnostics, boolean transferStateFromPreviousAttempt) {
     super(appId, event);
     this.diagnostics = diagnostics;
+    this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
   }
 
   public String getDiagnostics() {
     return this.diagnostics;
   }
+
+  public boolean getTransferStateFromPreviousAttempt() {
+    return transferStateFromPreviousAttempt;
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Jan 15 06:06:31 2014
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -76,6 +77,7 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class RMAppImpl implements RMApp, Recoverable {
 
   private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
@@ -633,7 +635,7 @@ public class RMAppImpl implements RMApp,
       this.writeLock.unlock();
     }
   }
-  
+
   @Override
   public void recover(RMState state) throws Exception{
     ApplicationState appState = state.getApplicationState().get(getApplicationId());
@@ -646,26 +648,28 @@ public class RMAppImpl implements RMApp,
 
     for(int i=0; i<appState.getAttemptCount(); ++i) {
       // create attempt
-      createNewAttempt(false);
+      createNewAttempt();
       ((RMAppAttemptImpl)this.currentAttempt).recover(state);
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private void createNewAttempt(boolean startAttempt) {
+  private void createNewAttempt() {
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
     RMAppAttempt attempt =
         new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
-          submissionContext, conf);
+          submissionContext, conf, maxAppAttempts == attempts.size());
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
-    if(startAttempt) {
-      handler.handle(
-          new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
-    }
   }
   
+  private void
+      createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
+    createNewAttempt();
+    handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
+      transferStateFromPreviousAttempt));
+  }
+
   private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
     NodeState nodeState = node.getState();
     updatedNodes.add(node);
@@ -688,7 +692,6 @@ public class RMAppImpl implements RMApp,
     };
   }
 
-  @SuppressWarnings("unchecked")
   private static final class RMAppRecoveredTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
 
@@ -729,7 +732,6 @@ public class RMAppImpl implements RMApp,
 
   private static final class AddApplicationToSchedulerTransition extends
       RMAppTransition {
-    @SuppressWarnings("unchecked")
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
       if (event instanceof RMAppNewSavedEvent) {
@@ -751,14 +753,13 @@ public class RMAppImpl implements RMApp,
   private static final class StartAppAttemptTransition extends RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      app.createNewAttempt(true);
+      app.createAndStartNewAttempt(false);
     };
   }
 
   private static final class FinalStateSavedTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
       RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event;
@@ -959,7 +960,6 @@ public class RMAppImpl implements RMApp,
   }
 
   private static class KillAttemptTransition extends RMAppTransition {
-    @SuppressWarnings("unchecked")
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
       app.stateBeforeKilling = app.getState();
@@ -987,7 +987,6 @@ public class RMAppImpl implements RMApp,
       return nodes;
     }
 
-    @SuppressWarnings("unchecked")
     public void transition(RMAppImpl app, RMAppEvent event) {
       Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
       for (NodeId nodeId : nodes) {
@@ -1019,7 +1018,21 @@ public class RMAppImpl implements RMApp,
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
       if (!app.submissionContext.getUnmanagedAM()
           && app.attempts.size() < app.maxAppAttempts) {
-        app.createNewAttempt(true);
+        boolean transferStateFromPreviousAttempt = false;
+        RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
+        transferStateFromPreviousAttempt =
+            failedEvent.getTransferStateFromPreviousAttempt();
+
+        RMAppAttempt oldAttempt = app.currentAttempt;
+        app.createAndStartNewAttempt(transferStateFromPreviousAttempt);
+        // Transfer the state from the previous attempt to the current attempt.
+        // Note that the previous failed attempt may still be collecting the
+        // container events from the scheduler and update its data structures
+        // before the new attempt is created.
+        if (transferStateFromPreviousAttempt) {
+          ((RMAppAttemptImpl) app.currentAttempt)
+            .transferStateFromPreviousAttempt(oldAttempt);
+        }
         return initialState;
       } else {
         app.rememberTargetTransitionsAndStoreState(event,

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Jan 15 06:06:31 2014
@@ -129,9 +129,9 @@ public class RMAppAttemptImpl implements
   private SecretKey clientTokenMasterKey = null;
 
   //nodes on while this attempt's containers ran
-  private final Set<NodeId> ranNodes =
+  private Set<NodeId> ranNodes =
     new HashSet<NodeId>();
-  private final List<ContainerStatus> justFinishedContainers =
+  private List<ContainerStatus> justFinishedContainers =
     new ArrayList<ContainerStatus>();
   private Container masterContainer;
 
@@ -148,7 +148,7 @@ public class RMAppAttemptImpl implements
   private final StringBuilder diagnostics = new StringBuilder();
 
   private Configuration conf;
-  
+  private final boolean isLastAttempt;
   private static final ExpiredTransition EXPIRED_TRANSITION =
       new ExpiredTransition();
 
@@ -330,6 +330,12 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.KILL))
 
       // Transitions from FAILED State
+      // For work-preserving AM restart, failed attempt are still capturing
+      // CONTAINER_FINISHED event and record the finished containers for the
+      // use by the next new attempt.
+      .addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED,
+          RMAppAttemptEventType.CONTAINER_FINISHED,
+          new ContainerFinishedAtFailedTransition())
       .addTransition(
           RMAppAttemptState.FAILED,
           RMAppAttemptState.FAILED,
@@ -338,8 +344,7 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.KILL,
               RMAppAttemptEventType.UNREGISTERED,
               RMAppAttemptEventType.STATUS_UPDATE,
-              RMAppAttemptEventType.CONTAINER_ALLOCATED,
-              RMAppAttemptEventType.CONTAINER_FINISHED))
+              RMAppAttemptEventType.CONTAINER_ALLOCATED))
 
       // Transitions from FINISHING State
       .addTransition(RMAppAttemptState.FINISHING,
@@ -390,7 +395,7 @@ public class RMAppAttemptImpl implements
       RMContext rmContext, YarnScheduler scheduler,
       ApplicationMasterService masterService,
       ApplicationSubmissionContext submissionContext,
-      Configuration conf) {
+      Configuration conf, boolean isLastAttempt) {
     this.conf = conf;
     this.applicationAttemptId = appAttemptId;
     this.rmContext = rmContext;
@@ -404,7 +409,7 @@ public class RMAppAttemptImpl implements
     this.writeLock = lock.writeLock();
 
     this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
-    
+    this.isLastAttempt = isLastAttempt;
     this.stateMachine = stateMachineFactory.make(this);
   }
 
@@ -416,7 +421,7 @@ public class RMAppAttemptImpl implements
   @Override
   public ApplicationSubmissionContext getSubmissionContext() {
     return this.submissionContext;
-  } 
+  }
 
   @Override
   public FinalApplicationStatus getFinalApplicationStatus() {
@@ -685,6 +690,11 @@ public class RMAppAttemptImpl implements
     this.startTime = attemptState.getStartTime();
   }
 
+  public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
+    this.justFinishedContainers = attempt.getJustFinishedContainers();
+    this.ranNodes = attempt.getRanNodes();
+  }
+
   private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
       throws IOException {
     if (appAttemptTokens == null) {
@@ -721,6 +731,12 @@ public class RMAppAttemptImpl implements
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
 
+	    boolean transferStateFromPreviousAttempt = false;
+      if (event instanceof RMAppStartAttemptEvent) {
+        transferStateFromPreviousAttempt =
+            ((RMAppStartAttemptEvent) event)
+              .getTransferStateFromPreviousAttempt();
+      }
       appAttempt.startTime = System.currentTimeMillis();
 
       // Register with the ApplicationMasterService
@@ -740,9 +756,10 @@ public class RMAppAttemptImpl implements
           new Token<AMRMTokenIdentifier>(id,
             appAttempt.rmContext.getAMRMTokenSecretManager());
 
-      // Add the applicationAttempt to the scheduler
+      // Add the applicationAttempt to the scheduler and inform the scheduler
+      // whether to transfer the state from previous attempt.
       appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
-        appAttempt.applicationAttemptId));
+        appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
     }
   }
 
@@ -981,6 +998,7 @@ public class RMAppAttemptImpl implements
       // Tell the application and the scheduler
       ApplicationId applicationId = appAttemptId.getApplicationId();
       RMAppEvent appEvent = null;
+      boolean keepContainersAcrossAppAttempts = false;
       switch (finalAttemptState) {
         case FINISHED:
         {
@@ -996,7 +1014,7 @@ public class RMAppAttemptImpl implements
           appEvent =
               new RMAppFailedAttemptEvent(applicationId,
                   RMAppEventType.ATTEMPT_KILLED,
-                  "Application killed by user.");
+                  "Application killed by user.", false);
         }
         break;
         case FAILED:
@@ -1004,10 +1022,17 @@ public class RMAppAttemptImpl implements
           // don't leave the tracking URL pointing to a non-existent AM
           appAttempt.setTrackingUrlToRMAppPage();
           appAttempt.invalidateAMHostAndPort();
+          if (appAttempt.submissionContext
+            .getKeepContainersAcrossApplicationAttempts()
+              && !appAttempt.isLastAttempt
+              && !appAttempt.submissionContext.getUnmanagedAM()) {
+            keepContainersAcrossAppAttempts = true;
+          }
           appEvent =
               new RMAppFailedAttemptEvent(applicationId,
-                  RMAppEventType.ATTEMPT_FAILED,
-                  appAttempt.getDiagnostics());
+                RMAppEventType.ATTEMPT_FAILED, appAttempt.getDiagnostics(),
+                keepContainersAcrossAppAttempts);
+
         }
         break;
         default:
@@ -1019,7 +1044,7 @@ public class RMAppAttemptImpl implements
 
       appAttempt.eventHandler.handle(appEvent);
       appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent(
-        appAttemptId, finalAttemptState));
+        appAttemptId, finalAttemptState, keepContainersAcrossAppAttempts));
       appAttempt.removeCredentials(appAttempt);
     }
   }
@@ -1045,6 +1070,11 @@ public class RMAppAttemptImpl implements
     public void transition(RMAppAttemptImpl appAttempt,
                             RMAppAttemptEvent event) {
       appAttempt.checkAttemptStoreError(event);
+      // TODO Today unmanaged AM client is waiting for app state to be Accepted to
+      // launch the AM. This is broken since we changed to start the attempt
+      // after the application is Accepted. We may need to introduce an attempt
+      // report that client can rely on to query the attempt state and choose to
+      // launch the unmanaged AM.
       super.transition(appAttempt, event);
     }    
   }
@@ -1346,6 +1376,20 @@ public class RMAppAttemptImpl implements
     }
   }
 
+  private static final class ContainerFinishedAtFailedTransition
+      extends BaseTransition {
+    @Override
+    public void
+        transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+      RMAppAttemptContainerFinishedEvent containerFinishedEvent =
+          (RMAppAttemptContainerFinishedEvent) event;
+      ContainerStatus containerStatus =
+          containerFinishedEvent.getContainerStatus();
+      // Normal container. Add it in completed containers list
+      appAttempt.justFinishedContainers.add(containerStatus);
+    }
+  }
+
   private static class ContainerFinishedFinalStateSavedTransition extends
       BaseTransition {
     @Override

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Wed Jan 15 06:06:31 2014
@@ -59,10 +59,10 @@ public class AppSchedulingInfo {
 
   final Set<Priority> priorities = new TreeSet<Priority>(
       new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
-  final Map<Priority, Map<String, ResourceRequest>> requests = 
+  final Map<Priority, Map<String, ResourceRequest>> requests =
     new HashMap<Priority, Map<String, ResourceRequest>>();
-  final Set<String> blacklist = new HashSet<String>();
-  
+  private Set<String> blacklist = new HashSet<String>();
+
   //private final ApplicationStore store;
   private final ActiveUsersManager activeUsersManager;
   
@@ -260,7 +260,7 @@ public class AppSchedulingInfo {
       // once an allocation is done we assume the application is
       // running from scheduler's POV.
       pending = false;
-      metrics.incrAppsRunning(this, user);
+      metrics.runAppAttempt(applicationId, user);
     }
     LOG.debug("allocate: user: " + user + ", memory: "
         + request.getCapability());
@@ -390,7 +390,7 @@ public class AppSchedulingInfo {
                 .getNumContainers()));
       }
     }
-    metrics.finishApp(this, rmAppAttemptFinalState);
+    metrics.finishAppAttempt(applicationId, pending, user);
     
     // Clear requests themselves
     clearRequests();
@@ -399,4 +399,15 @@ public class AppSchedulingInfo {
   public synchronized void setQueue(Queue queue) {
     this.queue = queue;
   }
+
+  public synchronized Set<String> getBlackList() {
+    return this.blacklist;
+  }
+
+  public synchronized void transferStateFromPreviousAppSchedulingInfo(
+      AppSchedulingInfo appInfo) {
+    //    this.priorities = appInfo.getPriorities();
+    //    this.requests = appInfo.getRequests();
+    this.blacklist = appInfo.getBlackList();
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Wed Jan 15 06:06:31 2014
@@ -41,7 +41,7 @@ import org.apache.hadoop.metrics2.lib.Mu
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
@@ -57,7 +57,7 @@ public class QueueMetrics implements Met
   @Metric("# of pending apps") MutableGaugeInt appsPending;
   @Metric("# of apps completed") MutableCounterInt appsCompleted;
   @Metric("# of apps killed") MutableCounterInt appsKilled;
-  @Metric("# of apps failed") MutableGaugeInt appsFailed;
+  @Metric("# of apps failed") MutableCounterInt appsFailed;
 
   @Metric("Allocated memory in MB") MutableGaugeInt allocatedMB;
   @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@@ -214,54 +214,70 @@ public class QueueMetrics implements Met
     registry.snapshot(collector.addRecord(registry.info()), all);
   }
 
-  public void submitApp(String user, int attemptId) {
-    if (attemptId == 1) {
-      appsSubmitted.incr();
-    } else {
-      appsFailed.decr();
+  public void submitApp(String user) {
+    appsSubmitted.incr();
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.submitApp(user);
     }
+    if (parent != null) {
+      parent.submitApp(user);
+    }
+  }
+
+  public void submitAppAttempt(String user) {
     appsPending.incr();
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
-      userMetrics.submitApp(user, attemptId);
+      userMetrics.submitAppAttempt(user);
     }
     if (parent != null) {
-      parent.submitApp(user, attemptId);
+      parent.submitAppAttempt(user);
     }
   }
 
-  public void incrAppsRunning(AppSchedulingInfo app, String user) {
-    runBuckets.add(app.getApplicationId(), System.currentTimeMillis());
+  public void runAppAttempt(ApplicationId appId, String user) {
+    runBuckets.add(appId, System.currentTimeMillis());
     appsRunning.incr();
     appsPending.decr();
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
-      userMetrics.incrAppsRunning(app, user);
+      userMetrics.runAppAttempt(appId, user);
     }
     if (parent != null) {
-      parent.incrAppsRunning(app, user);
+      parent.runAppAttempt(appId, user);
     }
   }
 
-  public void finishApp(AppSchedulingInfo app,
-      RMAppAttemptState rmAppAttemptFinalState) {
-    runBuckets.remove(app.getApplicationId());
-    switch (rmAppAttemptFinalState) {
-      case KILLED: appsKilled.incr(); break;
-      case FAILED: appsFailed.incr(); break;
-      default: appsCompleted.incr();  break;
-    }
-    if (app.isPending()) {
+  public void finishAppAttempt(
+      ApplicationId appId, boolean isPending, String user) {
+    runBuckets.remove(appId);
+    if (isPending) {
       appsPending.decr();
     } else {
       appsRunning.decr();
     }
-    QueueMetrics userMetrics = getUserMetrics(app.getUser());
+    QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
-      userMetrics.finishApp(app, rmAppAttemptFinalState);
+      userMetrics.finishAppAttempt(appId, isPending, user);
     }
     if (parent != null) {
-      parent.finishApp(app, rmAppAttemptFinalState);
+      parent.finishAppAttempt(appId, isPending, user);
+    }
+  }
+
+  public void finishApp(String user, RMAppState rmAppFinalState) {
+    switch (rmAppFinalState) {
+      case KILLED: appsKilled.incr(); break;
+      case FAILED: appsFailed.incr(); break;
+      default: appsCompleted.incr();  break;
+    }
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.finishApp(user, rmAppFinalState);
+    }
+    if (parent != null) {
+      parent.finishApp(user, rmAppFinalState);
     }
   }
 
@@ -493,4 +509,8 @@ public class QueueMetrics implements Met
   public int getActiveApps() {
     return activeApplications.value();
   }
+  
+  public MetricsSystem getMetricsSystem() {
+    return metricsSystem;
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Wed Jan 15 06:06:31 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.re
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 
 @Private
 @Unstable
@@ -26,6 +27,7 @@ public class SchedulerApplication {
 
   private final Queue queue;
   private final String user;
+  private SchedulerApplicationAttempt currentAttempt;
 
   public SchedulerApplication(Queue queue, String user) {
     this.queue = queue;
@@ -39,4 +41,17 @@ public class SchedulerApplication {
   public String getUser() {
     return user;
   }
+
+  public SchedulerApplicationAttempt getCurrentAppAttempt() {
+    return currentAttempt;
+  }
+
+  public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) {
+    this.currentAttempt = currentAttempt;
+  }
+
+  public void stop(RMAppState rmAppFinalState) {
+    queue.getMetrics().finishApp(user, rmAppFinalState);
+  }
+
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Wed Jan 15 06:06:31 2014
@@ -64,7 +64,7 @@ public abstract class SchedulerApplicati
 
   protected final AppSchedulingInfo appSchedulingInfo;
   
-  protected final Map<ContainerId, RMContainer> liveContainers =
+  protected Map<ContainerId, RMContainer> liveContainers =
       new HashMap<ContainerId, RMContainer>();
   protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
       new HashMap<Priority, Map<NodeId, RMContainer>>();
@@ -73,7 +73,7 @@ public abstract class SchedulerApplicati
   
   protected final Resource currentReservation = Resource.newInstance(0, 0);
   private Resource resourceLimit = Resource.newInstance(0, 0);
-  protected final Resource currentConsumption = Resource.newInstance(0, 0);
+  protected Resource currentConsumption = Resource.newInstance(0, 0);
 
   protected List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
@@ -407,4 +407,29 @@ public abstract class SchedulerApplicati
         Resources.add(currentConsumption, currentReservation));
   }
 
+  public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
+    return this.liveContainers;
+  }
+
+  public synchronized Resource getResourceLimit() {
+    return this.resourceLimit;
+  }
+
+  public synchronized Map<Priority, Long> getLastScheduledContainer() {
+    return this.lastScheduledContainer;
+  }
+
+  public synchronized void transferStateFromPreviousAttempt(
+      SchedulerApplicationAttempt appAttempt) {
+    this.liveContainers = appAttempt.getLiveContainersMap();
+    // this.reReservations = appAttempt.reReservations;
+    this.currentConsumption = appAttempt.getCurrentConsumption();
+    this.resourceLimit = appAttempt.getResourceLimit();
+    // this.currentReservation = appAttempt.currentReservation;
+    // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
+    // this.schedulingOpportunities = appAttempt.schedulingOpportunities;
+    this.lastScheduledContainer = appAttempt.getLastScheduledContainer();
+    this.appSchedulingInfo
+      .transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Wed Jan 15 06:06:31 2014
@@ -19,13 +19,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 
 /**
@@ -170,4 +171,13 @@ public interface YarnScheduler extends E
   @LimitedPrivate("yarn")
   @Stable
   public List<ApplicationAttemptId> getAppsInQueue(String queueName);
+
+  /**
+   * Get the container for the given containerId.
+   * @param containerId
+   * @return the container for the given containerId.
+   */
+  @LimitedPrivate("yarn")
+  @Unstable
+  public RMContainer getRMContainer(ContainerId containerId);
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Jan 15 06:06:31 2014
@@ -63,14 +63,16 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -94,7 +96,7 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
-public class CapacityScheduler
+public class CapacityScheduler extends AbstractYarnScheduler
   implements PreemptableResourceScheduler, CapacitySchedulerContext,
              Configurable {
 
@@ -176,7 +178,6 @@ public class CapacityScheduler
 
   private CapacitySchedulerConfiguration conf;
   private Configuration yarnConf;
-  private RMContext rmContext;
 
   private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
 
@@ -190,14 +191,6 @@ public class CapacityScheduler
   private Resource minimumAllocation;
   private Resource maximumAllocation;
 
-  @VisibleForTesting
-  protected Map<ApplicationId, SchedulerApplication> applications =
-      new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
-
-  @VisibleForTesting
-  protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts = 
-      new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
-
   private boolean initialized = false;
 
   private ResourceCalculator calculator;
@@ -274,9 +267,10 @@ public class CapacityScheduler
       this.maximumAllocation = this.conf.getMaximumAllocation();
       this.calculator = this.conf.getResourceCalculator();
       this.usePortForNodeName = this.conf.getUsePortForNodeName();
-
+      this.applications =
+          new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
       this.rmContext = rmContext;
-      
+
       initializeQueues(this.conf);
       
       initialized = true;
@@ -464,21 +458,27 @@ public class CapacityScheduler
   }
 
   private synchronized void addApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId) {
+      ApplicationAttemptId applicationAttemptId,
+      boolean transferStateFromPreviousAttempt) {
     SchedulerApplication application =
         applications.get(applicationAttemptId.getApplicationId());
     CSQueue queue = (CSQueue) application.getQueue();
 
-    FiCaSchedulerApp SchedulerApp =
+    FiCaSchedulerApp attempt =
         new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
           queue, queue.getActiveUsersManager(), rmContext);
-    appAttempts.put(applicationAttemptId, SchedulerApp);
-    queue.submitApplicationAttempt(SchedulerApp, application.getUser());
+    if (transferStateFromPreviousAttempt) {
+      attempt.transferStateFromPreviousAttempt(application
+        .getCurrentAppAttempt());
+    }
+    application.setCurrentAppAttempt(attempt);
+
+    queue.submitApplicationAttempt(attempt, application.getUser());
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user " + application.getUser() + " in queue "
         + queue.getQueueName());
-    rmContext.getDispatcher().getEventHandler().handle(
-      new RMAppAttemptEvent(applicationAttemptId,
+    rmContext.getDispatcher().getEventHandler() .handle(
+        new RMAppAttemptEvent(applicationAttemptId,
           RMAppAttemptEventType.ATTEMPT_ADDED));
   }
 
@@ -486,7 +486,9 @@ public class CapacityScheduler
       RMAppState finalState) {
     SchedulerApplication application = applications.get(applicationId);
     if (application == null){
-      // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps.
+      // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps,
+      // ignore it.
+      LOG.warn("Couldn't find application " + applicationId);
       return;
     }
     CSQueue queue = (CSQueue) application.getQueue();
@@ -496,57 +498,62 @@ public class CapacityScheduler
     } else {
       queue.finishApplication(applicationId, application.getUser());
     }
+    application.stop(finalState);
     applications.remove(applicationId);
   }
 
   private synchronized void doneApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState rmAppAttemptFinalState) {
+      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
     LOG.info("Application Attempt " + applicationAttemptId + " is done." +
     		" finalState=" + rmAppAttemptFinalState);
     
-    FiCaSchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
+    SchedulerApplication application =
+        applications.get(applicationAttemptId.getApplicationId());
 
-    if (application == null) {
-      //      throw new IOException("Unknown application " + applicationId + 
-      //          " has completed!");
+    if (application == null || attempt == null) {
       LOG.info("Unknown application " + applicationAttemptId + " has completed!");
       return;
     }
-    
-    // Release all the running containers 
-    for (RMContainer rmContainer : application.getLiveContainers()) {
-      completedContainer(rmContainer, 
-          SchedulerUtils.createAbnormalContainerStatus(
-              rmContainer.getContainerId(), 
-              SchedulerUtils.COMPLETED_APPLICATION), 
-          RMContainerEventType.KILL);
+
+    // Release all the allocated, acquired, running containers
+    for (RMContainer rmContainer : attempt.getLiveContainers()) {
+      if (keepContainers
+          && rmContainer.getState().equals(RMContainerState.RUNNING)) {
+        // do not kill the running container in the case of work-preserving AM
+        // restart.
+        LOG.info("Skip killing " + rmContainer.getContainerId());
+        continue;
+      }
+      completedContainer(
+        rmContainer,
+        SchedulerUtils.createAbnormalContainerStatus(
+          rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
+        RMContainerEventType.KILL);
     }
-    
-     // Release all reserved containers
-    for (RMContainer rmContainer : application.getReservedContainers()) {
-      completedContainer(rmContainer, 
-          SchedulerUtils.createAbnormalContainerStatus(
-              rmContainer.getContainerId(), 
-              "Application Complete"), 
-          RMContainerEventType.KILL);
+
+    // Release all reserved containers
+    for (RMContainer rmContainer : attempt.getReservedContainers()) {
+      completedContainer(
+        rmContainer,
+        SchedulerUtils.createAbnormalContainerStatus(
+          rmContainer.getContainerId(), "Application Complete"),
+        RMContainerEventType.KILL);
     }
-    
+
     // Clean up pending requests, metrics etc.
-    application.stop(rmAppAttemptFinalState);
-    
+    attempt.stop(rmAppAttemptFinalState);
+
     // Inform the queue
-    String queueName = application.getQueue().getQueueName();
+    String queueName = attempt.getQueue().getQueueName();
     CSQueue queue = queues.get(queueName);
     if (!(queue instanceof LeafQueue)) {
       LOG.error("Cannot finish application " + "from non-leaf queue: "
           + queueName);
     } else {
-      queue.finishApplicationAttempt(application, queue.getQueueName());
+      queue.finishApplicationAttempt(attempt, queue.getQueueName());
     }
-    
-    // Remove from our data-structure
-    appAttempts.remove(applicationAttemptId);
   }
 
   private static final Allocation EMPTY_ALLOCATION = 
@@ -558,7 +565,7 @@ public class CapacityScheduler
       List<ResourceRequest> ask, List<ContainerId> release, 
       List<String> blacklistAdditions, List<String> blacklistRemovals) {
 
-    FiCaSchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + applicationAttemptId);
@@ -700,8 +707,8 @@ public class CapacityScheduler
 
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
-      FiCaSchedulerApp reservedApplication = 
-          getApplication(reservedContainer.getApplicationAttemptId());
+      FiCaSchedulerApp reservedApplication =
+          getCurrentAttemptForContainer(reservedContainer.getContainerId());
       
       // Try to fulfill the reservation
       LOG.info("Trying to fulfill reservation for application " + 
@@ -738,12 +745,11 @@ public class CapacityScheduler
 
   private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
     // Get the application for the finished container
-    ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
-    FiCaSchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
     if (application == null) {
-      LOG.info("Unknown application: " + applicationAttemptId + 
-          " launched container " + containerId +
-          " on node: " + node);
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " launched container " + containerId + " on node: " + node);
       this.rmContext.getDispatcher().getEventHandler()
         .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
       return;
@@ -791,7 +797,8 @@ public class CapacityScheduler
     {
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
-      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
+        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -799,7 +806,8 @@ public class CapacityScheduler
       AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
           (AppAttemptRemovedSchedulerEvent) event;
       doneApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
-        appAttemptRemovedEvent.getFinalAttemptState());
+        appAttemptRemovedEvent.getFinalAttemptState(),
+        appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
     }
     break;
     case CONTAINER_EXPIRED:
@@ -874,13 +882,13 @@ public class CapacityScheduler
     Container container = rmContainer.getContainer();
     
     // Get the application for the finished container
-    ApplicationAttemptId applicationAttemptId =
-      container.getId().getApplicationAttemptId();
-    FiCaSchedulerApp application = getApplication(applicationAttemptId);
+    FiCaSchedulerApp application =
+        getCurrentAttemptForContainer(container.getId());
+    ApplicationId appId =
+        container.getId().getApplicationAttemptId().getApplicationId();
     if (application == null) {
-      LOG.info("Container " + container + " of" +
-      		" unknown application " + applicationAttemptId + 
-          " completed with event " + event);
+      LOG.info("Container " + container + " of" + " unknown application "
+          + appId + " completed with event " + event);
       return;
     }
     
@@ -892,28 +900,33 @@ public class CapacityScheduler
     queue.completedContainer(clusterResource, application, node, 
         rmContainer, containerStatus, event, null);
 
-    LOG.info("Application " + applicationAttemptId + 
-        " released container " + container.getId() +
-        " on node: " + node + 
-        " with event: " + event);
+    LOG.info("Application attempt " + application.getApplicationAttemptId()
+        + " released container " + container.getId() + " on node: " + node
+        + " with event: " + event);
   }
 
   @Lock(Lock.NoLock.class)
-  FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
-    return appAttempts.get(applicationAttemptId);
+  FiCaSchedulerApp getApplicationAttempt(
+      ApplicationAttemptId applicationAttemptId) {
+    SchedulerApplication app =
+        applications.get(applicationAttemptId.getApplicationId());
+    if (app != null) {
+      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
+    }
+    return null;
   }
 
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplication(applicationAttemptId);
+    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
     return app == null ? null : new SchedulerAppReport(app);
   }
   
   @Override
   public ApplicationResourceUsageReport getAppResourceUsageReport(
       ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplication(applicationAttemptId);
+    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
     return app == null ? null : app.getResourceUsageReport();
   }
   
@@ -922,10 +935,22 @@ public class CapacityScheduler
     return nodes.get(nodeId);
   }
 
-  private RMContainer getRMContainer(ContainerId containerId) {
-    FiCaSchedulerApp application = 
-        getApplication(containerId.getApplicationAttemptId());
-    return (application == null) ? null : application.getRMContainer(containerId);
+  @Override
+  public RMContainer getRMContainer(ContainerId containerId) {
+    FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
+    return (attempt == null) ? null : attempt.getRMContainer(containerId);
+  }
+
+  @VisibleForTesting
+  public FiCaSchedulerApp getCurrentAttemptForContainer(
+      ContainerId containerId) {
+    SchedulerApplication app =
+        applications.get(containerId.getApplicationAttemptId()
+          .getApplicationId());
+    if (app != null) {
+      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
+    }
+    return null;
   }
 
   @Override
@@ -958,7 +983,7 @@ public class CapacityScheduler
       LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
           " container: " + cont.toString());
     }
-    FiCaSchedulerApp app = appAttempts.get(aid);
+    FiCaSchedulerApp app = getApplicationAttempt(aid);
     if (app != null) {
       app.addPreemptContainer(cont.getContainerId());
     }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Jan 15 06:06:31 2014
@@ -644,8 +644,7 @@ public class LeafQueue implements CSQueu
       addApplicationAttempt(application, user);
     }
 
-    int attemptId = application.getApplicationAttemptId().getAttemptId();
-    metrics.submitApp(userName, attemptId);
+    metrics.submitAppAttempt(userName);
     getParent().submitApplicationAttempt(application, userName);
   }
 
@@ -702,6 +701,8 @@ public class LeafQueue implements CSQueu
           getParent().getQueuePath(), ace);
       throw ace;
     }
+
+    metrics.submitApp(userName);
   }
 
   private synchronized void activateApplications() {

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Wed Jan 15 06:06:31 2014
@@ -219,7 +219,8 @@ public class FiCaSchedulerNode extends S
             " on node " + this.reservedContainer.getReservedNode());
       }
       
-      // Cannot reserve more than one application on a given node!
+      // Cannot reserve more than one application attempt on a given node!
+      // Reservation is still against attempt.
       if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
           reservedContainer.getContainer().getId().getApplicationAttemptId())) {
         throw new IllegalStateException("Trying to reserve" +

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Wed Jan 15 06:06:31 2014
@@ -23,14 +23,21 @@ import org.apache.hadoop.yarn.api.record
 public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
 
   private final ApplicationAttemptId applicationAttemptId;
+  private final boolean transferStateFromPreviousAttempt;
 
   public AppAttemptAddedSchedulerEvent(
-      ApplicationAttemptId applicationAttemptId) {
+      ApplicationAttemptId applicationAttemptId,
+      boolean transferStateFromPreviousAttempt) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
+    this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {
     return applicationAttemptId;
   }
+
+  public boolean getTransferStateFromPreviousAttempt() {
+    return transferStateFromPreviousAttempt;
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java Wed Jan 15 06:06:31 2014
@@ -25,13 +25,15 @@ public class AppAttemptRemovedSchedulerE
 
   private final ApplicationAttemptId applicationAttemptId;
   private final RMAppAttemptState finalAttemptState;
+  private final boolean keepContainersAcrossAppAttempts;
 
   public AppAttemptRemovedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState finalAttemptState) {
+      RMAppAttemptState finalAttemptState, boolean keepContainers) {
     super(SchedulerEventType.APP_ATTEMPT_REMOVED);
     this.applicationAttemptId = applicationAttemptId;
     this.finalAttemptState = finalAttemptState;
+    this.keepContainersAcrossAppAttempts = keepContainers;
   }
 
   public ApplicationAttemptId getApplicationAttemptID() {
@@ -41,4 +43,8 @@ public class AppAttemptRemovedSchedulerE
   public RMAppAttemptState getFinalAttemptState() {
     return this.finalAttemptState;
   }
+
+  public boolean getKeepContainersAcrossAppAttempts() {
+    return this.keepContainersAcrossAppAttempts;
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Wed Jan 15 06:06:31 2014
@@ -76,7 +76,8 @@ public class AllocationConfiguration {
   @VisibleForTesting
   QueuePlacementPolicy placementPolicy;
   
-  private final Set<String> queueNames;
+  @VisibleForTesting
+  Set<String> queueNames;
   
   public AllocationConfiguration(Map<String, Resource> minQueueResources, 
       Map<String, Resource> maxQueueResources, 

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Wed Jan 15 06:06:31 2014
@@ -214,7 +214,7 @@ public class FSLeafQueue extends FSQueue
   }
 
   @Override
-  public Collection<FSQueue> getChildQueues() {
+  public List<FSQueue> getChildQueues() {
     return new ArrayList<FSQueue>(1);
   }
   

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Wed Jan 15 06:06:31 2014
@@ -157,7 +157,7 @@ public class FSParentQueue extends FSQue
   }
 
   @Override
-  public Collection<FSQueue> getChildQueues() {
+  public List<FSQueue> getChildQueues() {
     return childQueues;
   }
 

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1558303&r1=1558302&r2=1558303&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Wed Jan 15 06:06:31 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -158,7 +159,7 @@ public abstract class FSQueue extends Sc
   /**
    * Gets the children of this queue, if any.
    */
-  public abstract Collection<FSQueue> getChildQueues();
+  public abstract List<FSQueue> getChildQueues();
   
   /**
    * Adds all applications in the queue and its subqueues to the given collection.