You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/01/02 21:19:46 UTC

svn commit: r1554896 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn...

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Thu Jan  2 20:19:45 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -51,7 +52,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -442,7 +442,7 @@ public class ParentQueue implements CSQu
   }
 
   @Override
-  public void submitApplication(FiCaSchedulerApp application, String user,
+  public void submitApplication(ApplicationId applicationId, String user,
       String queue) throws AccessControlException {
     
     synchronized (this) {
@@ -455,57 +455,70 @@ public class ParentQueue implements CSQu
       if (state != QueueState.RUNNING) {
         throw new AccessControlException("Queue " + getQueuePath() +
             " is STOPPED. Cannot accept submission of application: " +
-            application.getApplicationId());
+            applicationId);
       }
 
-      addApplication(application, user);
+      addApplication(applicationId, user);
     }
     
     // Inform the parent queue
     if (parent != null) {
       try {
-        parent.submitApplication(application, user, queue);
+        parent.submitApplication(applicationId, user, queue);
       } catch (AccessControlException ace) {
         LOG.info("Failed to submit application to parent-queue: " + 
             parent.getQueuePath(), ace);
-        removeApplication(application, user);
+        removeApplication(applicationId, user);
         throw ace;
       }
     }
   }
 
-  private synchronized void addApplication(FiCaSchedulerApp application, 
+
+  @Override
+  public void submitApplicationAttempt(FiCaSchedulerApp application,
+      String userName) {
+    // No-op: this method performs no work when an attempt is submitted.
+  }
+
+  @Override
+  public void finishApplicationAttempt(FiCaSchedulerApp application,
+      String queue) {
+    // No-op: this method performs no work when an attempt finishes.
+  }
+
+  private synchronized void addApplication(ApplicationId applicationId,
       String user) {
-  
+
     ++numApplications;
 
     LOG.info("Application added -" +
-        " appId: " + application.getApplicationId() + 
+        " appId: " + applicationId + 
         " user: " + user + 
         " leaf-queue of parent: " + getQueueName() + 
         " #applications: " + getNumApplications());
   }
   
   @Override
-  public void finishApplication(FiCaSchedulerApp application, String queue) {
+  public void finishApplication(ApplicationId application, String user) {
     
     synchronized (this) {
-      removeApplication(application, application.getUser());
+      removeApplication(application, user);
     }
     
     // Inform the parent queue
     if (parent != null) {
-      parent.finishApplication(application, queue);
+      parent.finishApplication(application, user);
     }
   }
 
-  public synchronized void removeApplication(FiCaSchedulerApp application, 
+  public synchronized void removeApplication(ApplicationId applicationId, 
       String user) {
     
     --numApplications;
 
     LOG.info("Application removed -" +
-        " appId: " + application.getApplicationId() + 
+        " appId: " + applicationId + 
         " user: " + user + 
         " leaf-queue of parent: " + getQueueName() + 
         " #applications: " + getNumApplications());

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Thu Jan  2 20:19:45 2014
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
@@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.util.resou
  */
 @Private
 @Unstable
-public class FiCaSchedulerApp extends SchedulerApplication {
+public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Thu Jan  2 20:19:45 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -206,7 +206,7 @@ public class FiCaSchedulerNode extends S
   }
 
   public synchronized void reserveResource(
-      SchedulerApplication application, Priority priority, 
+      SchedulerApplicationAttempt application, Priority priority, 
       RMContainer reservedContainer) {
     // Check if it's already reserved
     if (this.reservedContainer != null) {
@@ -241,7 +241,7 @@ public class FiCaSchedulerNode extends S
   }
 
   public synchronized void unreserveResource(
-      SchedulerApplication application) {
+      SchedulerApplicationAttempt application) {
     
     // adding NP checks as this can now be called for preemption
     if (reservedContainer != null

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1554896&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Thu Jan  2 20:19:45 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class AppAddedSchedulerEvent extends SchedulerEvent {
+
+  private final ApplicationId applicationId;
+  private final String queue;
+  private final String user;
+
+  public AppAddedSchedulerEvent(
+      ApplicationId applicationId, String queue, String user) {
+    super(SchedulerEventType.APP_ADDED);
+    this.applicationId = applicationId;
+    this.queue = queue;
+    this.user = user;
+  }
+
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Thu Jan  2 20:19:45 2014
@@ -23,27 +23,14 @@ import org.apache.hadoop.yarn.api.record
 public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
 
   private final ApplicationAttemptId applicationAttemptId;
-  private final String queue;
-  private final String user;
 
   public AppAttemptAddedSchedulerEvent(
-      ApplicationAttemptId applicationAttemptId, String queue, String user) {
+      ApplicationAttemptId applicationAttemptId) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
-    this.queue = queue;
-    this.user = user;
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {
     return applicationAttemptId;
   }
-
-  public String getQueue() {
-    return queue;
-  }
-
-  public String getUser() {
-    return user;
-  }
-
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java?rev=1554896&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppRemovedSchedulerEvent.java Thu Jan  2 20:19:45 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+
+public class AppRemovedSchedulerEvent extends SchedulerEvent {
+
+  private final ApplicationId applicationId;
+  private final RMAppState finalState;
+
+  public AppRemovedSchedulerEvent(ApplicationId applicationId,
+      RMAppState finalState) {
+    super(SchedulerEventType.APP_REMOVED);
+    this.applicationId = applicationId;
+    this.finalState = finalState;
+  }
+
+  public ApplicationId getApplicationID() {
+    return this.applicationId;
+  }
+
+  public RMAppState getFinalState() {
+    return this.finalState;
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java Thu Jan  2 20:19:45 2014
@@ -24,7 +24,11 @@ public enum SchedulerEventType {
   NODE_ADDED,
   NODE_REMOVED,
   NODE_UPDATE,
-  
+
+  // Source: RMApp
+  APP_ADDED,
+  APP_REMOVED,
+
   // Source: RMAppAttempt
   APP_ATTEMPT_ADDED,
   APP_ATTEMPT_REMOVED,

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Thu Jan  2 20:19:45 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 
 @Private
 @Unstable

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Thu Jan  2 20:19:45 2014
@@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.util.resou
  */
 @Private
 @Unstable
-public class FSSchedulerApp extends SchedulerApplication {
+public class FSSchedulerApp extends SchedulerApplicationAttempt {
 
   private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Jan  2 20:19:45 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,10 +59,13 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -75,8 +79,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -151,10 +157,15 @@ public class FairScheduler implements Re
   // Time we last ran preemptTasksIfNecessary
   private long lastPreemptCheckTime;
 
-  // This stores per-application scheduling information, indexed by
+  // Per-application scheduling information, indexed by application ID.
+  @VisibleForTesting
+  protected Map<ApplicationId, SchedulerApplication> applications =
+      new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+
+  // This stores per-application-attempt scheduling information, indexed by
   // attempt ID's for fast lookup.
   @VisibleForTesting
-  protected Map<ApplicationAttemptId, FSSchedulerApp> applications = 
+  protected Map<ApplicationAttemptId, FSSchedulerApp> appAttempts = 
       new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
 
   // Nodes in the cluster, indexed by NodeId
@@ -253,7 +264,7 @@ public class FairScheduler implements Re
 
   private RMContainer getRMContainer(ContainerId containerId) {
     FSSchedulerApp application = 
-        applications.get(containerId.getApplicationAttemptId());
+        appAttempts.get(containerId.getApplicationAttemptId());
     return (application == null) ? null : application.getRMContainer(containerId);
   }
 
@@ -591,44 +602,63 @@ public class FairScheduler implements Re
    * user. This will accept a new app even if the user or queue is above
    * configured limits, but the app will not be marked as runnable.
    */
-  protected synchronized void addApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId, String queueName, String user) {
+  protected synchronized void addApplication(ApplicationId applicationId,
+      String queueName, String user) {
     if (queueName == null || queueName.isEmpty()) {
-      String message = "Reject application " + applicationAttemptId +
+      String message = "Reject application " + applicationId +
               " submitted by user " + user + " with an empty queue name.";
       LOG.info(message);
-      rmContext.getDispatcher().getEventHandler().handle(
-              new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+      rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppRejectedEvent(applicationId, message));
       return;
     }
 
-    RMApp rmApp = rmContext.getRMApps().get(
-        applicationAttemptId.getApplicationId());
+    RMApp rmApp = rmContext.getRMApps().get(applicationId);
     FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
     if (queue == null) {
       rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptRejectedEvent(applicationAttemptId,
+          new RMAppRejectedEvent(applicationId,
               "Application rejected by queue placement policy"));
       return;
     }
 
-    FSSchedulerApp schedulerApp =
-        new FSSchedulerApp(applicationAttemptId, user,
-            queue, new ActiveUsersManager(getRootQueueMetrics()),
-            rmContext);
-
     // Enforce ACLs
     UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
 
     if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
         && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
       String msg = "User " + userUgi.getUserName() +
-    	        " cannot submit applications to queue " + queue.getName();
+              " cannot submit applications to queue " + queue.getName();
       LOG.info(msg);
-      rmContext.getDispatcher().getEventHandler().handle(
-    	        new RMAppAttemptRejectedEvent(applicationAttemptId, msg));
+      rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppRejectedEvent(applicationId, msg));
       return;
     }
+  
+    SchedulerApplication application =
+        new SchedulerApplication(queue, user);
+    applications.put(applicationId, application);
+
+    LOG.info("Accepted application " + applicationId + " from user: " + user
+        + ", in queue: " + queueName);
+    rmContext.getDispatcher().getEventHandler()
+        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+  }
+
+  /**
+   * Add a new application attempt to the scheduler.
+   */
+  protected synchronized void addApplicationAttempt(
+      ApplicationAttemptId applicationAttemptId) {
+    SchedulerApplication application =
+        applications.get(applicationAttemptId.getApplicationId());
+    String user = application.getUser();
+    FSLeafQueue queue = (FSLeafQueue) application.getQueue();
+
+    FSSchedulerApp schedulerApp =
+        new FSSchedulerApp(applicationAttemptId, user,
+            queue, new ActiveUsersManager(getRootQueueMetrics()),
+            rmContext);
 
     boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
     queue.addApp(schedulerApp, runnable);
@@ -639,16 +669,14 @@ public class FairScheduler implements Re
     }
     
     queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
+    appAttempts.put(applicationAttemptId, schedulerApp);
 
-    applications.put(applicationAttemptId, schedulerApp);
-
-    LOG.info("Application Submission: " + applicationAttemptId +
-        ", user: "+ user +
-        ", currently active: " + applications.size());
-
+    LOG.info("Added Application Attempt " + applicationAttemptId
+        + " to scheduler from user: " + user + ", currently active: "
+        + appAttempts.size());
     rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(applicationAttemptId,
-            RMAppAttemptEventType.APP_ACCEPTED));
+            RMAppAttemptEventType.ATTEMPT_ADDED));
   }
   
   @VisibleForTesting
@@ -674,13 +702,18 @@ public class FairScheduler implements Re
     return queue;
   }
 
+  private synchronized void removeApplication(ApplicationId applicationId,
+      RMAppState finalState) {
+    applications.remove(applicationId);
+  }
+
   private synchronized void removeApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState) {
     LOG.info("Application " + applicationAttemptId + " is done." +
         " finalState=" + rmAppAttemptFinalState);
 
-    FSSchedulerApp application = applications.get(applicationAttemptId);
+    FSSchedulerApp application = appAttempts.get(applicationAttemptId);
 
     if (application == null) {
       LOG.info("Unknown application " + applicationAttemptId + " has completed!");
@@ -720,7 +753,7 @@ public class FairScheduler implements Re
     }
     
     // Remove from our data-structure
-    applications.remove(applicationAttemptId);
+    appAttempts.remove(applicationAttemptId);
   }
 
   /**
@@ -737,7 +770,7 @@ public class FairScheduler implements Re
 
     // Get the application for the finished container
     ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
-    FSSchedulerApp application = applications.get(applicationAttemptId);
+    FSSchedulerApp application = appAttempts.get(applicationAttemptId);
     if (application == null) {
       LOG.info("Container " + container + " of" +
           " unknown application " + applicationAttemptId +
@@ -811,7 +844,7 @@ public class FairScheduler implements Re
       List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
 
     // Make sure this application exists
-    FSSchedulerApp application = applications.get(appAttemptId);
+    FSSchedulerApp application = appAttempts.get(appAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + appAttemptId);
@@ -882,7 +915,7 @@ public class FairScheduler implements Re
   private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
     // Get the application for the finished container
     ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
-    FSSchedulerApp application = applications.get(applicationAttemptId);
+    FSSchedulerApp application = appAttempts.get(applicationAttemptId);
     if (application == null) {
       LOG.info("Unknown application: " + applicationAttemptId +
           " launched container " + containerId +
@@ -1025,23 +1058,23 @@ public class FairScheduler implements Re
   }
   
   public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
-    return applications.get(appAttemptId);
+    return appAttempts.get(appAttemptId);
   }
   
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId appAttemptId) {
-    if (!applications.containsKey(appAttemptId)) {
+    if (!appAttempts.containsKey(appAttemptId)) {
       LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
       return null;
     }
-    return new SchedulerAppReport(applications.get(appAttemptId));
+    return new SchedulerAppReport(appAttempts.get(appAttemptId));
   }
   
   @Override
   public ApplicationResourceUsageReport getAppResourceUsageReport(
       ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp app = applications.get(appAttemptId);
+    FSSchedulerApp app = appAttempts.get(appAttemptId);
     if (app == null) {
       LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
       return null;
@@ -1090,15 +1123,29 @@ public class FairScheduler implements Re
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
       nodeUpdate(nodeUpdatedEvent.getRMNode());
       break;
+    case APP_ADDED:
+      if (!(event instanceof AppAddedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+      addApplication(appAddedEvent.getApplicationId(),
+        appAddedEvent.getQueue(), appAddedEvent.getUser());
+      break;
+    case APP_REMOVED:
+      if (!(event instanceof AppRemovedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
+      removeApplication(appRemovedEvent.getApplicationID(),
+        appRemovedEvent.getFinalState());
+      break;
     case APP_ATTEMPT_ADDED:
       if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
       }
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
-      String queue = appAttemptAddedEvent.getQueue();
-      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        queue, appAttemptAddedEvent.getUser());
+      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
       break;
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu Jan  2 20:19:45 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,6 +59,9 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -74,12 +78,15 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -116,11 +123,15 @@ public class FifoScheduler implements Re
   private Resource maximumAllocation;
   private boolean usePortForNodeName;
 
+  @VisibleForTesting
+  protected Map<ApplicationId, SchedulerApplication> applications =
+      new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
+
   // Use ConcurrentSkipListMap because applications need to be ordered
   @VisibleForTesting
-  protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications
+  protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts
       = new ConcurrentSkipListMap<ApplicationAttemptId, FiCaSchedulerApp>();
-  
+
   private ActiveUsersManager activeUsersManager;
 
   private static final String DEFAULT_QUEUE_NAME = "default";
@@ -327,7 +338,7 @@ public class FifoScheduler implements Re
   @VisibleForTesting
   FiCaSchedulerApp getApplication(
       ApplicationAttemptId applicationAttemptId) {
-    return applications.get(applicationAttemptId);
+    return appAttempts.get(applicationAttemptId);
   }
 
   @Override
@@ -347,20 +358,44 @@ public class FifoScheduler implements Re
   private FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
-  
-  private synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId,
-      String user) {
+
+  private synchronized void addApplication(ApplicationId applicationId,
+      String queue, String user) {
+    SchedulerApplication application =
+        new SchedulerApplication(null, user);
+    applications.put(applicationId, application);
+    LOG.info("Accepted application " + applicationId + " from user: " + user);
+    rmContext.getDispatcher().getEventHandler()
+        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+  }
+
+  private synchronized void addApplicationAttempt(
+      ApplicationAttemptId appAttemptId) {
+    SchedulerApplication application =
+        applications.get(appAttemptId.getApplicationId());
+    String user = application.getUser();
     // TODO: Fix store
-    FiCaSchedulerApp schedulerApp = 
-        new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
-            this.rmContext);
-    applications.put(appAttemptId, schedulerApp);
+    FiCaSchedulerApp schedulerApp =
+        new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
+          activeUsersManager, this.rmContext);
+    appAttempts.put(appAttemptId, schedulerApp);
     metrics.submitApp(user, appAttemptId.getAttemptId());
-    LOG.info("Application Submission: " + appAttemptId.getApplicationId() + 
-        " from " + user + ", currently active: " + applications.size());
+    LOG.info("Added Application Attempt " + appAttemptId
+        + " to scheduler from user " + application.getUser()
+        + ", currently active: " + appAttempts.size());
     rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(appAttemptId,
-            RMAppAttemptEventType.APP_ACCEPTED));
+            RMAppAttemptEventType.ATTEMPT_ADDED));
+  }
+
+  private synchronized void doneApplication(ApplicationId applicationId,
+      RMAppState finalState) {
+    SchedulerApplication app = applications.remove(applicationId);
+    if (app == null) { // no application registered under this id
+      LOG.warn("Couldn't find application " + applicationId);
+      return;
+    }
+    activeUsersManager.deactivateApplication(app.getUser(), applicationId);
   }
 
   private synchronized void doneApplicationAttempt(
@@ -382,17 +417,11 @@ public class FifoScheduler implements Re
           RMContainerEventType.KILL);
     }
 
-    // Inform the activeUsersManager
-    synchronized (application) {
-      activeUsersManager.deactivateApplication(
-          application.getUser(), application.getApplicationId());
-    }
-
     // Clean up pending requests, metrics etc.
     application.stop(rmAppAttemptFinalState);
 
     // Remove the application
-    applications.remove(applicationAttemptId);
+    appAttempts.remove(applicationAttemptId);
   }
   
   /**
@@ -403,10 +432,10 @@ public class FifoScheduler implements Re
   private void assignContainers(FiCaSchedulerNode node) {
     LOG.debug("assignContainers:" +
         " node=" + node.getRMNode().getNodeAddress() + 
-        " #applications=" + applications.size());
+        " #applications=" + appAttempts.size());
 
     // Try to assign containers to applications in fifo order
-    for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications
+    for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : appAttempts
         .entrySet()) {
       FiCaSchedulerApp application = e.getValue();
       LOG.debug("pre-assignContainers");
@@ -445,7 +474,7 @@ public class FifoScheduler implements Re
 
     // Update the applications' headroom to correctly take into
     // account the containers assigned in this update.
-    for (FiCaSchedulerApp application : applications.values()) {
+    for (FiCaSchedulerApp application : appAttempts.values()) {
       application.setHeadroom(Resources.subtract(clusterResource, usedResource));
     }
   }
@@ -697,12 +726,25 @@ public class FifoScheduler implements Re
       nodeUpdate(nodeUpdatedEvent.getRMNode());
     }
     break;
+    case APP_ADDED:
+    {
+      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+      addApplication(appAddedEvent.getApplicationId(),
+        appAddedEvent.getQueue(), appAddedEvent.getUser());
+    }
+    break;
+    case APP_REMOVED:
+    {
+      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
+      doneApplication(appRemovedEvent.getApplicationID(),
+        appRemovedEvent.getFinalState());
+    }
+    break;
     case APP_ATTEMPT_ADDED:
     {
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
-      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getUser());
+      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -867,8 +909,8 @@ public class FifoScheduler implements Re
   public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
     if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
       List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(
-          applications.size());
-      for (FiCaSchedulerApp app : applications.values()) {
+          appAttempts.size());
+      for (FiCaSchedulerApp app : appAttempts.values()) {
         apps.add(app.getApplicationAttemptId());
       }
       return apps;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Thu Jan  2 20:19:45 2014
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -164,11 +165,14 @@ public class Application {
     final ResourceScheduler scheduler = resourceManager.getResourceScheduler();
     
     resourceManager.getClientRMService().submitApplication(request);
-    
+
     // Notify scheduler
-    AppAttemptAddedSchedulerEvent appAddedEvent1 = new AppAttemptAddedSchedulerEvent(
-            this.applicationAttemptId, this.queue, this.user);
-    scheduler.handle(appAddedEvent1);
+    AppAddedSchedulerEvent addAppEvent =
+        new AppAddedSchedulerEvent(this.applicationId, this.queue, this.user);
+    scheduler.handle(addAppEvent);
+    AppAttemptAddedSchedulerEvent addAttemptEvent =
+        new AppAttemptAddedSchedulerEvent(this.applicationAttemptId);
+    scheduler.handle(addAttemptEvent);
   }
   
   public synchronized void addResourceRequestSpec(

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Thu Jan  2 20:19:45 2014
@@ -649,7 +649,7 @@ public class TestClientRMService {
             .currentTimeMillis(), "YARN"));
     ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId3, 1);
     RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
-        rmContext, yarnScheduler, null, asContext, config, null);
+        rmContext, yarnScheduler, null, asContext, config);
     when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
     return app;
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Thu Jan  2 20:19:45 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -297,9 +298,12 @@ public class TestFifoScheduler {
     ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
     ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
         appId1, 1);
-    SchedulerEvent event1 =
-        new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user");
-    fs.handle(event1);
+    SchedulerEvent appEvent =
+        new AppAddedSchedulerEvent(appId1, "queue", "user");
+    fs.handle(appEvent);
+    SchedulerEvent attemptEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId1);
+    fs.handle(attemptEvent);
 
     List<ContainerId> emptyId = new ArrayList<ContainerId>();
     List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
@@ -388,16 +392,22 @@ public class TestFifoScheduler {
     ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
     ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
         appId1, 1);
-    SchedulerEvent event1 =
-        new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user");
-    fs.handle(event1);
+    SchedulerEvent appEvent =
+        new AppAddedSchedulerEvent(appId1, "queue", "user");
+    fs.handle(appEvent);
+    SchedulerEvent attemptEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId1);
+    fs.handle(attemptEvent);
 
     ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
     ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
         appId2, 1);
-    SchedulerEvent event2 =
-        new AppAttemptAddedSchedulerEvent(appAttemptId2, "queue", "user");
-    fs.handle(event2);
+    SchedulerEvent appEvent2 =
+        new AppAddedSchedulerEvent(appId2, "queue", "user");
+    fs.handle(appEvent2);
+    SchedulerEvent attemptEvent2 =
+        new AppAttemptAddedSchedulerEvent(appAttemptId2);
+    fs.handle(attemptEvent2);
 
     List<ContainerId> emptyId = new ArrayList<ContainerId>();
     List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Thu Jan  2 20:19:45 2014
@@ -248,7 +248,7 @@ public class TestRMRestart {
     // verify correct number of attempts and other data
     RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
     Assert.assertNotNull(loadedApp1);
-    //Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
+    Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
     Assert.assertEquals(app1.getApplicationSubmissionContext()
         .getApplicationId(), loadedApp1.getApplicationSubmissionContext()
         .getApplicationId());
@@ -261,7 +261,7 @@ public class TestRMRestart {
         .getApplicationId());
     
     // verify state machine kicked into expected states
-    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING);
+    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
     rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
     
     // verify attempts for apps
@@ -299,7 +299,11 @@ public class TestRMRestart {
     nm2.registerNode();
     
     rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
-    Assert.assertEquals(2, loadedApp1.getAppAttempts().size());    
+    // wait for the 2nd attempt to be started.
+    int timeoutSecs = 0;
+    while (loadedApp1.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {
+      Thread.sleep(200);
+    }
 
     // verify no more reboot response sent
     hbResponse = nm1.nodeHeartbeat(true);
@@ -476,10 +480,10 @@ public class TestRMRestart {
     Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction());
     
     RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
-    // application should be in running state
-    rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+    // application should be in ACCEPTED state
+    rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
     
-    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+    Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
     // new attempt should not be started
     Assert.assertEquals(2, rmApp.getAppAttempts().size());
     // am1 attempt should be in FAILED state where as am2 attempt should be in
@@ -516,9 +520,9 @@ public class TestRMRestart {
     nm1.setResourceTrackerService(rm3.getResourceTrackerService());
     
     rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId());
-    // application should be in running state
-    rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
-    Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING);
+    // application should be in ACCEPTED state
+    rm3.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    Assert.assertEquals(rmApp.getState(), RMAppState.ACCEPTED);
     // new attempt should not be started
     Assert.assertEquals(3, rmApp.getAppAttempts().size());
     // am1 and am2 attempts should be in FAILED state where as am3 should be
@@ -562,6 +566,11 @@ public class TestRMRestart {
     
     rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
     rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
+    // wait for the 4th attempt to be created.
+    int timeoutSecs = 0;
+    while (rmApp.getAppAttempts().size() != 4 && timeoutSecs++ < 40) {
+      Thread.sleep(200);
+    }
     Assert.assertEquals(4, rmApp.getAppAttempts().size());
     Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
     rm4.waitForState(latestAppAttemptId, RMAppAttemptState.SCHEDULED);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Thu Jan  2 20:19:45 2014
@@ -567,7 +567,9 @@ public class TestRMAppTransitions {
         RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
-    assertAppAndAttemptKilled(application);
+    sendAppUpdateSavedEvent(application);
+    assertKilled(application);
+    assertAppFinalStateSaved(application);
   }
 
   @Test
@@ -582,7 +584,7 @@ public class TestRMAppTransitions {
           new RMAppFailedAttemptEvent(application.getApplicationId(), 
               RMAppEventType.ATTEMPT_FAILED, "");
       application.handle(event);
-      assertAppState(RMAppState.SUBMITTED, application);
+      assertAppState(RMAppState.ACCEPTED, application);
       event = 
           new RMAppEvent(application.getApplicationId(), 
               RMAppEventType.APP_ACCEPTED);
@@ -612,7 +614,9 @@ public class TestRMAppTransitions {
         RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
-    assertAppAndAttemptKilled(application);
+    sendAppUpdateSavedEvent(application);
+    assertKilled(application);
+    assertAppFinalStateSaved(application);
   }
 
   @Test
@@ -654,7 +658,7 @@ public class TestRMAppTransitions {
               RMAppEventType.ATTEMPT_FAILED, "");
       application.handle(event);
       rmDispatcher.await();
-      assertAppState(RMAppState.SUBMITTED, application);
+      assertAppState(RMAppState.ACCEPTED, application);
       appAttempt = application.getCurrentAppAttempt();
       Assert.assertEquals(++expectedAttemptId, 
           appAttempt.getAppAttemptId().getAttemptId());

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Thu Jan  2 20:19:45 2014
@@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -258,7 +257,7 @@ public class TestRMAppAttemptTransitions
     application = mock(RMApp.class);
     applicationAttempt =
         new RMAppAttemptImpl(applicationAttemptId, rmContext, scheduler,
-          masterService, submissionContext, new Configuration(), user);
+          masterService, submissionContext, new Configuration());
     when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
     when(application.getApplicationId()).thenReturn(applicationId);
     
@@ -408,9 +407,6 @@ public class TestRMAppAttemptTransitions
     assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
     assertEquals(0, applicationAttempt.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
-    
-    // Check events
-    verify(application).handle(any(RMAppEvent.class));
   }
 
   /**
@@ -446,7 +442,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(0, applicationAttempt.getRanNodes().size());
     
     // Check events
-    verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class));
+    verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class));
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
     verifyAttemptFinalStateSaved();
   }
@@ -544,7 +540,7 @@ public class TestRMAppAttemptTransitions
     applicationAttempt.handle(
         new RMAppAttemptEvent(
             applicationAttempt.getAppAttemptId(), 
-            RMAppAttemptEventType.APP_ACCEPTED));
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     
     if(unmanagedAM){
       assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
@@ -703,16 +699,6 @@ public class TestRMAppAttemptTransitions
             RMAppAttemptEventType.RECOVER));
     testAppAttemptRecoveredState();
   }
-  
-  @Test
-  public void testSubmittedToFailed() {
-    submitApplicationAttempt();
-    String message = "Rejected";
-    applicationAttempt.handle(
-        new RMAppAttemptRejectedEvent(
-            applicationAttempt.getAppAttemptId(), message));
-    testAppAttemptSubmittedToFailedState(message);
-  }
 
   @Test
   public void testSubmittedToKilled() {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java Thu Jan  2 20:19:45 2014
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -58,8 +59,12 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -378,4 +383,24 @@ public class TestSchedulerUtils {
           ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1), "x");
     Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus());
   }
+
+  /**
+   * Sends an app-added event for a new application id to {@code handler},
+   * asserts the app appears in {@code applications} with user "user", then
+   * sends an app-removed event (FINISHED) and asserts the entry is gone.
+   * @return the application entry observed between the add and the remove
+   */
+  public static SchedulerApplication verifyAppAddedAndRemovedFromScheduler(
+      final Map<ApplicationId, SchedulerApplication> applications,
+      EventHandler<SchedulerEvent> handler, String queueName) throws Exception {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    handler.handle(new AppAddedSchedulerEvent(appId, queueName, "user"));
+    SchedulerApplication app = applications.get(appId);
+    Assert.assertNotNull(app);
+    Assert.assertEquals("user", app.getUser());
+    handler.handle(new AppRemovedSchedulerEvent(appId, RMAppState.FINISHED));
+    Assert.assertNull(applications.get(appId));
+    return app;
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Thu Jan  2 20:19:45 2014
@@ -304,7 +304,7 @@ public class TestApplicationLimits {
     int APPLICATION_ID = 0;
     // Submit first application
     FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplication(app_0, user_0, A);
+    queue.submitApplicationAttempt(app_0, user_0);
     assertEquals(1, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumActiveApplications(user_0));
@@ -312,7 +312,7 @@ public class TestApplicationLimits {
 
     // Submit second application
     FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplication(app_1, user_0, A);
+    queue.submitApplicationAttempt(app_1, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -320,14 +320,14 @@ public class TestApplicationLimits {
     
     // Submit third application, should remain pending
     FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplication(app_2, user_0, A);
+    queue.submitApplicationAttempt(app_2, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
     
     // Finish one application, app_2 should be activated
-    queue.finishApplication(app_0, A);
+    queue.finishApplicationAttempt(app_0, A);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -335,7 +335,7 @@ public class TestApplicationLimits {
     
     // Submit another one for user_0
     FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplication(app_3, user_0, A);
+    queue.submitApplicationAttempt(app_3, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -346,7 +346,7 @@ public class TestApplicationLimits {
     
     // Submit first app for user_1
     FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
-    queue.submitApplication(app_4, user_1, A);
+    queue.submitApplicationAttempt(app_4, user_1);
     assertEquals(3, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -356,7 +356,7 @@ public class TestApplicationLimits {
 
     // Submit second app for user_1, should block due to queue-limit
     FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1);
-    queue.submitApplication(app_5, user_1, A);
+    queue.submitApplicationAttempt(app_5, user_1);
     assertEquals(3, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -365,7 +365,7 @@ public class TestApplicationLimits {
     assertEquals(1, queue.getNumPendingApplications(user_1));
 
     // Now finish one app of user_1 so app_5 should be activated
-    queue.finishApplication(app_4, A);
+    queue.finishApplicationAttempt(app_4, A);
     assertEquals(3, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -385,7 +385,7 @@ public class TestApplicationLimits {
 
     // Submit first application
     FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplication(app_0, user_0, A);
+    queue.submitApplicationAttempt(app_0, user_0);
     assertEquals(1, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumActiveApplications(user_0));
@@ -394,7 +394,7 @@ public class TestApplicationLimits {
 
     // Submit second application
     FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplication(app_1, user_0, A);
+    queue.submitApplicationAttempt(app_1, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -403,7 +403,7 @@ public class TestApplicationLimits {
 
     // Submit third application, should remain pending
     FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplication(app_2, user_0, A);
+    queue.submitApplicationAttempt(app_2, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -412,7 +412,7 @@ public class TestApplicationLimits {
 
     // Submit fourth application, should remain pending
     FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplication(app_3, user_0, A);
+    queue.submitApplicationAttempt(app_3, user_0);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -420,7 +420,7 @@ public class TestApplicationLimits {
     assertTrue(queue.pendingApplications.contains(app_3));
 
     // Kill 3rd pending application
-    queue.finishApplication(app_2, A);
+    queue.finishApplicationAttempt(app_2, A);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -429,7 +429,7 @@ public class TestApplicationLimits {
     assertFalse(queue.activeApplications.contains(app_2));
 
     // Finish 1st application, app_3 should become active
-    queue.finishApplication(app_0, A);
+    queue.finishApplicationAttempt(app_0, A);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -439,7 +439,7 @@ public class TestApplicationLimits {
     assertFalse(queue.activeApplications.contains(app_0));
 
     // Finish 2nd application
-    queue.finishApplication(app_1, A);
+    queue.finishApplicationAttempt(app_1, A);
     assertEquals(1, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumActiveApplications(user_0));
@@ -447,7 +447,7 @@ public class TestApplicationLimits {
     assertFalse(queue.activeApplications.contains(app_1));
 
     // Finish 4th application
-    queue.finishApplication(app_3, A);
+    queue.finishApplicationAttempt(app_3, A);
     assertEquals(0, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(0, queue.getNumActiveApplications(user_0));
@@ -507,7 +507,7 @@ public class TestApplicationLimits {
     FiCaSchedulerApp app_0_0 = 
         spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue, 
             queue.getActiveUsersManager(), rmContext));
-    queue.submitApplication(app_0_0, user_0, A);
+    queue.submitApplicationAttempt(app_0_0, user_0);
 
     List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
     app_0_0_requests.add(
@@ -526,7 +526,7 @@ public class TestApplicationLimits {
     FiCaSchedulerApp app_0_1 = 
         spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue, 
             queue.getActiveUsersManager(), rmContext));
-    queue.submitApplication(app_0_1, user_0, A);
+    queue.submitApplicationAttempt(app_0_1, user_0);
     
     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
     app_0_1_requests.add(
@@ -545,7 +545,7 @@ public class TestApplicationLimits {
     FiCaSchedulerApp app_1_0 = 
         spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue, 
             queue.getActiveUsersManager(), rmContext));
-    queue.submitApplication(app_1_0, user_1, A);
+    queue.submitApplicationAttempt(app_1_0, user_1);
 
     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
     app_1_0_requests.add(

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1554896&r1=1554895&r2=1554896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Thu Jan  2 20:19:45 2014
@@ -64,7 +64,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -555,9 +558,12 @@ public class TestCapacityScheduler {
     ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
     ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
         appId, 1);
-    SchedulerEvent event =
-        new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user");
-    cs.handle(event);
+    SchedulerEvent addAppEvent =
+        new AppAddedSchedulerEvent(appId, "default", "user");
+    cs.handle(addAppEvent);
+    SchedulerEvent addAttemptEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId);
+    cs.handle(addAttemptEvent);
 
     // Verify the blacklist can be updated independent of requesting containers
     cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
@@ -596,10 +602,10 @@ public class TestCapacityScheduler {
     public void testConcurrentAccessOnApplications() throws Exception {
       CapacityScheduler cs = new CapacityScheduler();
       verifyConcurrentAccessOnApplications(
-          cs.applications, FiCaSchedulerApp.class, Queue.class);
+          cs.appAttempts, FiCaSchedulerApp.class, Queue.class);
     }
 
-    public static <T extends SchedulerApplication, Q extends Queue>
+    public static <T extends SchedulerApplicationAttempt, Q extends Queue>
         void verifyConcurrentAccessOnApplications(
             final Map<ApplicationAttemptId, T> applications, Class<T> appClazz,
             final Class<Q> queueClazz)
@@ -682,4 +688,21 @@ public class TestCapacityScheduler {
       Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
     }
 
-}
+  @Test
+  public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
+    // Only the dispatcher and the three secret managers are set on the context.
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    CapacityScheduler scheduler = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    RMContextImpl context = new RMContextImpl(dispatcher, null, null, null,
+        null, null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM());
+    scheduler.reinitialize(conf, context);
+    SchedulerApplication removedApp =
+        TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
+            scheduler.applications, scheduler, "a1");
+    Assert.assertEquals("a1", removedApp.getQueue().getQueueName());
+  }
+ }