You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/01/02 21:19:46 UTC

svn commit: r1554896 [1/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn...

Author: vinodkv
Date: Thu Jan  2 20:19:45 2014
New Revision: 1554896

URL: http://svn.apache.org/r1554896
Log:
YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize app-attempts separately from apps. Contributed by Jian He.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java
Removed:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptRejectedEvent.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Jan  2 20:19:45 2014
@@ -194,6 +194,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port
     information once an AM crashes. (Jian He via vinodkv)
 
+    YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize
+    app-attempts separately from apps. (Jian He via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Thu Jan  2 20:19:45 2014
@@ -24,9 +24,11 @@ public enum RMAppEventType {
   RECOVER,
   KILL,
 
-  // Source: RMAppAttempt
+  // Source: Scheduler
   APP_REJECTED,
   APP_ACCEPTED,
+
+  // Source: RMAppAttempt
   ATTEMPT_REGISTERED,
   ATTEMPT_UNREGISTERED,
   ATTEMPT_FINISHED, // Will send the final state

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Thu Jan  2 20:19:45 2014
@@ -66,6 +66,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -136,7 +138,7 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
         RMAppEventType.START, new RMAppNewlySavingTransition())
     .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
-            RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED,
+            RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
             RMAppState.KILLED, RMAppState.FINAL_SAVING),
         RMAppEventType.RECOVER, new RMAppRecoveredTransition())
     .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
@@ -151,7 +153,7 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
-        RMAppEventType.APP_NEW_SAVED, new StartAppAttemptTransition())
+        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
         RMAppEventType.KILL,
         new FinalSavingTransition(
@@ -169,9 +171,12 @@ public class RMAppImpl implements RMApp,
         new FinalSavingTransition(
           new AppRejectedTransition(), RMAppState.FAILED))
     .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
-        RMAppEventType.APP_ACCEPTED)
-    .addTransition(RMAppState.SUBMITTED, RMAppState.KILLING,
-        RMAppEventType.KILL,new KillAttemptTransition())
+        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
+    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
+        RMAppEventType.KILL,
+        new FinalSavingTransition(
+          new AppKilledTransition(), RMAppState.KILLED))
+
 
      // Transitions from ACCEPTED state
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
@@ -179,11 +184,22 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
         RMAppEventType.ATTEMPT_REGISTERED)
     .addTransition(RMAppState.ACCEPTED,
-        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
+        EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
+        // ACCEPTED state is possible to receive ATTEMPT_FAILED event because
+        // RMAppRecoveredTransition is returning ACCEPTED state directly and
+        // waiting for the previous AM to exit.
         RMAppEventType.ATTEMPT_FAILED,
-        new AttemptFailedTransition(RMAppState.SUBMITTED))
-    .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
-        RMAppEventType.KILL,new KillAttemptTransition())
+        new AttemptFailedTransition(RMAppState.ACCEPTED))
+    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
+        RMAppEventType.KILL,
+        new FinalSavingTransition(
+          new AppKilledTransition(), RMAppState.KILLED))
+    // ACCECPTED state can once again receive APP_ACCEPTED event, because on
+    // recovery the app returns ACCEPTED state and the app once again go
+    // through the scheduler and triggers one more APP_ACCEPTED event at
+    // ACCEPTED state.
+    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+        RMAppEventType.APP_ACCEPTED)
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -197,9 +213,9 @@ public class RMAppImpl implements RMApp,
       // UnManagedAM directly jumps to finished
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
     .addTransition(RMAppState.RUNNING,
-        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
+        EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
         RMAppEventType.ATTEMPT_FAILED,
-        new AttemptFailedTransition(RMAppState.SUBMITTED))
+        new AttemptFailedTransition(RMAppState.ACCEPTED))
     .addTransition(RMAppState.RUNNING, RMAppState.KILLING,
         RMAppEventType.KILL, new KillAttemptTransition())
 
@@ -641,7 +657,7 @@ public class RMAppImpl implements RMApp,
         ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
     RMAppAttempt attempt =
         new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
-          submissionContext, conf, user);
+          submissionContext, conf);
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
     if(startAttempt) {
@@ -695,29 +711,46 @@ public class RMAppImpl implements RMApp,
         return app.recoveredFinalState;
       }
 
+      // Notify scheduler about the app on recovery
+      new AddApplicationToSchedulerTransition().transition(app, event);
+
       // No existent attempts means the attempt associated with this app was not
       // started or started but not yet saved.
       if (app.attempts.isEmpty()) {
-        app.createNewAttempt(true);
         return RMAppState.SUBMITTED;
       }
 
-      return RMAppState.RUNNING;
+      // YARN-1507 is saving the application state after the application is
+      // accepted. So after YARN-1507, an app is saved meaning it is accepted.
+      // Thus we return ACCECPTED state on recovery.
+      return RMAppState.ACCEPTED;
     }
   }
 
-  private static final class StartAppAttemptTransition extends RMAppTransition {
+  private static final class AddApplicationToSchedulerTransition extends
+      RMAppTransition {
+    @SuppressWarnings("unchecked")
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
-      if (storeEvent.getStoredException() != null) {
+      if (event instanceof RMAppNewSavedEvent) {
+        RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event;
         // For HA this exception needs to be handled by giving up
         // master status if we got fenced
-        LOG.error(
-          "Failed to store application: " + storeEvent.getApplicationId(),
-          storeEvent.getStoredException());
-        ExitUtil.terminate(1, storeEvent.getStoredException());
+        if (((RMAppNewSavedEvent) event).getStoredException() != null) {
+          LOG.error(
+            "Failed to store application: " + storeEvent.getApplicationId(),
+            storeEvent.getStoredException());
+          ExitUtil.terminate(1, storeEvent.getStoredException());
+        }
       }
+      app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
+        app.submissionContext.getQueue(), app.user));
+    }
+  }
+
+  private static final class StartAppAttemptTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
       app.createNewAttempt(true);
     };
   }
@@ -965,6 +998,8 @@ public class RMAppImpl implements RMApp,
       if (app.finishTime == 0 ) {
         app.finishTime = System.currentTimeMillis();
       }
+      app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, app
+        .getState()));
       app.handler.handle(
           new RMAppManagerEvent(app.applicationId,
           RMAppManagerEventType.APP_COMPLETED));
