You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sa...@apache.org on 2014/06/03 02:56:49 UTC

svn commit: r1599400 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-s...

Author: sandy
Date: Tue Jun  3 00:56:48 2014
New Revision: 1599400

URL: http://svn.apache.org/r1599400
Log:
YARN-1913. With Fair Scheduler, cluster can logjam when all resources are consumed by AMs (Wei Yan via Sandy Ryza)

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Jun  3 00:56:48 2014
@@ -132,6 +132,9 @@ Release 2.5.0 - UNRELEASED
 
     YARN-1474. Make schedulers services. (Tsuyoshi Ozawa via kasha)
 
+    YARN-1913. With Fair Scheduler, cluster can logjam when all resources are
+    consumed by AMs (Wei Yan via Sandy Ryza)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Tue Jun  3 00:56:48 2014
@@ -32,6 +32,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -76,6 +77,8 @@ public class SchedulerApplicationAttempt
   protected final Resource currentReservation = Resource.newInstance(0, 0);
   private Resource resourceLimit = Resource.newInstance(0, 0);
   protected Resource currentConsumption = Resource.newInstance(0, 0);
+  private Resource amResource;
+  private boolean unmanagedAM = true;
 
   protected List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
@@ -106,6 +109,19 @@ public class SchedulerApplicationAttempt
         new AppSchedulingInfo(applicationAttemptId, user, queue,  
             activeUsersManager);
     this.queue = queue;
