You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2014/01/03 08:27:13 UTC

svn commit: r1555021 [4/7] - in /hadoop/common/branches/HDFS-5535/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/j...

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java Fri Jan  3 07:26:52 2014
@@ -36,7 +36,7 @@ public class SchedulerAppReport {
   private final Collection<RMContainer> reserved;
   private final boolean pending;
   
-  public SchedulerAppReport(SchedulerApplication app) {
+  public SchedulerAppReport(SchedulerApplicationAttempt app) {
     this.live = app.getLiveContainers();
     this.reserved = app.getReservedContainers();
     this.pending = app.isPending();

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java Fri Jan  3 07:26:52 2014
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
 
 public class SchedulerAppUtils {
 
-  public static  boolean isBlacklisted(SchedulerApplication application,
+  public static  boolean isBlacklisted(SchedulerApplicationAttempt application,
       SchedulerNode node, Log LOG) {
     if (application.isBlacklisted(node.getNodeName())) {
       if (LOG.isDebugEnabled()) {

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Fri Jan  3 07:26:52 2014
@@ -17,393 +17,26 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
 
-/**
- * Represents an application attempt from the viewpoint of the scheduler.
- * Each running app attempt in the RM corresponds to one instance
- * of this class.
- */
 @Private
 @Unstable
-public abstract class SchedulerApplication {
-  
-  private static final Log LOG = LogFactory.getLog(SchedulerApplication.class);
-
-  protected final AppSchedulingInfo appSchedulingInfo;
-  
-  protected final Map<ContainerId, RMContainer> liveContainers =
-      new HashMap<ContainerId, RMContainer>();
-  protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
-      new HashMap<Priority, Map<NodeId, RMContainer>>();
-
-  private final Multiset<Priority> reReservations = HashMultiset.create();
-  
-  protected final Resource currentReservation = Resource.newInstance(0, 0);
-  private Resource resourceLimit = Resource.newInstance(0, 0);
-  protected final Resource currentConsumption = Resource.newInstance(0, 0);
+public class SchedulerApplication {
 
-  protected List<RMContainer> newlyAllocatedContainers = 
-      new ArrayList<RMContainer>();
+  private final Queue queue;
+  private final String user;
 
-  /**
-   * Count how many times the application has been given an opportunity
-   * to schedule a task at each priority. Each time the scheduler
-   * asks the application for a task at this priority, it is incremented,
-   * and each time the application successfully schedules a task, it
-   * is reset to 0.
-   */
-  Multiset<Priority> schedulingOpportunities = HashMultiset.create();
-  
-  // Time of the last container scheduled at the current allowed level
-  protected Map<Priority, Long> lastScheduledContainer =
-      new HashMap<Priority, Long>();
-
-  protected final Queue queue;
-  protected boolean isStopped = false;
-  
-  protected final RMContext rmContext;
-  
-  public SchedulerApplication(ApplicationAttemptId applicationAttemptId, 
-      String user, Queue queue, ActiveUsersManager activeUsersManager,
-      RMContext rmContext) {
-    this.rmContext = rmContext;
-    this.appSchedulingInfo = 
-        new AppSchedulingInfo(applicationAttemptId, user, queue,  
-            activeUsersManager);
+  public SchedulerApplication(Queue queue, String user) {
     this.queue = queue;
-  }
-  
-  /**
-   * Get the live containers of the application.
-   * @return live containers of the application
-   */
-  public synchronized Collection<RMContainer> getLiveContainers() {
-    return new ArrayList<RMContainer>(liveContainers.values());
-  }
-  
-  /**
-   * Is this application pending?
-   * @return true if it is else false.
-   */
-  public boolean isPending() {
-    return appSchedulingInfo.isPending();
-  }
-  
-  /**
-   * Get {@link ApplicationAttemptId} of the application master.
-   * @return <code>ApplicationAttemptId</code> of the application master
-   */
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return appSchedulingInfo.getApplicationAttemptId();
-  }
-  
-  public ApplicationId getApplicationId() {
-    return appSchedulingInfo.getApplicationId();
-  }
-  
-  public String getUser() {
-    return appSchedulingInfo.getUser();
-  }
-
-  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
-    return appSchedulingInfo.getResourceRequests(priority);
-  }
-
-  public int getNewContainerId() {
-    return appSchedulingInfo.getNewContainerId();
-  }
-
-  public Collection<Priority> getPriorities() {
-    return appSchedulingInfo.getPriorities();
-  }
-  
-  public ResourceRequest getResourceRequest(Priority priority, String resourceName) {
-    return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
-  }
-
-  public synchronized int getTotalRequiredResources(Priority priority) {
-    return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
-  }
-
-  public Resource getResource(Priority priority) {
-    return appSchedulingInfo.getResource(priority);
-  }
-
-  public String getQueueName() {
-    return appSchedulingInfo.getQueueName();
-  }
-  
-  public synchronized RMContainer getRMContainer(ContainerId id) {
-    return liveContainers.get(id);
-  }
-
-  protected synchronized void resetReReservations(Priority priority) {
-    reReservations.setCount(priority, 0);
-  }
-
-  protected synchronized void addReReservation(Priority priority) {
-    reReservations.add(priority);
-  }
-
-  public synchronized int getReReservations(Priority priority) {
-    return reReservations.count(priority);
+    this.user = user;
   }
 
-  /**
-   * Get total current reservations.
-   * Used only by unit tests
-   * @return total current reservations
-   */
-  @Stable
-  @Private
-  public synchronized Resource getCurrentReservation() {
-    return currentReservation;
-  }
-  
   public Queue getQueue() {
     return queue;
   }
-  
-  public synchronized void updateResourceRequests(
-      List<ResourceRequest> requests) {
-    if (!isStopped) {
-      appSchedulingInfo.updateResourceRequests(requests);
-    }
-  }
-  
-  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
-    // Cleanup all scheduling information
-    isStopped = true;
-    appSchedulingInfo.stop(rmAppAttemptFinalState);
-  }
-
-  public synchronized boolean isStopped() {
-    return isStopped;
-  }
-
-  /**
-   * Get the list of reserved containers
-   * @return All of the reserved containers.
-   */
-  public synchronized List<RMContainer> getReservedContainers() {
-    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
-    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e : 
-      this.reservedContainers.entrySet()) {
-      reservedContainers.addAll(e.getValue().values());
-    }
-    return reservedContainers;
-  }
-  
-  public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
-      RMContainer rmContainer, Container container) {
-    // Create RMContainer if necessary
-    if (rmContainer == null) {
-      rmContainer = 
-          new RMContainerImpl(container, getApplicationAttemptId(), 
-              node.getNodeID(), rmContext.getDispatcher().getEventHandler(), 
-              rmContext.getContainerAllocationExpirer());
-        
-      Resources.addTo(currentReservation, container.getResource());
-      
-      // Reset the re-reservation count
-      resetReReservations(priority);
-    } else {
-      // Note down the re-reservation
-      addReReservation(priority);
-    }
-    rmContainer.handle(new RMContainerReservedEvent(container.getId(), 
-        container.getResource(), node.getNodeID(), priority));
-    
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    if (reservedContainers == null) {
-      reservedContainers = new HashMap<NodeId, RMContainer>();
-      this.reservedContainers.put(priority, reservedContainers);
-    }
-    reservedContainers.put(node.getNodeID(), rmContainer);
-    
-    LOG.info("Application " + getApplicationId() 
-        + " reserved container " + rmContainer
-        + " on node " + node + ", currently has " + reservedContainers.size()
-        + " at priority " + priority 
-        + "; currentReservation " + currentReservation.getMemory());
-    
-    return rmContainer;
-  }
-  
-  /**
-   * Has the application reserved the given <code>node</code> at the
-   * given <code>priority</code>?
-   * @param node node to be checked
-   * @param priority priority of reserved container
-   * @return true is reserved, false if not
-   */
-  public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    if (reservedContainers != null) {
-      return reservedContainers.containsKey(node.getNodeID());
-    }
-    return false;
-  }
-  
-  public synchronized void setHeadroom(Resource globalLimit) {
-    this.resourceLimit = globalLimit; 
-  }
-
-  /**
-   * Get available headroom in terms of resources for the application's user.
-   * @return available resource headroom
-   */
-  public synchronized Resource getHeadroom() {
-    // Corner case to deal with applications being slightly over-limit
-    if (resourceLimit.getMemory() < 0) {
-      resourceLimit.setMemory(0);
-    }
-    
-    return resourceLimit;
-  }
-  
-  public synchronized int getNumReservedContainers(Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    return (reservedContainers == null) ? 0 : reservedContainers.size();
-  }
-  
-  @SuppressWarnings("unchecked")
-  public synchronized void containerLaunchedOnNode(ContainerId containerId,
-      NodeId nodeId) {
-    // Inform the container
-    RMContainer rmContainer = getRMContainer(containerId);
-    if (rmContainer == null) {
-      // Some unknown container sneaked into the system. Kill it.
-      rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
-      return;
-    }
-
-    rmContainer.handle(new RMContainerEvent(containerId,
-        RMContainerEventType.LAUNCHED));
-  }
-  
-  public synchronized void showRequests() {
-    if (LOG.isDebugEnabled()) {
-      for (Priority priority : getPriorities()) {
-        Map<String, ResourceRequest> requests = getResourceRequests(priority);
-        if (requests != null) {
-          LOG.debug("showRequests:" + " application=" + getApplicationId() + 
-              " headRoom=" + getHeadroom() + 
-              " currentConsumption=" + currentConsumption.getMemory());
-          for (ResourceRequest request : requests.values()) {
-            LOG.debug("showRequests:" + " application=" + getApplicationId()
-                + " request=" + request);
-          }
-        }
-      }
-    }
-  }
-  
-  public Resource getCurrentConsumption() {
-    return currentConsumption;
-  }
-
-  public synchronized List<Container> pullNewlyAllocatedContainers() {
-    List<Container> returnContainerList = new ArrayList<Container>(
-        newlyAllocatedContainers.size());
-    for (RMContainer rmContainer : newlyAllocatedContainers) {
-      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
-          RMContainerEventType.ACQUIRED));
-      returnContainerList.add(rmContainer.getContainer());
-    }
-    newlyAllocatedContainers.clear();
-    return returnContainerList;
-  }
 
-  public synchronized void updateBlacklist(
-      List<String> blacklistAdditions, List<String> blacklistRemovals) {
-    if (!isStopped) {
-      this.appSchedulingInfo.updateBlacklist(
-          blacklistAdditions, blacklistRemovals);
-    }
-  }
-  
-  public boolean isBlacklisted(String resourceName) {
-    return this.appSchedulingInfo.isBlacklisted(resourceName);
-  }
-
-  public synchronized void addSchedulingOpportunity(Priority priority) {
-    schedulingOpportunities.setCount(priority,
-        schedulingOpportunities.count(priority) + 1);
-  }
-  
-  public synchronized void subtractSchedulingOpportunity(Priority priority) {
-    int count = schedulingOpportunities.count(priority) - 1;
-    this.schedulingOpportunities.setCount(priority, Math.max(count,  0));
-  }
-
-  /**
-   * Return the number of times the application has been given an opportunity
-   * to schedule a task at the given priority since the last time it
-   * successfully did so.
-   */
-  public synchronized int getSchedulingOpportunities(Priority priority) {
-    return schedulingOpportunities.count(priority);
-  }
-  
-  /**
-   * Should be called when an application has successfully scheduled a container,
-   * or when the scheduling locality threshold is relaxed.
-   * Reset various internal counters which affect delay scheduling
-   *
-   * @param priority The priority of the container scheduled.
-   */
-  public synchronized void resetSchedulingOpportunities(Priority priority) {
-    resetSchedulingOpportunities(priority, System.currentTimeMillis());
-  }
-  // used for continuous scheduling
-  public synchronized void resetSchedulingOpportunities(Priority priority,
-      long currentTimeMs) {
-    lastScheduledContainer.put(priority, currentTimeMs);
-    schedulingOpportunities.setCount(priority, 0);
-  }
-  
-  public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
-    return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
-        reservedContainers.size(), Resources.clone(currentConsumption),
-        Resources.clone(currentReservation),
-        Resources.add(currentConsumption, currentReservation));
+  public String getUser() {
+    return user;
   }
-
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Fri Jan  3 07:26:52 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -35,7 +36,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 
@@ -155,21 +155,32 @@ extends org.apache.hadoop.yarn.server.re
   
   /**
    * Submit a new application to the queue.
-   * @param application application being submitted
+   * @param applicationId the applicationId of the application being submitted
    * @param user user who submitted the application
    * @param queue queue to which the application is submitted
    */
-  public void submitApplication(FiCaSchedulerApp application, String user, 
-      String queue) 
-  throws AccessControlException;
-  
+  public void submitApplication(ApplicationId applicationId, String user,
+      String queue) throws AccessControlException;
+
+  /**
+   * Submit an application attempt to the queue.
+   */
+  public void submitApplicationAttempt(FiCaSchedulerApp application,
+      String userName);
+
   /**
    * An application submitted to this queue has finished.
-   * @param application
-   * @param queue application queue 
+   * @param applicationId
+   * @param user user who submitted the application
    */
-  public void finishApplication(FiCaSchedulerApp application, String queue);
-  
+  public void finishApplication(ApplicationId applicationId, String user);
+
+  /**
+   * An application attempt submitted to this queue has finished.
+   */
+  public void finishApplicationAttempt(FiCaSchedulerApp application,
+      String queue);
+
   /**
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Jan  3 07:26:52 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,10 +54,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -65,13 +69,15 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -185,7 +191,11 @@ public class CapacityScheduler
   private Resource maximumAllocation;
 
   @VisibleForTesting
-  protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications = 
+  protected Map<ApplicationId, SchedulerApplication> applications =
+      new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+
+  @VisibleForTesting
+  protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts = 
       new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
 
   private boolean initialized = false;
@@ -415,61 +425,84 @@ public class CapacityScheduler
   synchronized CSQueue getQueue(String queueName) {
     return queues.get(queueName);
   }
-  
-  private synchronized void
-      addApplication(ApplicationAttemptId applicationAttemptId,
-          String queueName, String user) {
 
-    // Sanity checks
+  private synchronized void addApplication(ApplicationId applicationId,
+      String queueName, String user) {
+    // santiy checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
-      String message = "Application " + applicationAttemptId + 
+      String message = "Application " + applicationId + 
       " submitted by user " + user + " to unknown queue: " + queueName;
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppRejectedEvent(applicationId, message));
       return;
     }
     if (!(queue instanceof LeafQueue)) {
-      String message = "Application " + applicationAttemptId + 
+      String message = "Application " + applicationId + 
           " submitted by user " + user + " to non-leaf queue: " + queueName;
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppRejectedEvent(applicationId, message));
       return;
     }
-
-    // TODO: Fix store
-    FiCaSchedulerApp SchedulerApp = 
-        new FiCaSchedulerApp(applicationAttemptId, user, queue, 
-            queue.getActiveUsersManager(), rmContext);
-
     // Submit to the queue
     try {
-      queue.submitApplication(SchedulerApp, user, queueName);
+      queue.submitApplication(applicationId, user, queueName);
     } catch (AccessControlException ace) {
-      LOG.info("Failed to submit application " + applicationAttemptId + 
-          " to queue " + queueName + " from user " + user, ace);
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptRejectedEvent(applicationAttemptId, 
-              ace.toString()));
+      LOG.info("Failed to submit application " + applicationId + " to queue "
+          + queueName + " from user " + user, ace);
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppRejectedEvent(applicationId, ace.toString()));
       return;
     }
+    SchedulerApplication application =
+        new SchedulerApplication(queue, user);
+    applications.put(applicationId, application);
+    LOG.info("Accepted application " + applicationId + " from user: " + user
+        + ", in queue: " + queueName);
+    rmContext.getDispatcher().getEventHandler()
+        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+  }
 
-    applications.put(applicationAttemptId, SchedulerApp);
-
-    LOG.info("Application Submission: " + applicationAttemptId + 
-        ", user: " + user +
-        " queue: " + queue +
-        ", currently active: " + applications.size());
-
+  private synchronized void addApplicationAttempt(
+      ApplicationAttemptId applicationAttemptId) {
+    SchedulerApplication application =
+        applications.get(applicationAttemptId.getApplicationId());
+    CSQueue queue = (CSQueue) application.getQueue();
+
+    FiCaSchedulerApp SchedulerApp =
+        new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
+          queue, queue.getActiveUsersManager(), rmContext);
+    appAttempts.put(applicationAttemptId, SchedulerApp);
+    queue.submitApplicationAttempt(SchedulerApp, application.getUser());
+    LOG.info("Added Application Attempt " + applicationAttemptId
+        + " to scheduler from user " + application.getUser() + " in queue "
+        + queue.getQueueName());
     rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-            RMAppAttemptEventType.APP_ACCEPTED));
+      new RMAppAttemptEvent(applicationAttemptId,
+          RMAppAttemptEventType.ATTEMPT_ADDED));
   }
 
-  private synchronized void doneApplication(
+  private synchronized void doneApplication(ApplicationId applicationId,
+      RMAppState finalState) {
+    SchedulerApplication application = applications.get(applicationId);
+    if (application == null){
+      // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps.
+      return;
+    }
+    CSQueue queue = (CSQueue) application.getQueue();
+    if (!(queue instanceof LeafQueue)) {
+      LOG.error("Cannot finish application " + "from non-leaf queue: "
+          + queue.getQueueName());
+    } else {
+      queue.finishApplication(applicationId, application.getUser());
+    }
+    applications.remove(applicationId);
+  }
+
+  private synchronized void doneApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState) {
-    LOG.info("Application " + applicationAttemptId + " is done." +
+    LOG.info("Application Attempt " + applicationAttemptId + " is done." +
     		" finalState=" + rmAppAttemptFinalState);
     
     FiCaSchedulerApp application = getApplication(applicationAttemptId);
@@ -509,11 +542,11 @@ public class CapacityScheduler
       LOG.error("Cannot finish application " + "from non-leaf queue: "
           + queueName);
     } else {
-      queue.finishApplication(application, queue.getQueueName());
+      queue.finishApplicationAttempt(application, queue.getQueueName());
     }
     
     // Remove from our data-structure
-    applications.remove(applicationAttemptId);
+    appAttempts.remove(applicationAttemptId);
   }
 
   private static final Allocation EMPTY_ALLOCATION = 
@@ -742,16 +775,31 @@ public class CapacityScheduler
     break;
     case APP_ADDED:
     {
-      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
-      addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
-          .getQueue(), appAddedEvent.getUser());
+      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+      addApplication(appAddedEvent.getApplicationId(),
+        appAddedEvent.getQueue(), appAddedEvent.getUser());
     }
     break;
     case APP_REMOVED:
     {
       AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
-      doneApplication(appRemovedEvent.getApplicationAttemptID(),
-          appRemovedEvent.getFinalAttemptState());
+      doneApplication(appRemovedEvent.getApplicationID(),
+        appRemovedEvent.getFinalState());
+    }
+    break;
+    case APP_ATTEMPT_ADDED:
+    {
+      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
+          (AppAttemptAddedSchedulerEvent) event;
+      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+    }
+    break;
+    case APP_ATTEMPT_REMOVED:
+    {
+      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
+          (AppAttemptRemovedSchedulerEvent) event;
+      doneApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
+        appAttemptRemovedEvent.getFinalAttemptState());
     }
     break;
     case CONTAINER_EXPIRED:
@@ -852,7 +900,7 @@ public class CapacityScheduler
 
   @Lock(Lock.NoLock.class)
   FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
-    return applications.get(applicationAttemptId);
+    return appAttempts.get(applicationAttemptId);
   }
 
   @Override
@@ -910,7 +958,7 @@ public class CapacityScheduler
       LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
           " container: " + cont.toString());
     }
-    FiCaSchedulerApp app = applications.get(aid);
+    FiCaSchedulerApp app = appAttempts.get(aid);
     if (app != null) {
       app.addPreemptContainer(cont.getContainerId());
     }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Fri Jan  3 07:26:52 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -59,7 +60,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -99,7 +99,7 @@ public class LeafQueue implements CSQueu
   private volatile int numContainers;
 
   Set<FiCaSchedulerApp> activeApplications;
-  Map<ApplicationAttemptId, FiCaSchedulerApp> applicationsMap = 
+  Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap = 
       new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
   
   Set<FiCaSchedulerApp> pendingApplications;
@@ -635,7 +635,22 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public void submitApplication(FiCaSchedulerApp application, String userName,
+  public void submitApplicationAttempt(FiCaSchedulerApp application,
+      String userName) {
+    // Careful! Locking order is important!
+    synchronized (this) {
+      User user = getUser(userName);
+      // Add the attempt to our data-structures
+      addApplicationAttempt(application, user);
+    }
+
+    int attemptId = application.getApplicationAttemptId().getAttemptId();
+    metrics.submitApp(userName, attemptId);
+    getParent().submitApplicationAttempt(application, userName);
+  }
+
+  @Override
+  public void submitApplication(ApplicationId applicationId, String userName,
       String queue)  throws AccessControlException {
     // Careful! Locking order is important!
 
@@ -653,8 +668,7 @@ public class LeafQueue implements CSQueu
       // Check if the queue is accepting jobs
       if (getState() != QueueState.RUNNING) {
         String msg = "Queue " + getQueuePath() +
-        " is STOPPED. Cannot accept submission of application: " +
-        application.getApplicationId();
+        " is STOPPED. Cannot accept submission of application: " + applicationId;
         LOG.info(msg);
         throw new AccessControlException(msg);
       }
@@ -663,8 +677,7 @@ public class LeafQueue implements CSQueu
       if (getNumApplications() >= getMaxApplications()) {
         String msg = "Queue " + getQueuePath() + 
         " already has " + getNumApplications() + " applications," +
-        " cannot accept submission of application: " + 
-        application.getApplicationId();
+        " cannot accept submission of application: " + applicationId;
         LOG.info(msg);
         throw new AccessControlException(msg);
       }
@@ -675,26 +688,18 @@ public class LeafQueue implements CSQueu
         String msg = "Queue " + getQueuePath() + 
         " already has " + user.getTotalApplications() + 
         " applications from user " + userName + 
-        " cannot accept submission of application: " + 
-        application.getApplicationId();
+        " cannot accept submission of application: " + applicationId;
         LOG.info(msg);
         throw new AccessControlException(msg);
       }
-
-      // Add the application to our data-structures
-      addApplication(application, user);
     }
 
-    int attemptId = application.getApplicationAttemptId().getAttemptId();
-    metrics.submitApp(userName, attemptId);
-
     // Inform the parent queue
     try {
-      getParent().submitApplication(application, userName, queue);
+      getParent().submitApplication(applicationId, userName, queue);
     } catch (AccessControlException ace) {
       LOG.info("Failed to submit application to parent-queue: " + 
           getParent().getQueuePath(), ace);
-      removeApplication(application, user);
       throw ace;
     }
   }
@@ -722,11 +727,11 @@ public class LeafQueue implements CSQueu
     }
   }
   
-  private synchronized void addApplication(FiCaSchedulerApp application, User user) {
+  private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) {
     // Accept 
     user.submitApplication();
     pendingApplications.add(application);
-    applicationsMap.put(application.getApplicationAttemptId(), application);
+    applicationAttemptMap.put(application.getApplicationAttemptId(), application);
 
     // Activate applications
     activateApplications();
@@ -742,22 +747,28 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public void finishApplication(FiCaSchedulerApp application, String queue) {
+  public void finishApplication(ApplicationId application, String user) {
+    // Inform the activeUsersManager
+    activeUsersManager.deactivateApplication(user, application);
+    // Inform the parent queue
+    getParent().finishApplication(application, user);
+  }
+
+  @Override
+  public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
     // Careful! Locking order is important!
     synchronized (this) {
-      removeApplication(application, getUser(application.getUser()));
+      removeApplicationAttempt(application, getUser(application.getUser()));
     }
-
-    // Inform the parent queue
-    getParent().finishApplication(application, queue);
+    getParent().finishApplicationAttempt(application, queue);
   }
 
-  public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
+  public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, User user) {
     boolean wasActive = activeApplications.remove(application);
     if (!wasActive) {
       pendingApplications.remove(application);
     }
-    applicationsMap.remove(application.getApplicationAttemptId());
+    applicationAttemptMap.remove(application.getApplicationAttemptId());
 
     user.finishApplication(wasActive);
     if (user.getTotalApplications() == 0) {
@@ -766,13 +777,7 @@ public class LeafQueue implements CSQueu
 
     // Check if we can activate more applications
     activateApplications();
-    
-    // Inform the activeUsersManager
-    synchronized (application) {
-      activeUsersManager.deactivateApplication(
-          application.getUser(), application.getApplicationId());
-    }
-    
+
     LOG.info("Application removed -" +
         " appId: " + application.getApplicationId() + 
         " user: " + application.getUser() + 
@@ -783,10 +788,10 @@ public class LeafQueue implements CSQueu
         " #queue-active-applications: " + getNumActiveApplications()
         );
   }
-  
+
   private synchronized FiCaSchedulerApp getApplication(
       ApplicationAttemptId applicationAttemptId) {
-    return applicationsMap.get(applicationAttemptId);
+    return applicationAttemptMap.get(applicationAttemptId);
   }
 
   private static final CSAssignment NULL_ASSIGNMENT =

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Fri Jan  3 07:26:52 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -51,7 +52,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -442,7 +442,7 @@ public class ParentQueue implements CSQu
   }
 
   @Override
-  public void submitApplication(FiCaSchedulerApp application, String user,
+  public void submitApplication(ApplicationId applicationId, String user,
       String queue) throws AccessControlException {
     
     synchronized (this) {
@@ -455,57 +455,70 @@ public class ParentQueue implements CSQu
       if (state != QueueState.RUNNING) {
         throw new AccessControlException("Queue " + getQueuePath() +
             " is STOPPED. Cannot accept submission of application: " +
-            application.getApplicationId());
+            applicationId);
       }
 
-      addApplication(application, user);
+      addApplication(applicationId, user);
     }
     
     // Inform the parent queue
     if (parent != null) {
       try {
-        parent.submitApplication(application, user, queue);
+        parent.submitApplication(applicationId, user, queue);
       } catch (AccessControlException ace) {
         LOG.info("Failed to submit application to parent-queue: " + 
             parent.getQueuePath(), ace);
-        removeApplication(application, user);
+        removeApplication(applicationId, user);
         throw ace;
       }
     }
   }
 
-  private synchronized void addApplication(FiCaSchedulerApp application, 
+
+  @Override
+  public void submitApplicationAttempt(FiCaSchedulerApp application,
+      String userName) {
+    // submit attempt logic.
+  }
+
+  @Override
+  public void finishApplicationAttempt(FiCaSchedulerApp application,
+      String queue) {
+    // finish attempt logic.
+  }
+
+  private synchronized void addApplication(ApplicationId applicationId,
       String user) {
-  
+
     ++numApplications;
 
     LOG.info("Application added -" +
-        " appId: " + application.getApplicationId() + 
+        " appId: " + applicationId + 
         " user: " + user + 
         " leaf-queue of parent: " + getQueueName() + 
         " #applications: " + getNumApplications());
   }
   
   @Override
-  public void finishApplication(FiCaSchedulerApp application, String queue) {
+  public void finishApplication(ApplicationId application, String user) {
     
     synchronized (this) {
-      removeApplication(application, application.getUser());
+      removeApplication(application, user);
     }
     
     // Inform the parent queue
     if (parent != null) {
-      parent.finishApplication(application, queue);
+      parent.finishApplication(application, user);
     }
   }
 
-  public synchronized void removeApplication(FiCaSchedulerApp application, 
+  public synchronized void removeApplication(ApplicationId applicationId, 
       String user) {
     
     --numApplications;
 
     LOG.info("Application removed -" +
-        " appId: " + application.getApplicationId() + 
+        " appId: " + applicationId + 
         " user: " + user + 
         " leaf-queue of parent: " + getQueueName() + 
         " #applications: " + getNumApplications());

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Fri Jan  3 07:26:52 2014
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
@@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.util.resou
  */
 @Private
 @Unstable
-public class FiCaSchedulerApp extends SchedulerApplication {
+public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
 

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Fri Jan  3 07:26:52 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -206,7 +206,7 @@ public class FiCaSchedulerNode extends S
   }
 
   public synchronized void reserveResource(
-      SchedulerApplication application, Priority priority, 
+      SchedulerApplicationAttempt application, Priority priority, 
       RMContainer reservedContainer) {
     // Check if it's already reserved
     if (this.reservedContainer != null) {
@@ -241,7 +241,7 @@ public class FiCaSchedulerNode extends S
   }
 
   public synchronized void unreserveResource(
-      SchedulerApplication application) {
+      SchedulerApplicationAttempt application) {
     
     // adding NP checks as this can now be called for preemption
     if (reservedContainer != null

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java Fri Jan  3 07:26:52 2014
@@ -24,11 +24,15 @@ public enum SchedulerEventType {
   NODE_ADDED,
   NODE_REMOVED,
   NODE_UPDATE,
-  
-  // Source: App
+
+  // Source: RMApp
   APP_ADDED,
   APP_REMOVED,
 
+  // Source: RMAppAttempt
+  APP_ATTEMPT_ADDED,
+  APP_ATTEMPT_REMOVED,
+
   // Source: ContainerAllocationExpirer
   CONTAINER_EXPIRED
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Fri Jan  3 07:26:52 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 
 @Private
 @Unstable

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Fri Jan  3 07:26:52 2014
@@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.util.resou
  */
 @Private
 @Unstable
-public class FSSchedulerApp extends SchedulerApplication {
+public class FSSchedulerApp extends SchedulerApplicationAttempt {
 
   private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
 

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Jan  3 07:26:52 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,10 +59,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -76,6 +80,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -151,10 +157,15 @@ public class FairScheduler implements Re
   // Time we last ran preemptTasksIfNecessary
   private long lastPreemptCheckTime;
 
-  // This stores per-application scheduling information, indexed by
+  // This stores per-application scheduling information,
+  @VisibleForTesting
+  protected Map<ApplicationId, SchedulerApplication> applications =
+      new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+
+  // This stores per-application-attempt scheduling information, indexed by
   // attempt ID's for fast lookup.
   @VisibleForTesting
-  protected Map<ApplicationAttemptId, FSSchedulerApp> applications = 
+  protected Map<ApplicationAttemptId, FSSchedulerApp> appAttempts = 
       new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
 
   // Nodes in the cluster, indexed by NodeId
@@ -253,7 +264,7 @@ public class FairScheduler implements Re
 
   private RMContainer getRMContainer(ContainerId containerId) {
     FSSchedulerApp application = 
-        applications.get(containerId.getApplicationAttemptId());
+        appAttempts.get(containerId.getApplicationAttemptId());
     return (application == null) ? null : application.getRMContainer(containerId);
   }
 
@@ -591,44 +602,63 @@ public class FairScheduler implements Re
    * user. This will accept a new app even if the user or queue is above
    * configured limits, but the app will not be marked as runnable.
    */
-  protected synchronized void addApplication(
-      ApplicationAttemptId applicationAttemptId, String queueName, String user) {
+  protected synchronized void addApplication(ApplicationId applicationId,
+      String queueName, String user) {
     if (queueName == null || queueName.isEmpty()) {
-      String message = "Reject application " + applicationAttemptId +
+      String message = "Reject application " + applicationId +
               " submitted by user " + user + " with an empty queue name.";
       LOG.info(message);
-      rmContext.getDispatcher().getEventHandler().handle(
-              new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+      rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppRejectedEvent(applicationId, message));
       return;
     }
 
-    RMApp rmApp = rmContext.getRMApps().get(
-        applicationAttemptId.getApplicationId());
+    RMApp rmApp = rmContext.getRMApps().get(applicationId);
     FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
     if (queue == null) {
       rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptRejectedEvent(applicationAttemptId,
+          new RMAppRejectedEvent(applicationId,
               "Application rejected by queue placement policy"));
       return;
     }
 
-    FSSchedulerApp schedulerApp =
-        new FSSchedulerApp(applicationAttemptId, user,
-            queue, new ActiveUsersManager(getRootQueueMetrics()),
-            rmContext);
-
     // Enforce ACLs
     UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
 
     if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
         && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
       String msg = "User " + userUgi.getUserName() +
-    	        " cannot submit applications to queue " + queue.getName();
+              " cannot submit applications to queue " + queue.getName();
       LOG.info(msg);
-      rmContext.getDispatcher().getEventHandler().handle(
-    	        new RMAppAttemptRejectedEvent(applicationAttemptId, msg));
+      rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppRejectedEvent(applicationId, msg));
       return;
     }
+  
+    SchedulerApplication application =
+        new SchedulerApplication(queue, user);
+    applications.put(applicationId, application);
+
+    LOG.info("Accepted application " + applicationId + " from user: " + user
+        + ", in queue: " + queueName);
+    rmContext.getDispatcher().getEventHandler()
+        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+  }
+
+  /**
+   * Add a new application attempt to the scheduler.
+   */
+  protected synchronized void addApplicationAttempt(
+      ApplicationAttemptId applicationAttemptId) {
+    SchedulerApplication application =
+        applications.get(applicationAttemptId.getApplicationId());
+    String user = application.getUser();
+    FSLeafQueue queue = (FSLeafQueue) application.getQueue();
+
+    FSSchedulerApp schedulerApp =
+        new FSSchedulerApp(applicationAttemptId, user,
+            queue, new ActiveUsersManager(getRootQueueMetrics()),
+            rmContext);
 
     boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
     queue.addApp(schedulerApp, runnable);
@@ -639,16 +669,14 @@ public class FairScheduler implements Re
     }
     
     queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
+    appAttempts.put(applicationAttemptId, schedulerApp);
 
-    applications.put(applicationAttemptId, schedulerApp);
-
-    LOG.info("Application Submission: " + applicationAttemptId +
-        ", user: "+ user +
-        ", currently active: " + applications.size());
-
+    LOG.info("Added Application Attempt " + applicationAttemptId
+        + " to scheduler from user: " + user + ", currently active: "
+        + appAttempts.size());
     rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(applicationAttemptId,
-            RMAppAttemptEventType.APP_ACCEPTED));
+            RMAppAttemptEventType.ATTEMPT_ADDED));
   }
   
   @VisibleForTesting
@@ -674,13 +702,18 @@ public class FairScheduler implements Re
     return queue;
   }
 
-  private synchronized void removeApplication(
+  private synchronized void removeApplication(ApplicationId applicationId,
+      RMAppState finalState) {
+    applications.remove(applicationId);
+  }
+
+  private synchronized void removeApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState) {
     LOG.info("Application " + applicationAttemptId + " is done." +
         " finalState=" + rmAppAttemptFinalState);
 
-    FSSchedulerApp application = applications.get(applicationAttemptId);
+    FSSchedulerApp application = appAttempts.get(applicationAttemptId);
 
     if (application == null) {
       LOG.info("Unknown application " + applicationAttemptId + " has completed!");
@@ -720,7 +753,7 @@ public class FairScheduler implements Re
     }
     
     // Remove from our data-structure
-    applications.remove(applicationAttemptId);
+    appAttempts.remove(applicationAttemptId);
   }
 
   /**
@@ -737,7 +770,7 @@ public class FairScheduler implements Re
 
     // Get the application for the finished container
     ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
-    FSSchedulerApp application = applications.get(applicationAttemptId);
+    FSSchedulerApp application = appAttempts.get(applicationAttemptId);
     if (application == null) {
       LOG.info("Container " + container + " of" +
           " unknown application " + applicationAttemptId +
@@ -811,7 +844,7 @@ public class FairScheduler implements Re
       List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
 
     // Make sure this application exists
-    FSSchedulerApp application = applications.get(appAttemptId);
+    FSSchedulerApp application = appAttempts.get(appAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + appAttemptId);
@@ -882,7 +915,7 @@ public class FairScheduler implements Re
   private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
     // Get the application for the finished container
     ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
-    FSSchedulerApp application = applications.get(applicationAttemptId);
+    FSSchedulerApp application = appAttempts.get(applicationAttemptId);
     if (application == null) {
       LOG.info("Unknown application: " + applicationAttemptId +
           " launched container " + containerId +
@@ -1025,23 +1058,23 @@ public class FairScheduler implements Re
   }
   
   public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
-    return applications.get(appAttemptId);
+    return appAttempts.get(appAttemptId);
   }
   
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId appAttemptId) {
-    if (!applications.containsKey(appAttemptId)) {
+    if (!appAttempts.containsKey(appAttemptId)) {
       LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
       return null;
     }
-    return new SchedulerAppReport(applications.get(appAttemptId));
+    return new SchedulerAppReport(appAttempts.get(appAttemptId));
   }
   
   @Override
   public ApplicationResourceUsageReport getAppResourceUsageReport(
       ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp app = applications.get(appAttemptId);
+    FSSchedulerApp app = appAttempts.get(appAttemptId);
     if (app == null) {
       LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
       return null;
@@ -1094,18 +1127,34 @@ public class FairScheduler implements Re
       if (!(event instanceof AppAddedSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
       }
-      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
-      String queue = appAddedEvent.getQueue();
-      addApplication(appAddedEvent.getApplicationAttemptId(), queue,
-          appAddedEvent.getUser());
+      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+      addApplication(appAddedEvent.getApplicationId(),
+        appAddedEvent.getQueue(), appAddedEvent.getUser());
       break;
     case APP_REMOVED:
       if (!(event instanceof AppRemovedSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
       }
       AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
-      removeApplication(appRemovedEvent.getApplicationAttemptID(),
-          appRemovedEvent.getFinalAttemptState());
+      removeApplication(appRemovedEvent.getApplicationID(),
+        appRemovedEvent.getFinalState());
+      break;
+    case APP_ATTEMPT_ADDED:
+      if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
+          (AppAttemptAddedSchedulerEvent) event;
+      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+      break;
+    case APP_ATTEMPT_REMOVED:
+      if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
+          (AppAttemptRemovedSchedulerEvent) event;
+      removeApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
+        appAttemptRemovedEvent.getFinalAttemptState());
       break;
     case CONTAINER_EXPIRED:
       if (!(event instanceof ContainerExpiredSchedulerEvent)) {

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Jan  3 07:26:52 2014
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.re
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -38,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -59,6 +59,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -67,10 +70,22 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -108,11 +123,15 @@ public class FifoScheduler implements Re
   private Resource maximumAllocation;
   private boolean usePortForNodeName;
 
+  @VisibleForTesting
+  protected Map<ApplicationId, SchedulerApplication> applications =
+      new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
+
   // Use ConcurrentSkipListMap because applications need to be ordered
   @VisibleForTesting
-  protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications
+  protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts
       = new ConcurrentSkipListMap<ApplicationAttemptId, FiCaSchedulerApp>();
-  
+
   private ActiveUsersManager activeUsersManager;
 
   private static final String DEFAULT_QUEUE_NAME = "default";
@@ -319,7 +338,7 @@ public class FifoScheduler implements Re
   @VisibleForTesting
   FiCaSchedulerApp getApplication(
       ApplicationAttemptId applicationAttemptId) {
-    return applications.get(applicationAttemptId);
+    return appAttempts.get(applicationAttemptId);
   }
 
   @Override
@@ -339,23 +358,47 @@ public class FifoScheduler implements Re
   private FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
-  
-  private synchronized void addApplication(ApplicationAttemptId appAttemptId,
-      String user) {
+
+  private synchronized void addApplication(ApplicationId applicationId,
+      String queue, String user) {
+    SchedulerApplication application =
+        new SchedulerApplication(null, user);
+    applications.put(applicationId, application);
+    LOG.info("Accepted application " + applicationId + " from user: " + user);
+    rmContext.getDispatcher().getEventHandler()
+        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+  }
+
+  private synchronized void addApplicationAttempt(
+      ApplicationAttemptId appAttemptId) {
+    SchedulerApplication application =
+        applications.get(appAttemptId.getApplicationId());
+    String user = application.getUser();
     // TODO: Fix store
-    FiCaSchedulerApp schedulerApp = 
-        new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
-            this.rmContext);
-    applications.put(appAttemptId, schedulerApp);
+    FiCaSchedulerApp schedulerApp =
+        new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
+          activeUsersManager, this.rmContext);
+    appAttempts.put(appAttemptId, schedulerApp);
     metrics.submitApp(user, appAttemptId.getAttemptId());
-    LOG.info("Application Submission: " + appAttemptId.getApplicationId() + 
-        " from " + user + ", currently active: " + applications.size());
+    LOG.info("Added Application Attempt " + appAttemptId
+        + " to scheduler from user " + application.getUser()
+        + ", currently active: " + appAttempts.size());
     rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(appAttemptId,
-            RMAppAttemptEventType.APP_ACCEPTED));
+            RMAppAttemptEventType.ATTEMPT_ADDED));
+  }
+
+  private synchronized void doneApplication(ApplicationId applicationId,
+      RMAppState finalState) {
+    SchedulerApplication application = applications.get(applicationId);
+
+    // Inform the activeUsersManager
+    activeUsersManager.deactivateApplication(application.getUser(),
+      applicationId);
+    applications.remove(applicationId);
   }
 
-  private synchronized void doneApplication(
+  private synchronized void doneApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState)
       throws IOException {
@@ -374,17 +417,11 @@ public class FifoScheduler implements Re
           RMContainerEventType.KILL);
     }
 
-    // Inform the activeUsersManager
-    synchronized (application) {
-      activeUsersManager.deactivateApplication(
-          application.getUser(), application.getApplicationId());
-    }
-
     // Clean up pending requests, metrics etc.
     application.stop(rmAppAttemptFinalState);
 
     // Remove the application
-    applications.remove(applicationAttemptId);
+    appAttempts.remove(applicationAttemptId);
   }
   
   /**
@@ -395,10 +432,10 @@ public class FifoScheduler implements Re
   private void assignContainers(FiCaSchedulerNode node) {
     LOG.debug("assignContainers:" +
         " node=" + node.getRMNode().getNodeAddress() + 
-        " #applications=" + applications.size());
+        " #applications=" + appAttempts.size());
 
     // Try to assign containers to applications in fifo order
-    for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications
+    for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : appAttempts
         .entrySet()) {
       FiCaSchedulerApp application = e.getValue();
       LOG.debug("pre-assignContainers");
@@ -437,7 +474,7 @@ public class FifoScheduler implements Re
 
     // Update the applications' headroom to correctly take into
     // account the containers assigned in this update.
-    for (FiCaSchedulerApp application : applications.values()) {
+    for (FiCaSchedulerApp application : appAttempts.values()) {
       application.setHeadroom(Resources.subtract(clusterResource, usedResource));
     }
   }
@@ -692,19 +729,35 @@ public class FifoScheduler implements Re
     case APP_ADDED:
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
-      addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
-          .getUser());
+      addApplication(appAddedEvent.getApplicationId(),
+        appAddedEvent.getQueue(), appAddedEvent.getUser());
     }
     break;
     case APP_REMOVED:
     {
       AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
+      doneApplication(appRemovedEvent.getApplicationID(),
+        appRemovedEvent.getFinalState());
+    }
+    break;
+    case APP_ATTEMPT_ADDED:
+    {
+      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
+          (AppAttemptAddedSchedulerEvent) event;
+      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+    }
+    break;
+    case APP_ATTEMPT_REMOVED:
+    {
+      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
+          (AppAttemptRemovedSchedulerEvent) event;
       try {
-        doneApplication(appRemovedEvent.getApplicationAttemptID(),
-            appRemovedEvent.getFinalAttemptState());
+        doneApplicationAttempt(
+          appAttemptRemovedEvent.getApplicationAttemptID(),
+          appAttemptRemovedEvent.getFinalAttemptState());
       } catch(IOException ie) {
         LOG.error("Unable to remove application "
-            + appRemovedEvent.getApplicationAttemptID(), ie);
+            + appAttemptRemovedEvent.getApplicationAttemptID(), ie);
       }
     }
     break;
@@ -856,8 +909,8 @@ public class FifoScheduler implements Re
   public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
     if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
       List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(
-          applications.size());
-      for (FiCaSchedulerApp app : applications.values()) {
+          appAttempts.size());
+      for (FiCaSchedulerApp app : appAttempts.values()) {
         apps.add(app.getApplicationAttemptId());
       }
       return apps;

Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Fri Jan  3 07:26:52 2014
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -164,11 +165,14 @@ public class Application {
     final ResourceScheduler scheduler = resourceManager.getResourceScheduler();
     
     resourceManager.getClientRMService().submitApplication(request);
-    
+
     // Notify scheduler
-    AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
-            this.applicationAttemptId, this.queue, this.user);
-    scheduler.handle(appAddedEvent1);
+    AppAddedSchedulerEvent addAppEvent =
+        new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
+    scheduler.handle(addAppEvent);
+    AppAttemptAddedSchedulerEvent addAttemptEvent =
+        new AppAttemptAddedSchedulerEvent(this.applicationAttemptId);
+    scheduler.handle(addAttemptEvent);
   }
   
   public synchronized void addResourceRequestSpec(