@@ -993,7 +1028,6 @@ public class RMAppImpl implements RMApp,
         return RMAppState.FINAL_SAVING;
       }
     }
-
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Thu Jan  2 20:19:45 2014
@@ -45,8 +45,7 @@ public enum RMAppAttemptEventType {
   ATTEMPT_UPDATE_SAVED,
 
   // Source: Scheduler
-  APP_REJECTED,
-  APP_ACCEPTED,
+  ATTEMPT_ADDED,
   
   // Source: RMAttemptImpl.recover
   RECOVER

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Thu Jan  2 20:19:45 2014
@@ -75,13 +75,11 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
@@ -150,7 +148,6 @@ public class RMAppAttemptImpl implements
   private final StringBuilder diagnostics = new StringBuilder();
 
   private Configuration conf;
-  private String user;
   
   private static final ExpiredTransition EXPIRED_TRANSITION =
       new ExpiredTransition();
@@ -186,14 +183,10 @@ public class RMAppAttemptImpl implements
           RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
           
       // Transitions from SUBMITTED state
-      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
-          RMAppAttemptEventType.APP_REJECTED,
-          new FinalSavingTransition(new AppRejectedTransition(),
-            RMAppAttemptState.FAILED))
       .addTransition(RMAppAttemptState.SUBMITTED, 
           EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                      RMAppAttemptState.SCHEDULED),
-          RMAppAttemptEventType.APP_ACCEPTED, 
+          RMAppAttemptEventType.ATTEMPT_ADDED,
           new ScheduleTransition())
       .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.KILL,
@@ -380,8 +373,7 @@ public class RMAppAttemptImpl implements
       .addTransition(
           RMAppAttemptState.KILLED,
           RMAppAttemptState.KILLED,
-          EnumSet.of(RMAppAttemptEventType.APP_ACCEPTED,
-              RMAppAttemptEventType.APP_REJECTED,
+          EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED,
               RMAppAttemptEventType.EXPIRE,
               RMAppAttemptEventType.LAUNCHED,
               RMAppAttemptEventType.LAUNCH_FAILED,
@@ -398,7 +390,7 @@ public class RMAppAttemptImpl implements
       RMContext rmContext, YarnScheduler scheduler,
       ApplicationMasterService masterService,
       ApplicationSubmissionContext submissionContext,
-      Configuration conf, String user) {
+      Configuration conf) {
     this.conf = conf;
     this.applicationAttemptId = appAttemptId;
     this.rmContext = rmContext;
@@ -414,7 +406,6 @@ public class RMAppAttemptImpl implements
     this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
     
     this.stateMachine = stateMachineFactory.make(this);
-    this.user = user;
   }
 
   @Override
@@ -750,35 +741,8 @@ public class RMAppAttemptImpl implements
             appAttempt.rmContext.getAMRMTokenSecretManager());
 
       // Add the applicationAttempt to the scheduler
-      appAttempt.eventHandler.handle(
-          new AppAttemptAddedSchedulerEvent(appAttempt.applicationAttemptId,
-              appAttempt.submissionContext.getQueue(), appAttempt.user));
-    }
-  }
-
-  private static final class AppRejectedTransition extends BaseTransition {
-    @Override
-    public void transition(RMAppAttemptImpl appAttempt,
-        RMAppAttemptEvent event) {
-
-      RMAppAttemptRejectedEvent rejectedEvent = (RMAppAttemptRejectedEvent) event;
-
-      // Tell the AMS. Unregister from the ApplicationMasterService
-      appAttempt.masterService
-          .unregisterAttempt(appAttempt.applicationAttemptId);
-      
-      // Save the diagnostic message
-      String message = rejectedEvent.getMessage();
-      appAttempt.diagnostics.append(message);
-
-      // Send the rejection event to app
-      appAttempt.eventHandler.handle(
-          new RMAppRejectedEvent(
-              rejectedEvent.getApplicationAttemptId().getApplicationId(),
-              message)
-          );
-
-      appAttempt.removeCredentials(appAttempt);
+      appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
+        appAttempt.applicationAttemptId));
     }
   }
 