+
+
+    if (rmContext != null && rmContext.getRMApps() != null &&
+        rmContext.getRMApps()
+            .containsKey(applicationAttemptId.getApplicationId())) {
+      ApplicationSubmissionContext appSubmissionContext =
+          rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
+              .getApplicationSubmissionContext();
+      if (appSubmissionContext != null) {
+        amResource = appSubmissionContext.getResource();
+        unmanagedAM = appSubmissionContext.getUnmanagedAM();
+      }
+    }
   }
   
   /**
@@ -168,6 +184,14 @@ public class SchedulerApplicationAttempt
     return appSchedulingInfo.getQueueName();
   }
   
+  public Resource getAMResource() {
+    return amResource;
+  }
+
+  public boolean getUnmanagedAM() {
+    return unmanagedAM;
+  }
+
   public synchronized RMContainer getRMContainer(ContainerId id) {
     return liveContainers.get(id);
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Tue Jun  3 00:56:48 2014
@@ -53,6 +53,10 @@ public class AllocationConfiguration {
   private final int userMaxAppsDefault;
   private final int queueMaxAppsDefault;
 
+  // Maximum resource share for each leaf queue that can be used to run AMs
+  final Map<String, Float> queueMaxAMShares;
+  private final float queueMaxAMShareDefault;
+
   // ACL's for each queue. Only specifies non-default ACL's from configuration.
   private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
 
@@ -84,8 +88,9 @@ public class AllocationConfiguration {
   public AllocationConfiguration(Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources,
       Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
-      Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
-      int queueMaxAppsDefault,
+      Map<String, ResourceWeights> queueWeights,
+      Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
+      int queueMaxAppsDefault, float queueMaxAMShareDefault,
       Map<String, SchedulingPolicy> schedulingPolicies,
       SchedulingPolicy defaultSchedulingPolicy,
       Map<String, Long> minSharePreemptionTimeouts,
@@ -97,9 +102,11 @@ public class AllocationConfiguration {
     this.maxQueueResources = maxQueueResources;
     this.queueMaxApps = queueMaxApps;
     this.userMaxApps = userMaxApps;
+    this.queueMaxAMShares = queueMaxAMShares;
     this.queueWeights = queueWeights;
     this.userMaxAppsDefault = userMaxAppsDefault;
     this.queueMaxAppsDefault = queueMaxAppsDefault;
+    this.queueMaxAMShareDefault = queueMaxAMShareDefault;
     this.defaultSchedulingPolicy = defaultSchedulingPolicy;
     this.schedulingPolicies = schedulingPolicies;
     this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
@@ -116,8 +123,10 @@ public class AllocationConfiguration {
     queueWeights = new HashMap<String, ResourceWeights>();
     queueMaxApps = new HashMap<String, Integer>();
     userMaxApps = new HashMap<String, Integer>();
+    queueMaxAMShares = new HashMap<String, Float>();
     userMaxAppsDefault = Integer.MAX_VALUE;
     queueMaxAppsDefault = Integer.MAX_VALUE;
+    queueMaxAMShareDefault = 1.0f;
     queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
     minSharePreemptionTimeouts = new HashMap<String, Long>();
     defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
@@ -184,6 +193,11 @@ public class AllocationConfiguration {
     return (maxApps == null) ? queueMaxAppsDefault : maxApps;
   }
   
+  public float getQueueMaxAMShare(String queue) {
+    Float maxAMShare = queueMaxAMShares.get(queue);
+    return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare;
+  }
+
   /**
    * Get the minimum resource allocation for the given queue.
    * @return the cap set on this queue, or 0 if not set.

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java Tue Jun  3 00:56:48 2014
@@ -209,6 +209,7 @@ public class AllocationFileLoaderService
     Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
     Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
     Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
+    Map<String, Float> queueMaxAMShares = new HashMap<String, Float>();
     Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
     Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
@@ -216,6 +217,7 @@ public class AllocationFileLoaderService
         new HashMap<String, Map<QueueACL, AccessControlList>>();
     int userMaxAppsDefault = Integer.MAX_VALUE;
     int queueMaxAppsDefault = Integer.MAX_VALUE;
+    float queueMaxAMShareDefault = 1.0f;
     long fairSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
     SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
@@ -282,6 +284,11 @@ public class AllocationFileLoaderService
           String text = ((Text)element.getFirstChild()).getData().trim();
           int val = Integer.parseInt(text);
           queueMaxAppsDefault = val;
+        } else if ("queueMaxAMShareDefault".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          float val = Float.parseFloat(text);
+          val = Math.min(val, 1.0f);
+          queueMaxAMShareDefault = val;
         } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
             || "defaultQueueSchedulingMode".equals(element.getTagName())) {
           String text = ((Text)element.getFirstChild()).getData().trim();
@@ -306,8 +313,8 @@ public class AllocationFileLoaderService
         parent = null;
       }
       loadQueue(parent, element, minQueueResources, maxQueueResources,
-          queueMaxApps, userMaxApps, queueWeights, queuePolicies,
-          minSharePreemptionTimeouts, queueAcls,
+          queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+          queuePolicies, minSharePreemptionTimeouts, queueAcls,
           configuredQueues);
     }
     
@@ -322,8 +329,8 @@ public class AllocationFileLoaderService
     }
     
     AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
-        queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
-        queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
+        queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault,
+        queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
         queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
         newPlacementPolicy, configuredQueues);
     
@@ -338,7 +345,8 @@ public class AllocationFileLoaderService
    */
   private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
-      Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
+      Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
+      Map<String, ResourceWeights> queueWeights,
       Map<String, SchedulingPolicy> queuePolicies,
       Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls, 
@@ -370,6 +378,11 @@ public class AllocationFileLoaderService
         String text = ((Text)field.getFirstChild()).getData().trim();
         int val = Integer.parseInt(text);
         queueMaxApps.put(queueName, val);
+      } else if ("maxAMShare".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        float val = Float.parseFloat(text);
+        val = Math.min(val, 1.0f);
+        queueMaxAMShares.put(queueName, val);
       } else if ("weight".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         double val = Double.parseDouble(text);
@@ -392,8 +405,9 @@ public class AllocationFileLoaderService
       } else if ("queue".endsWith(field.getTagName()) || 
           "pool".equals(field.getTagName())) {
         loadQueue(queueName, field, minQueueResources, maxQueueResources,
-            queueMaxApps, userMaxApps, queueWeights, queuePolicies,
-            minSharePreemptionTimeouts, queueAcls, configuredQueues);
+            queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
+            queuePolicies, minSharePreemptionTimeouts, queueAcls,
+            configuredQueues);
         configuredQueues.get(FSQueueType.PARENT).add(queueName);
         isLeaf = false;
       }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Tue Jun  3 00:56:48 2014