@@ -794,11 +758,6 @@ public class RMAppAttemptImpl implements
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
       if (!appAttempt.submissionContext.getUnmanagedAM()) {
-        // Send the acceptance to the app
-        appAttempt.eventHandler.handle(new RMAppEvent(event
-            .getApplicationAttemptId().getApplicationId(),
-            RMAppEventType.APP_ACCEPTED));
-
         // Request a container for the AM.
         ResourceRequest request =
             BuilderUtils.newResourceRequest(
@@ -918,11 +877,6 @@ public class RMAppAttemptImpl implements
     FinalApplicationStatus finalStatus = null;
 
     switch (event.getType()) {
-    case APP_REJECTED:
-      RMAppAttemptRejectedEvent rejectedEvent =
-          (RMAppAttemptRejectedEvent) event;
-      diags = rejectedEvent.getMessage();
-      break;
     case LAUNCH_FAILED:
       RMAppAttemptLaunchFailedEvent launchFaileEvent =
           (RMAppAttemptLaunchFailedEvent) event;
@@ -1091,16 +1045,6 @@ public class RMAppAttemptImpl implements
     public void transition(RMAppAttemptImpl appAttempt,
                             RMAppAttemptEvent event) {
       appAttempt.checkAttemptStoreError(event);
-      // Send the acceptance to the app
-      // Ideally this should have been done when the scheduler accepted the app.
-      // But its here because until the attempt is saved the client should not
-      // launch the unmanaged AM. Client waits for the app status to be accepted
-      // before doing so. So we have to delay the accepted state until we have 
-      // completed storing the attempt
-      appAttempt.eventHandler.handle(new RMAppEvent(event
-          .getApplicationAttemptId().getApplicationId(),
-          RMAppEventType.APP_ACCEPTED));
-      
       super.transition(appAttempt, event);
     }    
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java Thu Jan  2 20:19:45 2014
@@ -56,7 +56,7 @@ public class ActiveUsersManager {
    * @param user application user 
    * @param applicationId activated application
    */
-  @Lock({Queue.class, SchedulerApplication.class})
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
   synchronized public void activateApplication(
       String user, ApplicationId applicationId) {
     Set<ApplicationId> userApps = usersApplications.get(user);
@@ -79,7 +79,7 @@ public class ActiveUsersManager {
    * @param user application user 
    * @param applicationId deactivated application
    */
-  @Lock({Queue.class, SchedulerApplication.class})
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
   synchronized public void deactivateApplication(
       String user, ApplicationId applicationId) {
     Set<ApplicationId> userApps = usersApplications.get(user);
@@ -102,7 +102,7 @@ public class ActiveUsersManager {
    * resource requests.
    * @return number of active users
    */
-  @Lock({Queue.class, SchedulerApplication.class})
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
   synchronized public int getNumActiveUsers() {
     return activeUsers;
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java Thu Jan  2 20:19:45 2014
@@ -36,7 +36,7 @@ public class SchedulerAppReport {
   private final Collection<RMContainer> reserved;
   private final boolean pending;
   
-  public SchedulerAppReport(SchedulerApplication app) {
+  public SchedulerAppReport(SchedulerApplicationAttempt app) {
     this.live = app.getLiveContainers();
     this.reserved = app.getReservedContainers();
     this.pending = app.isPending();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java Thu Jan  2 20:19:45 2014
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
 
 public class SchedulerAppUtils {
 
-  public static  boolean isBlacklisted(SchedulerApplication application,
+  public static  boolean isBlacklisted(SchedulerApplicationAttempt application,
       SchedulerNode node, Log LOG) {
     if (application.isBlacklisted(node.getNodeName())) {
       if (LOG.isDebugEnabled()) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Thu Jan  2 20:19:45 2014
@@ -17,393 +17,26 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
 
-/**
- * Represents an application attempt from the viewpoint of the scheduler.
- * Each running app attempt in the RM corresponds to one instance
- * of this class.
- */
 @Private
 @Unstable
-public abstract class SchedulerApplication {
-  
-  private static final Log LOG = LogFactory.getLog(SchedulerApplication.class);
-
-  protected final AppSchedulingInfo appSchedulingInfo;
-  
-  protected final Map<ContainerId, RMContainer> liveContainers =
-      new HashMap<ContainerId, RMContainer>();
-  protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
-      new HashMap<Priority, Map<NodeId, RMContainer>>();
-
-  private final Multiset<Priority> reReservations = HashMultiset.create();
-  
-  protected final Resource currentReservation = Resource.newInstance(0, 0);
-  private Resource resourceLimit = Resource.newInstance(0, 0);
-  protected final Resource currentConsumption = Resource.newInstance(0, 0);
+public class SchedulerApplication {
 
-  protected List<RMContainer> newlyAllocatedContainers = 
-      new ArrayList<RMContainer>();
+  private final Queue queue;
+  private final String user;
 
-  /**
-   * Count how many times the application has been given an opportunity
-   * to schedule a task at each priority. Each time the scheduler
-   * asks the application for a task at this priority, it is incremented,
-   * and each time the application successfully schedules a task, it
-   * is reset to 0.
-   */
-  Multiset<Priority> schedulingOpportunities = HashMultiset.create();
-  
-  // Time of the last container scheduled at the current allowed level
-  protected Map<Priority, Long> lastScheduledContainer =
-      new HashMap<Priority, Long>();
-
-  protected final Queue queue;
-  protected boolean isStopped = false;
-  
-  protected final RMContext rmContext;
-  
-  public SchedulerApplication(ApplicationAttemptId applicationAttemptId, 
-      String user, Queue queue, ActiveUsersManager activeUsersManager,
-      RMContext rmContext) {
-    this.rmContext = rmContext;
-    this.appSchedulingInfo = 
-        new AppSchedulingInfo(applicationAttemptId, user, queue,  
-            activeUsersManager);
+  public SchedulerApplication(Queue queue, String user) {
     this.queue = queue;
-  }
-  
-  /**
-   * Get the live containers of the application.
-   * @return live containers of the application
-   */
-  public synchronized Collection<RMContainer> getLiveContainers() {
-    return new ArrayList<RMContainer>(liveContainers.values());
-  }
-  
-  /**
-   * Is this application pending?
-   * @return true if it is else false.
-   */
-  public boolean isPending() {
-    return appSchedulingInfo.isPending();
-  }
-  
-  /**
-   * Get {@link ApplicationAttemptId} of the application master.
-   * @return <code>ApplicationAttemptId</code> of the application master
-   */
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return appSchedulingInfo.getApplicationAttemptId();
-  }
-  
-  public ApplicationId getApplicationId() {
-    return appSchedulingInfo.getApplicationId();
-  }
-  
-  public String getUser() {
-    return appSchedulingInfo.getUser();
-  }
-
-  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
-    return appSchedulingInfo.getResourceRequests(priority);
-  }
-
-  public int getNewContainerId() {
-    return appSchedulingInfo.getNewContainerId();
-  }
-
-  public Collection<Priority> getPriorities() {
-    return appSchedulingInfo.getPriorities();
-  }
-  
-  public ResourceRequest getResourceRequest(Priority priority, String resourceName) {
-    return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
-  }
-
-  public synchronized int getTotalRequiredResources(Priority priority) {
-    return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
-  }
-
-  public Resource getResource(Priority priority) {
-    return appSchedulingInfo.getResource(priority);
-  }
-
-  public String getQueueName() {
-    return appSchedulingInfo.getQueueName();
-  }
-  
-  public synchronized RMContainer getRMContainer(ContainerId id) {
-    return liveContainers.get(id);
-  }
-
-  protected synchronized void resetReReservations(Priority priority) {
-    reReservations.setCount(priority, 0);
-  }
-
-  protected synchronized void addReReservation(Priority priority) {
-    reReservations.add(priority);
-  }
-
-  public synchronized int getReReservations(Priority priority) {
-    return reReservations.count(priority);
+    this.user = user;
   }
 
-  /**
-   * Get total current reservations.
-   * Used only by unit tests
-   * @return total current reservations
-   */
-  @Stable
-  @Private
-  public synchronized Resource getCurrentReservation() {
-    return currentReservation;
-  }
-  
   public Queue getQueue() {
     return queue;
   }
-  
-  public synchronized void updateResourceRequests(
-      List<ResourceRequest> requests) {
-    if (!isStopped) {
-      appSchedulingInfo.updateResourceRequests(requests);
-    }
-  }
-  
-  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
-    // Cleanup all scheduling information
-    isStopped = true;
-    appSchedulingInfo.stop(rmAppAttemptFinalState);
-  }
-
-  public synchronized boolean isStopped() {
-    return isStopped;
-  }
-
-  /**
-   * Get the list of reserved containers
-   * @return All of the reserved containers.
-   */
-  public synchronized List<RMContainer> getReservedContainers() {
-    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
-    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e : 
-      this.reservedContainers.entrySet()) {
-      reservedContainers.addAll(e.getValue().values());
-    }
-    return reservedContainers;
-  }
-  
-  public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
-      RMContainer rmContainer, Container container) {
-    // Create RMContainer if necessary
-    if (rmContainer == null) {
-      rmContainer = 
-          new RMContainerImpl(container, getApplicationAttemptId(), 
-              node.getNodeID(), rmContext.getDispatcher().getEventHandler(), 
-              rmContext.getContainerAllocationExpirer());
-        
-      Resources.addTo(currentReservation, container.getResource());
-      
-      // Reset the re-reservation count
-      resetReReservations(priority);
-    } else {
-      // Note down the re-reservation
-      addReReservation(priority);
-    }
-    rmContainer.handle(new RMContainerReservedEvent(container.getId(), 
-        container.getResource(), node.getNodeID(), priority));
-    
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    if (reservedContainers == null) {
-      reservedContainers = new HashMap<NodeId, RMContainer>();
-      this.reservedContainers.put(priority, reservedContainers);
-    }
-    reservedContainers.put(node.getNodeID(), rmContainer);
-    
-    LOG.info("Application " + getApplicationId() 
-        + " reserved container " + rmContainer
-        + " on node " + node + ", currently has " + reservedContainers.size()
-        + " at priority " + priority 
-        + "; currentReservation " + currentReservation.getMemory());
-    
-    return rmContainer;
-  }
-  
-  /**
-   * Has the application reserved the given <code>node</code> at the
-   * given <code>priority</code>?
-   * @param node node to be checked
-   * @param priority priority of reserved container
-   * @return true is reserved, false if not
-   */
-  public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    if (reservedContainers != null) {
-      return reservedContainers.containsKey(node.getNodeID());
-    }
-    return false;
-  }
-  
-  public synchronized void setHeadroom(Resource globalLimit) {
-    this.resourceLimit = globalLimit; 
-  }
-
-  /**
-   * Get available headroom in terms of resources for the application's user.
-   * @return available resource headroom
-   */
-  public synchronized Resource getHeadroom() {
-    // Corner case to deal with applications being slightly over-limit
-    if (resourceLimit.getMemory() < 0) {
-      resourceLimit.setMemory(0);
-    }
-    
-    return resourceLimit;
-  }
-  
-  public synchronized int getNumReservedContainers(Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    return (reservedContainers == null) ? 0 : reservedContainers.size();
-  }
-  
-  @SuppressWarnings("unchecked")
-  public synchronized void containerLaunchedOnNode(ContainerId containerId,
-      NodeId nodeId) {
-    // Inform the container
-    RMContainer rmContainer = getRMContainer(containerId);
-    if (rmContainer == null) {
-      // Some unknown container sneaked into the system. Kill it.
-      rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
-      return;
-    }
-
-    rmContainer.handle(new RMContainerEvent(containerId,
-        RMContainerEventType.LAUNCHED));
-  }
-  
-  public synchronized void showRequests() {
-    if (LOG.isDebugEnabled()) {
-      for (Priority priority : getPriorities()) {
-        Map<String, ResourceRequest> requests = getResourceRequests(priority);
-        if (requests != null) {
-          LOG.debug("showRequests:" + " application=" + getApplicationId() + 
-              " headRoom=" + getHeadroom() + 
-              " currentConsumption=" + currentConsumption.getMemory());
-          for (ResourceRequest request : requests.values()) {
-            LOG.debug("showRequests:" + " application=" + getApplicationId()
-                + " request=" + request);
-          }
-        }
-      }
-    }
-  }
-  
-  public Resource getCurrentConsumption() {
-    return currentConsumption;
-  }
-
-  public synchronized List<Container> pullNewlyAllocatedContainers() {
-    List<Container> returnContainerList = new ArrayList<Container>(
-        newlyAllocatedContainers.size());
-    for (RMContainer rmContainer : newlyAllocatedContainers) {
-      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
-          RMContainerEventType.ACQUIRED));
-      returnContainerList.add(rmContainer.getContainer());
-    }
-    newlyAllocatedContainers.clear();
-    return returnContainerList;
-  }
 
-  public synchronized void updateBlacklist(
-      List<String> blacklistAdditions, List<String> blacklistRemovals) {
-    if (!isStopped) {
-      this.appSchedulingInfo.updateBlacklist(
-          blacklistAdditions, blacklistRemovals);
-    }
-  }
-  
-  public boolean isBlacklisted(String resourceName) {
-    return this.appSchedulingInfo.isBlacklisted(resourceName);
-  }
-
-  public synchronized void addSchedulingOpportunity(Priority priority) {
-    schedulingOpportunities.setCount(priority,
-        schedulingOpportunities.count(priority) + 1);
-  }
-  
-  public synchronized void subtractSchedulingOpportunity(Priority priority) {
-    int count = schedulingOpportunities.count(priority) - 1;
-    this.schedulingOpportunities.setCount(priority, Math.max(count,  0));
-  }
-
-  /**
-   * Return the number of times the application has been given an opportunity
-   * to schedule a task at the given priority since the last time it
-   * successfully did so.
-   */
-  public synchronized int getSchedulingOpportunities(Priority priority) {
-    return schedulingOpportunities.count(priority);
-  }
-  
-  /**
-   * Should be called when an application has successfully scheduled a container,
-   * or when the scheduling locality threshold is relaxed.
-   * Reset various internal counters which affect delay scheduling
-   *
-   * @param priority The priority of the container scheduled.
-   */
-  public synchronized void resetSchedulingOpportunities(Priority priority) {
-    resetSchedulingOpportunities(priority, System.currentTimeMillis());
-  }
-  // used for continuous scheduling
-  public synchronized void resetSchedulingOpportunities(Priority priority,
-      long currentTimeMs) {
-    lastScheduledContainer.put(priority, currentTimeMs);
-    schedulingOpportunities.setCount(priority, 0);
-  }
-  
-  public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
-    return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
-        reservedContainers.size(), Resources.clone(currentConsumption),
-        Resources.clone(currentReservation),
-        Resources.add(currentConsumption, currentReservation));
+  public String getUser() {
+    return user;
   }
-
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1554896&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Thu Jan  2 20:19:45 2014
@@ -0,0 +1,410 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+
+/**
+ * Represents an application attempt from the viewpoint of the scheduler.
+ * Each running app attempt in the RM corresponds to one instance
+ * of this class.
+ */
+@Private
+@Unstable
+public abstract class SchedulerApplicationAttempt {
+  
+  private static final Log LOG = LogFactory
+    .getLog(SchedulerApplicationAttempt.class);
+
+  protected final AppSchedulingInfo appSchedulingInfo;
+  
+  protected final Map<ContainerId, RMContainer> liveContainers =
+      new HashMap<ContainerId, RMContainer>();
+  protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
+      new HashMap<Priority, Map<NodeId, RMContainer>>();
+
+  private final Multiset<Priority> reReservations = HashMultiset.create();
+  
+  protected final Resource currentReservation = Resource.newInstance(0, 0);
+  private Resource resourceLimit = Resource.newInstance(0, 0);
+  protected final Resource currentConsumption = Resource.newInstance(0, 0);
+
+  protected List<RMContainer> newlyAllocatedContainers = 
+      new ArrayList<RMContainer>();
+
+  /**
+   * Count how many times the application has been given an opportunity
+   * to schedule a task at each priority. Each time the scheduler
+   * asks the application for a task at this priority, it is incremented,
+   * and each time the application successfully schedules a task, it
+   * is reset to 0.
+   */
+  Multiset<Priority> schedulingOpportunities = HashMultiset.create();
+  
+  // Time of the last container scheduled at the current allowed level
+  protected Map<Priority, Long> lastScheduledContainer =
+      new HashMap<Priority, Long>();
+
+  protected final Queue queue;
+  protected boolean isStopped = false;
+  
+  protected final RMContext rmContext;
+  
+  public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, 
+      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      RMContext rmContext) {
+    this.rmContext = rmContext;
+    this.appSchedulingInfo = 
+        new AppSchedulingInfo(applicationAttemptId, user, queue,  
+            activeUsersManager);
+    this.queue = queue;
+  }
+  
+  /**
+   * Get the live containers of the application.
+   * @return live containers of the application
+   */
+  public synchronized Collection<RMContainer> getLiveContainers() {
+    return new ArrayList<RMContainer>(liveContainers.values());
+  }
+  
+  /**
+   * Is this application pending?
+   * @return true if it is else false.
+   */
+  public boolean isPending() {
+    return appSchedulingInfo.isPending();
+  }
+  
+  /**
+   * Get {@link ApplicationAttemptId} of the application master.
+   * @return <code>ApplicationAttemptId</code> of the application master
+   */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return appSchedulingInfo.getApplicationAttemptId();
+  }
+  
+  public ApplicationId getApplicationId() {
+    return appSchedulingInfo.getApplicationId();
+  }
+  
+  public String getUser() {
+    return appSchedulingInfo.getUser();
+  }
+
+  public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
+    return appSchedulingInfo.getResourceRequests(priority);
+  }
+
+  public int getNewContainerId() {
+    return appSchedulingInfo.getNewContainerId();
+  }
+
+  public Collection<Priority> getPriorities() {
+    return appSchedulingInfo.getPriorities();
+  }
+  
+  public ResourceRequest getResourceRequest(Priority priority, String resourceName) {
+    return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
+  }
+
+  public synchronized int getTotalRequiredResources(Priority priority) {
+    return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
+  }
+
+  public Resource getResource(Priority priority) {
+    return appSchedulingInfo.getResource(priority);
+  }
+
+  public String getQueueName() {
+    return appSchedulingInfo.getQueueName();
+  }
+  
+  public synchronized RMContainer getRMContainer(ContainerId id) {
+    return liveContainers.get(id);
+  }
+
+  protected synchronized void resetReReservations(Priority priority) {
+    reReservations.setCount(priority, 0);
+  }
+
+  protected synchronized void addReReservation(Priority priority) {
+    reReservations.add(priority);
+  }
+
+  public synchronized int getReReservations(Priority priority) {
+    return reReservations.count(priority);
+  }
+
+  /**
+   * Get total current reservations.
+   * Used only by unit tests
+   * @return total current reservations
+   */
+  @Stable
+  @Private
+  public synchronized Resource getCurrentReservation() {
+    return currentReservation;
+  }
+  
+  public Queue getQueue() {
+    return queue;
+  }
+  
+  public synchronized void updateResourceRequests(
+      List<ResourceRequest> requests) {
+    if (!isStopped) {
+      appSchedulingInfo.updateResourceRequests(requests);
+    }
+  }
+  
+  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
+    // Cleanup all scheduling information
+    isStopped = true;
+    appSchedulingInfo.stop(rmAppAttemptFinalState);
+  }
+
+  public synchronized boolean isStopped() {
+    return isStopped;
+  }
+
+  /**
+   * Get the list of reserved containers
+   * @return All of the reserved containers.
+   */
+  public synchronized List<RMContainer> getReservedContainers() {
+    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
+    for (Map.Entry<Priority, Map<NodeId, RMContainer>> e : 
+      this.reservedContainers.entrySet()) {
+      reservedContainers.addAll(e.getValue().values());
+    }
+    return reservedContainers;
+  }
+  
+  public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
+      RMContainer rmContainer, Container container) {
+    // Create RMContainer if necessary
+    if (rmContainer == null) {
+      rmContainer = 
+          new RMContainerImpl(container, getApplicationAttemptId(), 
+              node.getNodeID(), rmContext.getDispatcher().getEventHandler(), 
+              rmContext.getContainerAllocationExpirer());
+        
+      Resources.addTo(currentReservation, container.getResource());
+      
+      // Reset the re-reservation count
+      resetReReservations(priority);
+    } else {
+      // Note down the re-reservation
+      addReReservation(priority);
+    }
+    rmContainer.handle(new RMContainerReservedEvent(container.getId(), 
+        container.getResource(), node.getNodeID(), priority));
+    
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    if (reservedContainers == null) {
+      reservedContainers = new HashMap<NodeId, RMContainer>();
+      this.reservedContainers.put(priority, reservedContainers);
+    }
+    reservedContainers.put(node.getNodeID(), rmContainer);
+    
+    LOG.info("Application " + getApplicationId() 
+        + " reserved container " + rmContainer
+        + " on node " + node + ", currently has " + reservedContainers.size()
+        + " at priority " + priority 
+        + "; currentReservation " + currentReservation.getMemory());
+    
+    return rmContainer;
+  }
+  
+  /**
+   * Has the application reserved the given <code>node</code> at the
+   * given <code>priority</code>?
+   * @param node node to be checked
+   * @param priority priority of reserved container
+   * @return true is reserved, false if not
+   */
+  public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    if (reservedContainers != null) {
+      return reservedContainers.containsKey(node.getNodeID());
+    }
+    return false;
+  }
+  
+  public synchronized void setHeadroom(Resource globalLimit) {
+    this.resourceLimit = globalLimit; 
+  }
+
+  /**
+   * Get available headroom in terms of resources for the application's user.
+   * @return available resource headroom
+   */
+  public synchronized Resource getHeadroom() {
+    // Corner case to deal with applications being slightly over-limit
+    if (resourceLimit.getMemory() < 0) {
+      resourceLimit.setMemory(0);
+    }
+    
+    return resourceLimit;
+  }
+  
+  public synchronized int getNumReservedContainers(Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    return (reservedContainers == null) ? 0 : reservedContainers.size();
+  }
+  
+  @SuppressWarnings("unchecked")
+  public synchronized void containerLaunchedOnNode(ContainerId containerId,
+      NodeId nodeId) {
+    // Inform the container
+    RMContainer rmContainer = getRMContainer(containerId);
+    if (rmContainer == null) {
+      // Some unknown container sneaked into the system. Kill it.
+      rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
+      return;
+    }
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.LAUNCHED));
+  }
+  
+  public synchronized void showRequests() {
+    if (LOG.isDebugEnabled()) {
+      for (Priority priority : getPriorities()) {
+        Map<String, ResourceRequest> requests = getResourceRequests(priority);
+        if (requests != null) {
+          LOG.debug("showRequests:" + " application=" + getApplicationId() + 
+              " headRoom=" + getHeadroom() + 
+              " currentConsumption=" + currentConsumption.getMemory());
+          for (ResourceRequest request : requests.values()) {
+            LOG.debug("showRequests:" + " application=" + getApplicationId()
+                + " request=" + request);
+          }
+        }
+      }
+    }
+  }
+  
+  public Resource getCurrentConsumption() {
+    return currentConsumption;
+  }
+
+  public synchronized List<Container> pullNewlyAllocatedContainers() {
+    List<Container> returnContainerList = new ArrayList<Container>(
+        newlyAllocatedContainers.size());
+    for (RMContainer rmContainer : newlyAllocatedContainers) {
+      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
+          RMContainerEventType.ACQUIRED));
+      returnContainerList.add(rmContainer.getContainer());
+    }
+    newlyAllocatedContainers.clear();
+    return returnContainerList;
+  }
+
+  public synchronized void updateBlacklist(
+      List<String> blacklistAdditions, List<String> blacklistRemovals) {
+    if (!isStopped) {
+      this.appSchedulingInfo.updateBlacklist(
+          blacklistAdditions, blacklistRemovals);
+    }
+  }
+  
+  public boolean isBlacklisted(String resourceName) {
+    return this.appSchedulingInfo.isBlacklisted(resourceName);
+  }
+
+  public synchronized void addSchedulingOpportunity(Priority priority) {
+    schedulingOpportunities.setCount(priority,
+        schedulingOpportunities.count(priority) + 1);
+  }
+  
+  public synchronized void subtractSchedulingOpportunity(Priority priority) {
+    int count = schedulingOpportunities.count(priority) - 1;
+    this.schedulingOpportunities.setCount(priority, Math.max(count,  0));
+  }
+
+  /**
+   * Return the number of times the application has been given an opportunity
+   * to schedule a task at the given priority since the last time it
+   * successfully did so.
+   */
+  public synchronized int getSchedulingOpportunities(Priority priority) {
+    return schedulingOpportunities.count(priority);
+  }
+  
+  /**
+   * Should be called when an application has successfully scheduled a container,
+   * or when the scheduling locality threshold is relaxed.
+   * Reset various internal counters which affect delay scheduling
+   *
+   * @param priority The priority of the container scheduled.
+   */
+  public synchronized void resetSchedulingOpportunities(Priority priority) {
+    resetSchedulingOpportunities(priority, System.currentTimeMillis());
+  }
+  // used for continuous scheduling
+  public synchronized void resetSchedulingOpportunities(Priority priority,
+      long currentTimeMs) {
+    lastScheduledContainer.put(priority, currentTimeMs);
+    schedulingOpportunities.setCount(priority, 0);
+  }
+  
+  public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
+    return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
+        reservedContainers.size(), Resources.clone(currentConsumption),
+        Resources.clone(currentReservation),
+        Resources.add(currentConsumption, currentReservation));
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Thu Jan  2 20:19:45 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -35,7 +36,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 
@@ -155,21 +155,32 @@ extends org.apache.hadoop.yarn.server.re
   
   /**
    * Submit a new application to the queue.
-   * @param application application being submitted
+   * @param applicationId the applicationId of the application being submitted
    * @param user user who submitted the application
    * @param queue queue to which the application is submitted
    */