@@ -267,6 +267,12 @@ public class AppSchedulable extends Sche
       node.allocateContainer(app.getApplicationId(),
           allocatedContainer);
 
+      // If this container is used to run AM, update the leaf queue's AM usage
+      if (app.getLiveContainers().size() == 1 &&
+          !app.getUnmanagedAM()) {
+        queue.addAMResourceUsage(container.getResource());
+      }
+
       return container.getResource();
     } else {
       // The desired container won't fit here, so reserve
@@ -297,6 +303,14 @@ public class AppSchedulable extends Sche
         
         app.addSchedulingOpportunity(priority);
 
+        // Check the AM resource usage for the leaf queue
+        if (app.getLiveContainers().size() == 0
+            && !app.getUnmanagedAM()) {
+          if (!queue.canRunAppAM(app.getAMResource())) {
+            return Resources.none();
+          }
+        }
+
         ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
             node.getRackName());
         ResourceRequest localRequest = app.getResourceRequest(priority,

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Tue Jun  3 00:56:48 2014
@@ -55,6 +55,9 @@ public class FSLeafQueue extends FSQueue
   private long lastTimeAtMinShare;
   private long lastTimeAtHalfFairShare;
   
+  // Track the AM resource usage for this queue
+  private Resource amResourceUsage;
+
   private final ActiveUsersManager activeUsersManager;
   
   public FSLeafQueue(String name, FairScheduler scheduler,
@@ -63,6 +66,7 @@ public class FSLeafQueue extends FSQueue
     this.lastTimeAtMinShare = scheduler.getClock().getTime();
     this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
     activeUsersManager = new ActiveUsersManager(getMetrics());
+    amResourceUsage = Resource.newInstance(0, 0);
   }
   
   public void addApp(FSSchedulerApp app, boolean runnable) {
@@ -86,6 +90,10 @@ public class FSLeafQueue extends FSQueue
    */
   public boolean removeApp(FSSchedulerApp app) {
     if (runnableAppScheds.remove(app.getAppSchedulable())) {
+      // Update AM resource usage
+      if (app.getAMResource() != null) {
+        Resources.subtractFrom(amResourceUsage, app.getAMResource());
+      }
       return true;
     } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
       return false;
@@ -284,4 +292,26 @@ public class FSLeafQueue extends FSQueue
   public ActiveUsersManager getActiveUsersManager() {
     return activeUsersManager;
   }
+
+  /**
+   * Check whether this queue can run this application master under the
+   * maxAMShare limit
+   *
+   * @param amResource the resource requested for the application master
+   * @return true if this queue can run the application master
+   */
+  public boolean canRunAppAM(Resource amResource) {
+    float maxAMShare =
+        scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
+    Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
+    Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
+    return !policy
+        .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
+  }
+
+  public void addAMResourceUsage(Resource amResource) {
+    if (amResource != null) {
+      Resources.addTo(amResourceUsage, amResource);
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Tue Jun  3 00:56:48 2014
@@ -149,4 +149,15 @@ public abstract class SchedulingPolicy {
    */
   public abstract boolean checkIfUsageOverFairShare(
       Resource usage, Resource fairShare);
+
+  /**
+   * Check if a leaf queue's AM resource usage is over its limit under this policy
+   *
+   * @param usage {@link Resource} the resource used by application masters
+   * @param maxAMResource {@link Resource} the maximum allowed resource for
+   *                                      application masters
+   * @return true if AM resource usage is over the limit
+   */
+  public abstract boolean checkIfAMResourceUsageOverLimit(
+      Resource usage, Resource maxAMResource);
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java Tue Jun  3 00:56:48 2014
@@ -75,6 +75,11 @@ public class DominantResourceFairnessPol
   }
 
   @Override
+  public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+    return !Resources.fitsIn(usage, maxAMResource);
+  }
+
+  @Override
   public void initialize(Resource clusterCapacity) {
     comparator.setClusterCapacity(clusterCapacity);
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Tue Jun  3 00:56:48 2014
@@ -125,6 +125,11 @@ public class FairSharePolicy extends Sch
   }
 
   @Override
+  public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+    return usage.getMemory() > maxAMResource.getMemory();
+  }
+
+  @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_ANY;
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java Tue Jun  3 00:56:48 2014
@@ -95,6 +95,11 @@ public class FifoPolicy extends Scheduli
   }
 
   @Override
+  public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+    return usage.getMemory() > maxAMResource.getMemory();
+  }
+
+  @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_LEAF;
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java Tue Jun  3 00:56:48 2014
@@ -20,14 +20,21 @@ package org.apache.hadoop.yarn.server.re
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Clock;
 
@@ -169,4 +176,20 @@ public class FairSchedulerTestBase {
     ask.add(request);
     scheduler.allocate(attId, ask,  new ArrayList<ContainerId>(), null, null);
   }
+
+  protected void createApplicationWithAMResource(ApplicationAttemptId attId,
+      String queue, String user, Resource amResource) {
+    RMContext rmContext = resourceManager.getRMContext();
+    RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf,
+        null, null, null, ApplicationSubmissionContext.newInstance(null, null,
+        null, null, null, false, false, 0, amResource, null), null, null,
+        0, null, null);
+    rmContext.getRMApps().put(attId.getApplicationId(), rmApp);
+    AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
+        attId.getApplicationId(), queue, user);
+    scheduler.handle(appAddedEvent);
+    AppAttemptAddedSchedulerEvent attempAddedEvent =
+        new AppAttemptAddedSchedulerEvent(attId, false);
+    scheduler.handle(attempAddedEvent);
+  }
 }
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java Tue Jun  3 00:56:48 2014
@@ -174,9 +174,10 @@ public class TestAllocationFileLoaderSer
     out.println("<queue name=\"queueC\">");
     out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
     out.println("</queue>");