-  public void submitApplication(FiCaSchedulerApp application, String user, 
-      String queue) 
-  throws AccessControlException;
-  
+  public void submitApplication(ApplicationId applicationId, String user,
+      String queue) throws AccessControlException;
+
+  /**
+   * Submit an application attempt to the queue.
+   */
+  public void submitApplicationAttempt(FiCaSchedulerApp application,
+      String userName);
+
   /**
    * An application submitted to this queue has finished.
-   * @param application
-   * @param queue application queue 
+   * @param applicationId
+   * @param user user who submitted the application
    */
-  public void finishApplication(FiCaSchedulerApp application, String queue);
-  
+  public void finishApplication(ApplicationId applicationId, String user);
+
+  /**
+   * An application attempt submitted to this queue has finished.
+   */
+  public void finishApplicationAttempt(FiCaSchedulerApp application,
+      String queue);
+
   /**
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Thu Jan  2 20:19:45 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,10 +54,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -65,14 +69,16 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -185,7 +191,11 @@ public class CapacityScheduler
   private Resource maximumAllocation;
 
   @VisibleForTesting
-  protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications = 
+  protected Map<ApplicationId, SchedulerApplication> applications =
+      new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+
+  @VisibleForTesting
+  protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts = 
       new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
 
   private boolean initialized = false;
@@ -415,61 +425,84 @@ public class CapacityScheduler
   synchronized CSQueue getQueue(String queueName) {
     return queues.get(queueName);
   }
-  
-  private synchronized void
-      addApplicationAttempt(ApplicationAttemptId applicationAttemptId,
-          String queueName, String user) {
 
-    // Sanity checks
+  private synchronized void addApplication(ApplicationId applicationId,
+      String queueName, String user) {
+    // santiy checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
-      String message = "Application " + applicationAttemptId + 
+      String message = "Application " + applicationId + 
       " submitted by user " + user + " to unknown queue: " + queueName;
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppRejectedEvent(applicationId, message));
       return;
     }
     if (!(queue instanceof LeafQueue)) {
-      String message = "Application " + applicationAttemptId + 
+      String message = "Application " + applicationId + 
           " submitted by user " + user + " to non-leaf queue: " + queueName;
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppRejectedEvent(applicationId, message));
       return;
     }
-
-    // TODO: Fix store
-    FiCaSchedulerApp SchedulerApp = 
-        new FiCaSchedulerApp(applicationAttemptId, user, queue, 
-            queue.getActiveUsersManager(), rmContext);
-
     // Submit to the queue
     try {
-      queue.submitApplication(SchedulerApp, user, queueName);
+      queue.submitApplication(applicationId, user, queueName);
     } catch (AccessControlException ace) {
-      LOG.info("Failed to submit application " + applicationAttemptId + 
-          " to queue " + queueName + " from user " + user, ace);
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptRejectedEvent(applicationAttemptId, 
-              ace.toString()));
+      LOG.info("Failed to submit application " + applicationId + " to queue "
+          + queueName + " from user " + user, ace);
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppRejectedEvent(applicationId, ace.toString()));
       return;
     }
+    SchedulerApplication application =
+        new SchedulerApplication(queue, user);
+    applications.put(applicationId, application);
+    LOG.info("Accepted application " + applicationId + " from user: " + user
+        + ", in queue: " + queueName);
+    rmContext.getDispatcher().getEventHandler()
+        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+  }
 
-    applications.put(applicationAttemptId, SchedulerApp);
-
-    LOG.info("Application Submission: " + applicationAttemptId + 
-        ", user: " + user +
-        " queue: " + queue +
-        ", currently active: " + applications.size());
-
+  private synchronized void addApplicationAttempt(
+      ApplicationAttemptId applicationAttemptId) {
+    SchedulerApplication application =
+        applications.get(applicationAttemptId.getApplicationId());
+    CSQueue queue = (CSQueue) application.getQueue();
+
+    FiCaSchedulerApp SchedulerApp =
+        new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
+          queue, queue.getActiveUsersManager(), rmContext);
+    appAttempts.put(applicationAttemptId, SchedulerApp);
+    queue.submitApplicationAttempt(SchedulerApp, application.getUser());
+    LOG.info("Added Application Attempt " + applicationAttemptId
+        + " to scheduler from user " + application.getUser() + " in queue "
+        + queue.getQueueName());
     rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-            RMAppAttemptEventType.APP_ACCEPTED));
+      new RMAppAttemptEvent(applicationAttemptId,
+          RMAppAttemptEventType.ATTEMPT_ADDED));
+  }
+
+  private synchronized void doneApplication(ApplicationId applicationId,
+      RMAppState finalState) {
+    SchedulerApplication application = applications.get(applicationId);
+    if (application == null){
+      // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps.
+      return;
+    }
+    CSQueue queue = (CSQueue) application.getQueue();
+    if (!(queue instanceof LeafQueue)) {
+      LOG.error("Cannot finish application " + "from non-leaf queue: "
+          + queue.getQueueName());
+    } else {
+      queue.finishApplication(applicationId, application.getUser());
+    }
+    applications.remove(applicationId);
   }
 
   private synchronized void doneApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState) {
-    LOG.info("Application " + applicationAttemptId + " is done." +
+    LOG.info("Application Attempt " + applicationAttemptId + " is done." +
     		" finalState=" + rmAppAttemptFinalState);
     
     FiCaSchedulerApp application = getApplication(applicationAttemptId);
@@ -509,11 +542,11 @@ public class CapacityScheduler
       LOG.error("Cannot finish application " + "from non-leaf queue: "
           + queueName);
     } else {
-      queue.finishApplication(application, queue.getQueueName());
+      queue.finishApplicationAttempt(application, queue.getQueueName());
     }
     
     // Remove from our data-structure
-    applications.remove(applicationAttemptId);
+    appAttempts.remove(applicationAttemptId);
   }
 
   private static final Allocation EMPTY_ALLOCATION = 
@@ -740,12 +773,25 @@ public class CapacityScheduler
       nodeUpdate(nodeUpdatedEvent.getRMNode());
     }
     break;
+    case APP_ADDED:
+    {
+      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+      addApplication(appAddedEvent.getApplicationId(),
+        appAddedEvent.getQueue(), appAddedEvent.getUser());
+    }
+    break;
+    case APP_REMOVED:
+    {
+      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
+      doneApplication(appRemovedEvent.getApplicationID(),
+        appRemovedEvent.getFinalState());
+    }
+    break;
     case APP_ATTEMPT_ADDED:
     {
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
-      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getQueue(), appAttemptAddedEvent.getUser());
+      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -854,7 +900,7 @@ public class CapacityScheduler
 
   @Lock(Lock.NoLock.class)
   FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
-    return applications.get(applicationAttemptId);
+    return appAttempts.get(applicationAttemptId);
   }
 
   @Override
@@ -912,7 +958,7 @@ public class CapacityScheduler
       LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
           " container: " + cont.toString());
     }
-    FiCaSchedulerApp app = applications.get(aid);
+    FiCaSchedulerApp app = appAttempts.get(aid);
     if (app != null) {
       app.addPreemptContainer(cont.getContainerId());
     }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Jan  2 20:19:45 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -59,7 +60,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -99,7 +99,7 @@ public class LeafQueue implements CSQueu
   private volatile int numContainers;
 
   Set<FiCaSchedulerApp> activeApplications;
-  Map<ApplicationAttemptId, FiCaSchedulerApp> applicationsMap = 
+  Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap = 
       new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
   
   Set<FiCaSchedulerApp> pendingApplications;
@@ -635,7 +635,22 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public void submitApplication(FiCaSchedulerApp application, String userName,
+  public void submitApplicationAttempt(FiCaSchedulerApp application,
+      String userName) {
+    // Careful! Locking order is important!
+    synchronized (this) {
+      User user = getUser(userName);
+      // Add the attempt to our data-structures
+      addApplicationAttempt(application, user);
+    }
+
+    int attemptId = application.getApplicationAttemptId().getAttemptId();
+    metrics.submitApp(userName, attemptId);
+    getParent().submitApplicationAttempt(application, userName);
+  }
+
+  @Override
+  public void submitApplication(ApplicationId applicationId, String userName,
       String queue)  throws AccessControlException {
     // Careful! Locking order is important!
 
@@ -653,8 +668,7 @@ public class LeafQueue implements CSQueu
       // Check if the queue is accepting jobs
       if (getState() != QueueState.RUNNING) {
         String msg = "Queue " + getQueuePath() +
-        " is STOPPED. Cannot accept submission of application: " +
-        application.getApplicationId();
+        " is STOPPED. Cannot accept submission of application: " + applicationId;
         LOG.info(msg);
         throw new AccessControlException(msg);
       }
@@ -663,8 +677,7 @@ public class LeafQueue implements CSQueu
       if (getNumApplications() >= getMaxApplications()) {
         String msg = "Queue " + getQueuePath() + 
         " already has " + getNumApplications() + " applications," +
-        " cannot accept submission of application: " + 
-        application.getApplicationId();
+        " cannot accept submission of application: " + applicationId;
         LOG.info(msg);
         throw new AccessControlException(msg);
       }
@@ -675,26 +688,18 @@ public class LeafQueue implements CSQueu
         String msg = "Queue " + getQueuePath() + 
         " already has " + user.getTotalApplications() + 
         " applications from user " + userName + 
-        " cannot accept submission of application: " + 
-        application.getApplicationId();
+        " cannot accept submission of application: " + applicationId;
         LOG.info(msg);
         throw new AccessControlException(msg);
       }
-
-      // Add the application to our data-structures
-      addApplication(application, user);
     }
 
-    int attemptId = application.getApplicationAttemptId().getAttemptId();
-    metrics.submitApp(userName, attemptId);
-
     // Inform the parent queue
     try {
-      getParent().submitApplication(application, userName, queue);
+      getParent().submitApplication(applicationId, userName, queue);
     } catch (AccessControlException ace) {
       LOG.info("Failed to submit application to parent-queue: " + 
           getParent().getQueuePath(), ace);
-      removeApplication(application, user);
       throw ace;
     }
   }
@@ -722,11 +727,11 @@ public class LeafQueue implements CSQueu
     }
   }
   
-  private synchronized void addApplication(FiCaSchedulerApp application, User user) {
+  private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) {
     // Accept 
     user.submitApplication();
     pendingApplications.add(application);
-    applicationsMap.put(application.getApplicationAttemptId(), application);
+    applicationAttemptMap.put(application.getApplicationAttemptId(), application);
 
     // Activate applications
     activateApplications();
@@ -742,22 +747,28 @@ public class LeafQueue implements CSQueu
   }
 
   @Override
-  public void finishApplication(FiCaSchedulerApp application, String queue) {
+  public void finishApplication(ApplicationId application, String user) {
+    // Inform the activeUsersManager
+    activeUsersManager.deactivateApplication(user, application);
+    // Inform the parent queue
+    getParent().finishApplication(application, user);
+  }
+
+  @Override
+  public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
     // Careful! Locking order is important!
     synchronized (this) {
-      removeApplication(application, getUser(application.getUser()));
+      removeApplicationAttempt(application, getUser(application.getUser()));
     }
-
-    // Inform the parent queue
-    getParent().finishApplication(application, queue);
+    getParent().finishApplicationAttempt(application, queue);
   }
 
-  public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
+  public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, User user) {
     boolean wasActive = activeApplications.remove(application);
     if (!wasActive) {
       pendingApplications.remove(application);
     }
-    applicationsMap.remove(application.getApplicationAttemptId());
+    applicationAttemptMap.remove(application.getApplicationAttemptId());
 
     user.finishApplication(wasActive);
     if (user.getTotalApplications() == 0) {
@@ -766,13 +777,7 @@ public class LeafQueue implements CSQueu
 
     // Check if we can activate more applications
     activateApplications();
-    
-    // Inform the activeUsersManager
-    synchronized (application) {
-      activeUsersManager.deactivateApplication(
-          application.getUser(), application.getApplicationId());
-    }
-    
+
     LOG.info("Application removed -" +
         " appId: " + application.getApplicationId() + 
         " user: " + application.getUser() + 
@@ -783,10 +788,10 @@ public class LeafQueue implements CSQueu
         " #queue-active-applications: " + getNumActiveApplications()
         );
   }
-  
+
   private synchronized FiCaSchedulerApp getApplication(
       ApplicationAttemptId applicationAttemptId) {
-    return applicationsMap.get(applicationAttemptId);
+    return applicationAttemptMap.get(applicationAttemptId);
   }
 
   private static final CSAssignment NULL_ASSIGNMENT =