-    // Give queue D a limit of 3 running apps
+    // Give queue D a limit of 3 running apps and 0.4f maxAMShare
     out.println("<queue name=\"queueD\">");
     out.println("<maxRunningApps>3</maxRunningApps>");
+    out.println("<maxAMShare>0.4</maxAMShare>");
     out.println("</queue>");
     // Give queue E a preemption timeout of one minute
     out.println("<queue name=\"queueE\">");
@@ -194,6 +195,8 @@ public class TestAllocationFileLoaderSer
     out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
     // Set default limit of apps per user to 5
     out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
+    // Set default limit of AMResourceShare to 0.5f
+    out.println("<queueMaxAMShareDefault>0.5f</queueMaxAMShareDefault>");
     // Give user1 a limit of 10 jobs
     out.println("<user name=\"user1\">");
     out.println("<maxRunningApps>10</maxRunningApps>");
@@ -240,6 +243,13 @@ public class TestAllocationFileLoaderSer
     assertEquals(10, queueConf.getUserMaxApps("user1"));
     assertEquals(5, queueConf.getUserMaxApps("user2"));
 
+    assertEquals(.5f, queueConf.getQueueMaxAMShare("root." + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
+    assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueA"), 0.01);
+    assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueB"), 0.01);
+    assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueC"), 0.01);
+    assertEquals(.4f, queueConf.getQueueMaxAMShare("root.queueD"), 0.01);
+    assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueE"), 0.01);
+
     // Root should get * ACL
     assertEquals("*", queueConf.getQueueAcl("root",
         QueueACL.ADMINISTER_QUEUE).getAclString());

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Tue Jun  3 00:56:48 2014
@@ -64,7 +64,6 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@@ -73,12 +72,12 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
@@ -510,26 +509,14 @@ public class TestFairScheduler extends F
     scheduler.init(conf);
     scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    RMContext rmContext = resourceManager.getRMContext();
-    Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
     ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
-    RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
-        null, null, null, ApplicationSubmissionContext.newInstance(null, null,
-        null, null, null, false, false, 0, null, null), null, null, 0, null, null);
-    appsMap.put(appAttemptId.getApplicationId(), rmApp);
-    
-    AppAddedSchedulerEvent appAddedEvent =
-        new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default",
-          "user1");
-    scheduler.handle(appAddedEvent);
-    AppAttemptAddedSchedulerEvent attempAddedEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    scheduler.handle(attempAddedEvent);
+    createApplicationWithAMResource(appAttemptId, "default", "user1", null);
     assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
         .getRunnableAppSchedulables().size());
     assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
         .getRunnableAppSchedulables().size());
-    assertEquals("root.user1", rmApp.getQueue());
+    assertEquals("root.user1", resourceManager.getRMContext().getRMApps()
+        .get(appAttemptId.getApplicationId()).getQueue());
   }
   
   @Test
@@ -538,21 +525,8 @@ public class TestFairScheduler extends F
     scheduler.init(conf);
     scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    RMContext rmContext = resourceManager.getRMContext();
-    Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
     ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
-    RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
-        null, null, null, ApplicationSubmissionContext.newInstance(null, null,
-        null, null, null, false, false, 0, null, null), null, null, 0, null, null);
-    appsMap.put(appAttemptId.getApplicationId(), rmApp);
-
-    AppAddedSchedulerEvent appAddedEvent =
-        new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default",
-          "user2");
-    scheduler.handle(appAddedEvent);
-    AppAttemptAddedSchedulerEvent attempAddedEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    scheduler.handle(attempAddedEvent);
+    createApplicationWithAMResource(appAttemptId, "default", "user2", null);
     assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
         .getRunnableAppSchedulables().size());
     assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true)
@@ -2330,6 +2304,121 @@ public class TestFairScheduler extends F
   }
   
   @Test
+  public void testQueueMaxAMShare() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queue1\">");
+    out.println("<maxAMShare>0.2</maxAMShare>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    RMNode node =
+        MockNodes.newNodeInfo(1, Resources.createResource(20480, 20),
+            0, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+    scheduler.update();
+
+    assertEquals("Queue queue1's fair share should be 10240",
+        10240, scheduler.getQueueManager().getLeafQueue("queue1", true)
+            .getFairShare().getMemory());
+
+    Resource amResource1 = Resource.newInstance(1024, 1);
+    Resource amResource2 = Resource.newInstance(2048, 2);
+    int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
+    // Exceeds no limits: AM memory limit is 0.2 * 10240 MB fair share = 2048 MB
+    ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
+    createApplicationWithAMResource(attId1, "queue1", "user1", amResource1);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1);
+    FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application1's AM requests 1024 MB memory",
+        1024, app1.getAMResource().getMemory());
+    assertEquals("Application1's AM should be running",
+        1, app1.getLiveContainers().size());
+
+    // Exceeds no limits: a second 1024 MB AM brings AM usage to 2048 MB
+    ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
+    createApplicationWithAMResource(attId2, "queue1", "user1", amResource1);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2);
+    FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application2's AM requests 1024 MB memory",
+        1024, app2.getAMResource().getMemory());
+    assertEquals("Application2's AM should be running",
+        1, app2.getLiveContainers().size());
+
+    // Exceeds queue limit: a third 1024 MB AM would need 3072 MB > 2048 MB
+    ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
+    createApplicationWithAMResource(attId3, "queue1", "user1", amResource1);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3);
+    FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application3's AM requests 1024 MB memory",
+        1024, app3.getAMResource().getMemory());
+    assertEquals("Application3's AM should not be running",
+        0, app3.getLiveContainers().size());
+
+    // Still can run non-AM container (this request is made without amPriority)
+    createSchedulingRequestExistingApplication(1024, 1, attId1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application1 should have two running containers",
+        2, app1.getLiveContainers().size());
+
+    // Remove app1, app3's AM should become running (app2 + app3 = 2048 MB)
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
+        new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false);
+    scheduler.update();
+    scheduler.handle(appRemovedEvent1);
+    scheduler.handle(updateEvent);
+    assertEquals("Application1's AM should be finished",
+        0, app1.getLiveContainers().size());
+    assertEquals("Application3's AM should be running",
+        1, app3.getLiveContainers().size());
+
+    // Exceeds queue limit: app4's 2048 MB AM on top of the 2048 MB already used
+    ApplicationAttemptId attId4 = createAppAttemptId(4, 1);
+    createApplicationWithAMResource(attId4, "queue1", "user1", amResource2);
+    createSchedulingRequestExistingApplication(2048, 2, amPriority, attId4);
+    FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application4's AM requests 2048 MB memory",
+        2048, app4.getAMResource().getMemory());
+    assertEquals("Application4's AM should not be running",
+        0, app4.getLiveContainers().size());
+
+    // Remove app2 and app3, app4's AM should become running (2048 MB now fits)
+    AppAttemptRemovedSchedulerEvent appRemovedEvent2 =
+        new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, false);
+    AppAttemptRemovedSchedulerEvent appRemovedEvent3 =
+        new AppAttemptRemovedSchedulerEvent(attId3, RMAppAttemptState.FINISHED, false);
+    scheduler.handle(appRemovedEvent2);
+    scheduler.handle(appRemovedEvent3);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application2's AM should be finished",
+        0, app2.getLiveContainers().size());
+    assertEquals("Application3's AM should be finished",
+        0, app3.getLiveContainers().size());
+    assertEquals("Application4's AM should be running",
+        1, app4.getLiveContainers().size());
+  }
+
+  @Test
   public void testMaxRunningAppsHierarchicalQueues() throws Exception {
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
     MockClock clock = new MockClock();

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1599400&r1=1599399&r2=1599400&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm Tue Jun  3 00:56:48 2014
@@ -237,6 +237,11 @@ Allocation file format
 
    * maxRunningApps: limit the number of apps from the queue to run at once
 
+   * maxAMShare: limit the fraction of the queue's fair share that can be used
+     to run application masters. This property can only be used for leaf queues.
+     Default value is 1.0f, which means AMs in the leaf queue can take up to 100%
+     of both the memory and CPU fair share.
+
    * weight: to share the cluster non-proportionally with other queues. Weights
      default to 1, and a queue with weight 2 should receive approximately twice
      as many resources as a queue with the default weight.
@@ -279,6 +284,9 @@ Allocation file format
  * <<A queueMaxAppsDefault element>>, which sets the default running app limit
    for queues; overriden by maxRunningApps element in each queue.
 
+ * <<A queueMaxAMShareDefault element>>, which sets the default AM resource
+   limit for queues; overridden by the maxAMShare element in each queue.
+
  * <<A defaultQueueSchedulingPolicy element>>, which sets the default scheduling
    policy for queues; overriden by the schedulingPolicy element in each queue
    if specified. Defaults to "fair".
@@ -328,6 +336,7 @@ Allocation file format
     <minResources>10000 mb,0vcores</minResources>
     <maxResources>90000 mb,0vcores</maxResources>
     <maxRunningApps>50</maxRunningApps>
+    <maxAMShare>0.1</maxAMShare>
     <weight>2.0</weight>
     <schedulingPolicy>fair</schedulingPolicy>
     <queue name="sample_sub_queue">
@@ -336,6 +345,8 @@ Allocation file format
     </queue>
   </queue>
 
+  <queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
+
   <!—- Queue ‘secondary_group_queue’ is a parent queue and may have
        user queues under it -—>
   <queue name=“secondary_group_queue” type=“parent”>