You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [23/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Tue Aug 19 23:49:39 2014
@@ -18,13 +18,17 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.FileWriter;
@@ -34,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -42,8 +47,6 @@ import java.util.Set;
 
 import javax.xml.parsers.ParserConfigurationException;
 
-import org.junit.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -52,6 +55,7 @@ import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -64,23 +68,24 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -88,58 +93,28 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.xml.sax.SAXException;
 
 import com.google.common.collect.Sets;
 
-public class TestFairScheduler {
-
-  static class MockClock implements Clock {
-    private long time = 0;
-    @Override
-    public long getTime() {
-      return time;
-    }
-
-    public void tick(int seconds) {
-      time = time + seconds * 1000;
-    }
-
-  }
-
-  final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "/tmp")).getAbsolutePath();
-
-  final static String ALLOC_FILE = new File(TEST_DIR,
-      "test-queues").getAbsolutePath();
+@SuppressWarnings("unchecked")
+public class TestFairScheduler extends FairSchedulerTestBase {
+  private final static String ALLOC_FILE =
+      new File(TEST_DIR, "test-queues").getAbsolutePath();
 
-  private FairScheduler scheduler;
-  private ResourceManager resourceManager;
-  private Configuration conf;
-  private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
-  private int APP_ID = 1; // Incrementing counter for schedling apps
-  private int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
-
-  // HELPER METHODS
   @Before
   public void setUp() throws IOException {
     scheduler = new FairScheduler();
     conf = createConfiguration();
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
-    conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
-      1024);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
-    // All tests assume only one assignment per node update
-    conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
     resourceManager = new ResourceManager();
     resourceManager.init(conf);
 
@@ -150,6 +125,8 @@ public class TestFairScheduler {
 
     // to initialize the master key
     resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+
+    scheduler.setRMContext(resourceManager.getRMContext());
   }
 
   @After
@@ -163,12 +140,12 @@ public class TestFairScheduler {
 
   @Test (timeout = 30000)
   public void testConfValidation() throws Exception {
-    ResourceScheduler scheduler = new FairScheduler();
+    FairScheduler scheduler = new FairScheduler();
     Configuration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
     try {
-      scheduler.reinitialize(conf, null);
+      scheduler.serviceInit(conf);
       fail("Exception is expected because the min memory allocation is" +
         " larger than the max memory allocation.");
     } catch (YarnRuntimeException e) {
@@ -182,7 +159,7 @@ public class TestFairScheduler {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
     try {
-      scheduler.reinitialize(conf, null);
+      scheduler.serviceInit(conf);
       fail("Exception is expected because the min vcores allocation is" +
         " larger than the max vcores allocation.");
     } catch (YarnRuntimeException e) {
@@ -193,107 +170,6 @@ public class TestFairScheduler {
     }
   }
 
-  private Configuration createConfiguration() {
-    Configuration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
-        ResourceScheduler.class);
-    return conf;
-  }
-
-  private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
-    ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
-    ApplicationAttemptId attId =
-        ApplicationAttemptId.newInstance(appIdImpl, attemptId);
-    return attId;
-  }
-  
-  private ResourceRequest createResourceRequest(int memory, String host,
-      int priority, int numContainers, boolean relaxLocality) {
-    return createResourceRequest(memory, 1, host, priority, numContainers,
-        relaxLocality);
-  }
-
-  private ResourceRequest createResourceRequest(int memory, int vcores, String host,
-      int priority, int numContainers, boolean relaxLocality) {
-    ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
-    request.setCapability(BuilderUtils.newResource(memory, vcores));
-    request.setResourceName(host);
-    request.setNumContainers(numContainers);
-    Priority prio = recordFactory.newRecordInstance(Priority.class);
-    prio.setPriority(priority);
-    request.setPriority(prio);
-    request.setRelaxLocality(relaxLocality);
-    return request;
-  }
-
-  /**
-   * Creates a single container priority-1 request and submits to
-   * scheduler.
-   */
-  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
-      String userId) {
-    return createSchedulingRequest(memory, queueId, userId, 1);
-  }
-  
-  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
-      String queueId, String userId) {
-    return createSchedulingRequest(memory, vcores, queueId, userId, 1);
-  }
-
-  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
-      String userId, int numContainers) {
-    return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
-  }
-  
-  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
-      String queueId, String userId, int numContainers) {
-    return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
-  }
-
-  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
-      String userId, int numContainers, int priority) {
-    return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
-        priority);
-  }
-  
-  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
-      String queueId, String userId, int numContainers, int priority) {
-    ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    scheduler.addApplication(id.getApplicationId(), queueId, userId);
-    // This conditional is for testAclSubmitApplication where app is rejected
-    // and no app is added.
-    if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
-      scheduler.addApplicationAttempt(id, false);
-    }
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
-        priority, numContainers, true);
-    ask.add(request);
-    scheduler.allocate(id, ask,  new ArrayList<ContainerId>(), null, null);
-    return id;
-  }
-  
-  private void createSchedulingRequestExistingApplication(int memory, int priority,
-      ApplicationAttemptId attId) {
-    ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
-        priority, 1, true);
-    createSchedulingRequestExistingApplication(request, attId);
-  }
-  
-  private void createSchedulingRequestExistingApplication(int memory, int vcores,
-      int priority, ApplicationAttemptId attId) {
-	ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
-		priority, 1, true);
-	createSchedulingRequestExistingApplication(request, attId);
-  }
-  
-  private void createSchedulingRequestExistingApplication(ResourceRequest request,
-      ApplicationAttemptId attId) {
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ask.add(request);
-    scheduler.allocate(attId, ask,  new ArrayList<ContainerId>(), null, null);
-  }
-
   // TESTS
 
   @Test(timeout=2000)
@@ -315,6 +191,8 @@ public class TestFairScheduler {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
     conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 
       128);
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     Assert.assertEquals(true, scheduler.assignMultiple);
     Assert.assertEquals(3, scheduler.maxAssign);
@@ -342,6 +220,7 @@ public class TestFairScheduler {
       FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
     conf.setInt(
       FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
+    fs.init(conf);
     fs.reinitialize(conf, null);
     Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory());
     Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores());
@@ -359,8 +238,9 @@ public class TestFairScheduler {
       FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
     conf.setInt(
       FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
-    fs.reinitialize(conf, null);  
-    Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());  
+    fs.init(conf);
+    fs.reinitialize(conf, null);
+    Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());
     Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores());
     Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
     Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
@@ -368,6 +248,8 @@ public class TestFairScheduler {
   
   @Test
   public void testAggregateCapacityTracking() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
@@ -376,23 +258,25 @@ public class TestFairScheduler {
             .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
-    assertEquals(1024, scheduler.getClusterCapacity().getMemory());
+    assertEquals(1024, scheduler.getClusterResource().getMemory());
 
     // Add another node
     RMNode node2 =
         MockNodes.newNodeInfo(1, Resources.createResource(512), 2, "127.0.0.2");
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
     scheduler.handle(nodeEvent2);
-    assertEquals(1536, scheduler.getClusterCapacity().getMemory());
+    assertEquals(1536, scheduler.getClusterResource().getMemory());
 
     // Remove the first node
     NodeRemovedSchedulerEvent nodeEvent3 = new NodeRemovedSchedulerEvent(node1);
     scheduler.handle(nodeEvent3);
-    assertEquals(512, scheduler.getClusterCapacity().getMemory());
+    assertEquals(512, scheduler.getClusterResource().getMemory());
   }
 
   @Test
   public void testSimpleFairShareCalculation() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add one big node (only care about aggregate capacity)
@@ -405,6 +289,7 @@ public class TestFairScheduler {
     // Have two queues which want entire cluster capacity
     createSchedulingRequest(10 * 1024, "queue1", "user1");
     createSchedulingRequest(10 * 1024, "queue2", "user1");
+    createSchedulingRequest(10 * 1024, "root.default", "user1");
 
     scheduler.update();
 
@@ -420,6 +305,8 @@ public class TestFairScheduler {
   
   @Test
   public void testSimpleHierarchicalFairShareCalculation() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add one big node (only care about aggregate capacity)
@@ -433,6 +320,7 @@ public class TestFairScheduler {
     // Have two queues which want entire cluster capacity
     createSchedulingRequest(10 * 1024, "parent.queue2", "user1");
     createSchedulingRequest(10 * 1024, "parent.queue3", "user1");
+    createSchedulingRequest(10 * 1024, "root.default", "user1");
 
     scheduler.update();
 
@@ -453,6 +341,8 @@ public class TestFairScheduler {
 
   @Test
   public void testHierarchicalQueuesSimilarParents() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     QueueManager queueManager = scheduler.getQueueManager();
@@ -477,6 +367,8 @@ public class TestFairScheduler {
 
   @Test
   public void testSchedulerRootQueueMetrics() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
@@ -516,6 +408,8 @@ public class TestFairScheduler {
 
   @Test (timeout = 5000)
   public void testSimpleContainerAllocation() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
@@ -564,6 +458,8 @@ public class TestFairScheduler {
 
   @Test (timeout = 5000)
   public void testSimpleContainerReservation() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
@@ -618,48 +514,27 @@ public class TestFairScheduler {
   @Test
   public void testUserAsDefaultQueue() throws Exception {
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    RMContext rmContext = resourceManager.getRMContext();
-    Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
     ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
-    RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
-        null, null, null, ApplicationSubmissionContext.newInstance(null, null,
-        null, null, null, false, false, 0, null, null), null, null, 0, null, null);
-    appsMap.put(appAttemptId.getApplicationId(), rmApp);
-    
-    AppAddedSchedulerEvent appAddedEvent =
-        new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default",
-          "user1");
-    scheduler.handle(appAddedEvent);
-    AppAttemptAddedSchedulerEvent attempAddedEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    scheduler.handle(attempAddedEvent);
+    createApplicationWithAMResource(appAttemptId, "default", "user1", null);
     assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
         .getRunnableAppSchedulables().size());
     assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
         .getRunnableAppSchedulables().size());
-    assertEquals("root.user1", rmApp.getQueue());
+    assertEquals("root.user1", resourceManager.getRMContext().getRMApps()
+        .get(appAttemptId.getApplicationId()).getQueue());
   }
   
   @Test
   public void testNotUserAsDefaultQueue() throws Exception {
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    RMContext rmContext = resourceManager.getRMContext();
-    Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
     ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
-    RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
-        null, null, null, ApplicationSubmissionContext.newInstance(null, null,
-        null, null, null, false, false, 0, null, null), null, null, 0, null, null);
-    appsMap.put(appAttemptId.getApplicationId(), rmApp);
-
-    AppAddedSchedulerEvent appAddedEvent =
-        new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default",
-          "user2");
-    scheduler.handle(appAddedEvent);
-    AppAttemptAddedSchedulerEvent attempAddedEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    scheduler.handle(attempAddedEvent);
+    createApplicationWithAMResource(appAttemptId, "default", "user2", null);
     assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
         .getRunnableAppSchedulables().size());
     assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true)
@@ -670,6 +545,8 @@ public class TestFairScheduler {
 
   @Test
   public void testEmptyQueueName() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // only default queue
@@ -690,8 +567,10 @@ public class TestFairScheduler {
   @Test
   public void testAssignToQueue() throws Exception {
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
     RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
     
@@ -708,6 +587,8 @@ public class TestFairScheduler {
   @Test
   public void testAssignToNonLeafQueueReturnsNull() throws Exception {
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     scheduler.getQueueManager().getLeafQueue("root.child1.granchild", true);
@@ -725,6 +606,8 @@ public class TestFairScheduler {
   public void testQueuePlacementWithPolicy() throws Exception {
     conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
         SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     ApplicationAttemptId appId;
@@ -737,8 +620,11 @@ public class TestFairScheduler {
     rules.add(new QueuePlacementRule.Default().initialize(true, null));
     Set<String> queues = Sets.newHashSet("root.user1", "root.user3group",
         "root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2");
+    Map<FSQueueType, Set<String>> configuredQueues = new HashMap<FSQueueType, Set<String>>();
+    configuredQueues.put(FSQueueType.LEAF, queues);
+    configuredQueues.put(FSQueueType.PARENT, new HashSet<String>());
     scheduler.getAllocationConfiguration().placementPolicy =
-        new QueuePlacementPolicy(rules, queues, conf);
+        new QueuePlacementPolicy(rules, configuredQueues, conf);
     appId = createSchedulingRequest(1024, "somequeue", "user1");
     assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
     appId = createSchedulingRequest(1024, "default", "user1");
@@ -758,7 +644,7 @@ public class TestFairScheduler {
     rules.add(new QueuePlacementRule.Specified().initialize(true, null));
     rules.add(new QueuePlacementRule.Default().initialize(true, null));
     scheduler.getAllocationConfiguration().placementPolicy =
-        new QueuePlacementPolicy(rules, queues, conf);
+        new QueuePlacementPolicy(rules, configuredQueues, conf);
     appId = createSchedulingRequest(1024, "somequeue", "user1");
     assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
     appId = createSchedulingRequest(1024, "somequeue", "otheruser");
@@ -782,7 +668,9 @@ public class TestFairScheduler {
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-    
+
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add one big node (only care about aggregate capacity)
@@ -809,23 +697,113 @@ public class TestFairScheduler {
       }
     }
   }
+  
+  @Test
+  public void testNestedUserQueue() throws IOException {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"user1group\" type=\"parent\">");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queuePlacementPolicy>");
+    out.println("<rule name=\"specified\" create=\"false\" />");
+    out.println("<rule name=\"nestedUserQueue\">");
+    out.println("     <rule name=\"primaryGroup\" create=\"false\" />");
+    out.println("</rule>");
+    out.println("<rule name=\"default\" />");
+    out.println("</queuePlacementPolicy>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+    RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
+
+    FSLeafQueue user1Leaf = scheduler.assignToQueue(rmApp1, "root.default",
+        "user1");
+
+    assertEquals("root.user1group.user1", user1Leaf.getName());
+  }
+
+  @Test
+  public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"parentq\" type=\"parent\">");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queuePlacementPolicy>");
+    out.println("<rule name=\"nestedUserQueue\">");
+    out.println("     <rule name=\"specified\" create=\"false\" />");
+    out.println("</rule>");
+    out.println("<rule name=\"default\" />");
+    out.println("</queuePlacementPolicy>");
+    out.println("</allocations>");
+    out.close();
+
+    RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
+    RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    int capacity = 16 * 1024;
+    // create node with 16 GB of memory
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(capacity),
+        1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
 
+    // user1 and user2 submit their apps to parentq and create user queues
+    createSchedulingRequest(10 * 1024, "root.parentq", "user1");
+    createSchedulingRequest(10 * 1024, "root.parentq", "user2");
+    // user3 submits app in default queue
+    createSchedulingRequest(10 * 1024, "root.default", "user3");
+
+    scheduler.update();
+
+    Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
+        .getLeafQueues();
+
+    for (FSLeafQueue leaf : leafQueues) {
+      if (leaf.getName().equals("root.parentq.user1")
+          || leaf.getName().equals("root.parentq.user2")) {
+        // assert that the fair share is 1/4 of node1's capacity
+        assertEquals(capacity / 4, leaf.getFairShare().getMemory());
+        // assert weights are equal for both the user queues
+        assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0);
+      }
+    }
+  }
+  
   /**
    * Make allocation requests and ensure they are reflected in queue demand.
    */
   @Test
   public void testQueueDemandCalculation() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     ApplicationAttemptId id11 = createAppAttemptId(1, 1);
-    scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
-    scheduler.addApplicationAttempt(id11, false);
+    scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", false);
+    scheduler.addApplicationAttempt(id11, false, false);
     ApplicationAttemptId id21 = createAppAttemptId(2, 1);
-    scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
-    scheduler.addApplicationAttempt(id21, false);
+    scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", false);
+    scheduler.addApplicationAttempt(id21, false, false);
     ApplicationAttemptId id22 = createAppAttemptId(2, 2);
-    scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
-    scheduler.addApplicationAttempt(id22, false);
+    scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", false);
+    scheduler.addApplicationAttempt(id22, false, false);
 
     int minReqSize = 
         FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
@@ -865,6 +843,8 @@ public class TestFairScheduler {
 
   @Test
   public void testAppAdditionAndRemoval() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     ApplicationAttemptId attemptId =createAppAttemptId(1, 1);
     AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default",
@@ -915,6 +895,8 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     QueueManager queueManager = scheduler.getQueueManager();
@@ -947,7 +929,9 @@ public class TestFairScheduler {
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-    
+
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     QueueManager queueManager = scheduler.getQueueManager();
     
@@ -974,6 +958,8 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add one big node (only care about aggregate capacity)
@@ -1031,8 +1017,10 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     // Add one big node (only care about aggregate capacity)
     RMNode node1 =
         MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
@@ -1075,13 +1063,13 @@ public class TestFairScheduler {
 
   @Test (timeout = 5000)
   /**
-   * Make sure containers are chosen to be preempted in the correct order. Right
-   * now this means decreasing order of priority.
+   * Make sure containers are chosen to be preempted in the correct order.
    */
   public void testChoiceOfPreemptedContainers() throws Exception {
     conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
-    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); 
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
     
     MockClock clock = new MockClock();
     scheduler.setClock(clock);
@@ -1098,141 +1086,215 @@ public class TestFairScheduler {
     out.println("<queue name=\"queueC\">");
     out.println("<weight>.25</weight>");
     out.println("</queue>");
-    out.println("<queue name=\"queueD\">");
+    out.println("<queue name=\"default\">");
     out.println("<weight>.25</weight>");
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-    
+
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
-    // Create four nodes
+    // Create two nodes
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
             "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
 
     RMNode node2 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
             "127.0.0.2");
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
     scheduler.handle(nodeEvent2);
 
-    RMNode node3 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
-            "127.0.0.3");
-    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
-    scheduler.handle(nodeEvent3);
-
-
-    // Queue A and B each request three containers
+    // Queues A and B each submit two applications
     ApplicationAttemptId app1 =
-        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+        createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1);
+    createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
     ApplicationAttemptId app2 =
-        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
-    ApplicationAttemptId app3 =
-        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+        createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3);
+    createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2);
 
+    ApplicationAttemptId app3 =
+        createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1);
+    createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3);
     ApplicationAttemptId app4 =
-        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
-    ApplicationAttemptId app5 =
-        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
-    ApplicationAttemptId app6 =
-        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+        createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3);
+    createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4);
 
     scheduler.update();
 
+    scheduler.getQueueManager().getLeafQueue("queueA", true)
+        .setPolicy(SchedulingPolicy.parse("fifo"));
+    scheduler.getQueueManager().getLeafQueue("queueB", true)
+        .setPolicy(SchedulingPolicy.parse("fair"));
+
     // Sufficient node check-ins to fully schedule containers
-    for (int i = 0; i < 2; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+    for (int i = 0; i < 4; i++) {
       scheduler.handle(nodeUpdate1);
-
-      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
       scheduler.handle(nodeUpdate2);
-
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
-      scheduler.handle(nodeUpdate3);
     }
 
-    assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app6).getLiveContainers().size());
-
-    // Now new requests arrive from queues C and D
-    ApplicationAttemptId app7 =
-        createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
-    ApplicationAttemptId app8 =
-        createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
-    ApplicationAttemptId app9 =
-        createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
-    ApplicationAttemptId app10 =
-        createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
-    ApplicationAttemptId app11 =
-        createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
-    ApplicationAttemptId app12 =
-        createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
-
-    scheduler.update();
-
-    // We should be able to claw back one container from A and B each.
-    // Make sure it is lowest priority container.
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
-    assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-    
-    // First verify we are adding containers to preemption list for the application
-    assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app3).getLiveContainers(),
-                                     scheduler.getSchedulerApp(app3).getPreemptionContainers()));
-    assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app6).getLiveContainers(),
-                                     scheduler.getSchedulerApp(app6).getPreemptionContainers()));
+    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+
+    // Now new requests arrive from queueC and default
+    createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
+    scheduler.update();
+
+    // We should be able to claw back one container from queueA and queueB each.
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+    // First verify we are adding containers to preemption list for the app.
+    // For queueA (fifo), app2 is selected.
+    // For queueB (fair), app4 is selected.
+    assertTrue("App2 should have container to be preempted",
+        !Collections.disjoint(
+            scheduler.getSchedulerApp(app2).getLiveContainers(),
+            scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+    assertTrue("App4 should have container to be preempted",
+        !Collections.disjoint(
+            scheduler.getSchedulerApp(app4).getLiveContainers(),
+            scheduler.getSchedulerApp(app4).getPreemptionContainers()));
 
     // Pretend 15 seconds have passed
     clock.tick(15);
 
     // Trigger a kill by insisting we want containers back
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
 
     // At this point the containers should have been killed (since we are not simulating AM)
-    assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+    // Inside each app, containers are sorted according to their priorities.
+    // Containers with priority 4 are preempted for app2 and app4.
+    Set<RMContainer> set = new HashSet<RMContainer>();
+    for (RMContainer container :
+        scheduler.getSchedulerApp(app2).getLiveContainers()) {
+      if (container.getAllocatedPriority().getPriority() == 4) {
+        set.add(container);
+      }
+    }
+    for (RMContainer container :
+        scheduler.getSchedulerApp(app4).getLiveContainers()) {
+      if (container.getAllocatedPriority().getPriority() == 4) {
+        set.add(container);
+      }
+    }
+    assertTrue("Containers with priority=4 in app2 and app4 should be " +
+        "preempted.", set.isEmpty());
 
     // Trigger a kill by insisting we want containers back
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
 
     // Pretend 15 seconds have passed
     clock.tick(15);
 
     // We should be able to claw back another container from A and B each.
-    // Make sure it is lowest priority container.
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
-    
-    assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    // For queueA (fifo), continue preempting from app2.
+    // For queueB (fair), even though app4 has a lowest-priority container
+    // (p=4), it still preempts from app3 as app3 is most over fair share.
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+
+    assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
     assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
     assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
 
     // Now A and B are below fair share, so preemption shouldn't do anything
-    scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
-        Resources.createResource(2 * 1024));
-    assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-    assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    assertTrue("App1 should have no container to be preempted",
+        scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
+    assertTrue("App2 should have no container to be preempted",
+        scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty());
+    assertTrue("App3 should have no container to be preempted",
+        scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
+    assertTrue("App4 should have no container to be preempted",
+        scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
+  }
+
+  /** A starved queue's whole preemption target is marked in one round. */
+  @Test
+  public void testPreemptionIsNotDelayedToNextRound() throws Exception {
+    conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>8</weight>");
+    out.println("<queue name=\"queueA1\" />");
+    out.println("<queue name=\"queueA2\" />");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>2</weight>");
+    out.println("</queue>");
+    out.println("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a single node with 8G of memory and 8 vcores
+    RMNode node1 = MockNodes.newNodeInfo(1,
+        Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Run apps in queueA.queueA1 (7 containers) and queueB (1 container)
+    ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1,
+        "queueA.queueA1", "user1", 7, 1);
+    ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB",
+        "user2", 1, 1);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+    for (int i = 0; i < 8; i++) {
+      scheduler.handle(nodeUpdate1);
+    }
+
+    // Verify that the apps got the containers they requested
+    assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+
+    // Now submit an app in queueA.queueA2; the node is already full
+    createSchedulingRequest(1 * 1024, 1, "queueA.queueA2", "user3", 7, 1);
+    scheduler.update();
+
+    // Advance the mock clock by 11 ticks (timeout above is configured as 10)
+    clock.tick(11);
+
+    scheduler.update();
+    Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager()
+        .getLeafQueue("queueA.queueA2", false), clock.getTime());
+    // Presumably ceil(8192 MB * 8/10 / 2): queueA's weighted share split
+    // evenly between queueA1 and queueA2 -- TODO confirm rounding.
+    assertEquals(3277, toPreempt.getMemory());
+
+    // All 3 containers owed to queueA2 must be marked in this same round
+    scheduler.preemptResources(toPreempt);
+    assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
+        .size());
+  }
 
   @Test (timeout = 5000)
@@ -1271,6 +1333,8 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Create four nodes
@@ -1365,9 +1429,11 @@ public class TestFairScheduler {
     assertEquals(
         1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
   }
-  
+
   @Test (timeout = 5000)
   public void testMultipleContainersWaitingForReservation() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
@@ -1410,8 +1476,10 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     // Add a node
     RMNode node1 =
         MockNodes
@@ -1451,6 +1519,8 @@ public class TestFairScheduler {
   
   @Test (timeout = 5000)
   public void testReservationWhileMultiplePriorities() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     // Add a node
@@ -1466,7 +1536,7 @@ public class TestFairScheduler {
     NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
     scheduler.handle(updateEvent);
     
-    FSSchedulerApp app = scheduler.getSchedulerApp(attId);
+    FSAppAttempt app = scheduler.getSchedulerApp(attId);
     assertEquals(1, app.getLiveContainers().size());
     
     ContainerId containerId = scheduler.getSchedulerApp(attId)
@@ -1531,21 +1601,25 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
         "norealuserhasthisname", 1);
     ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
         "norealuserhasthisname2", 1);
 
-    FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
+    FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
     assertNotNull("The application was not allowed", app1);
-    FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
+    FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
     assertNull("The application was allowed", app2);
   }
   
   @Test (timeout = 5000)
   public void testMultipleNodesSingleRackRequest() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 =
@@ -1563,8 +1637,8 @@ public class TestFairScheduler {
     scheduler.handle(nodeEvent2);
     
     ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
-    scheduler.addApplicationAttempt(appId, false);
+    scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", false);
+    scheduler.addApplicationAttempt(appId, false, false);
     
     // 1 request with 2 nodes on the same rack. another request with 1 node on
     // a different rack
@@ -1595,6 +1669,8 @@ public class TestFairScheduler {
   
   @Test (timeout = 5000)
   public void testFifoWithinQueue() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 =
@@ -1609,8 +1685,8 @@ public class TestFairScheduler {
         "user1", 2);
     ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
         "user1", 2);
-    FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
-    FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
+    FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
+    FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
     
     FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true);
     queue1.setPolicy(new FifoPolicy());
@@ -1639,6 +1715,8 @@ public class TestFairScheduler {
   @Test(timeout = 3000)
   public void testMaxAssign() throws Exception {
     conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node =
@@ -1650,7 +1728,42 @@ public class TestFairScheduler {
 
     ApplicationAttemptId attId =
         createSchedulingRequest(1024, "root.default", "user", 8);
-    FSSchedulerApp app = scheduler.getSchedulerApp(attId);
+    FSAppAttempt app = scheduler.getSchedulerApp(attId);
+
+    // set maxAssign to 2: only 2 containers should be allocated
+    scheduler.maxAssign = 2;
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Incorrect number of containers allocated", 2, app
+        .getLiveContainers().size());
+
+    // set maxAssign to -1: all remaining containers should be allocated
+    scheduler.maxAssign = -1;
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Incorrect number of containers allocated", 8, app
+        .getLiveContainers().size());
+  }
+  
+  @Test(timeout = 3000)
+  public void testMaxAssignWithZeroMemoryContainers() throws Exception {
+    conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
+    
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    RMNode node =
+        MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    ApplicationAttemptId attId =
+        createSchedulingRequest(0, 1, "root.default", "user", 8);
+    FSAppAttempt app = scheduler.getSchedulerApp(attId);
 
     // set maxAssign to 2: only 2 containers should be allocated
     scheduler.maxAssign = 2;
@@ -1682,6 +1795,8 @@ public class TestFairScheduler {
    */
   @Test(timeout = 5000)
   public void testAssignContainer() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     final String user = "user1";
@@ -1712,10 +1827,10 @@ public class TestFairScheduler {
     ApplicationAttemptId attId4 =
         createSchedulingRequest(1024, fifoQueue, user, 4);
 
-    FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
-    FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
-    FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3);
-    FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4);
+    FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
+    FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
+    FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
+    FSAppAttempt app4 = scheduler.getSchedulerApp(attId4);
 
     scheduler.getQueueManager().getLeafQueue(fifoQueue, true)
         .setPolicy(SchedulingPolicy.parse("fifo"));
@@ -1765,9 +1880,11 @@ public class TestFairScheduler {
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-    
+
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     int appId = this.APP_ID++;
     String user = "usernotallow";
     String queue = "queue1";
@@ -1802,7 +1919,7 @@ public class TestFairScheduler {
 
     ApplicationAttemptId attId =
         ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
-    scheduler.addApplication(attId.getApplicationId(), queue, user);
+    scheduler.addApplication(attId.getApplicationId(), queue, user, false);
 
     numTries = 0;
     while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
@@ -1816,6 +1933,8 @@ public class TestFairScheduler {
   
   @Test
   public void testReservationThatDoesntFit() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 =
@@ -1830,7 +1949,7 @@ public class TestFairScheduler {
     NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
     scheduler.handle(updateEvent);
     
-    FSSchedulerApp app = scheduler.getSchedulerApp(attId);
+    FSAppAttempt app = scheduler.getSchedulerApp(attId);
     assertEquals(0, app.getLiveContainers().size());
     assertEquals(0, app.getReservedContainers().size());
     
@@ -1844,6 +1963,8 @@ public class TestFairScheduler {
   
   @Test
   public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
@@ -1872,6 +1993,8 @@ public class TestFairScheduler {
 
   @Test
   public void testStrictLocality() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1899,7 +2022,7 @@ public class TestFairScheduler {
     NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
 
     // no matter how many heartbeats, node2 should never get a container
-    FSSchedulerApp app = scheduler.getSchedulerApp(attId1);
+    FSAppAttempt app = scheduler.getSchedulerApp(attId1);
     for (int i = 0; i < 10; i++) {
       scheduler.handle(node2UpdateEvent);
       assertEquals(0, app.getLiveContainers().size());
@@ -1912,6 +2035,8 @@ public class TestFairScheduler {
   
   @Test
   public void testCancelStrictLocality() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1938,7 +2063,7 @@ public class TestFairScheduler {
     NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
 
     // no matter how many heartbeats, node2 should never get a container
-    FSSchedulerApp app = scheduler.getSchedulerApp(attId1);
+    FSAppAttempt app = scheduler.getSchedulerApp(attId1);
     for (int i = 0; i < 10; i++) {
       scheduler.handle(node2UpdateEvent);
       assertEquals(0, app.getLiveContainers().size());
@@ -1962,6 +2087,8 @@ public class TestFairScheduler {
    */
   @Test
   public void testReservationsStrictLocality() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1971,7 +2098,7 @@ public class TestFairScheduler {
 
     ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
         "user1", 0);
-    FSSchedulerApp app = scheduler.getSchedulerApp(attId);
+    FSAppAttempt app = scheduler.getSchedulerApp(attId);
     
     ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true);
     ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true);
@@ -2002,6 +2129,8 @@ public class TestFairScheduler {
   
   @Test
   public void testNoMoreCpuOnNode() throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
@@ -2011,7 +2140,7 @@ public class TestFairScheduler {
     
     ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default",
         "user1", 2);
-    FSSchedulerApp app = scheduler.getSchedulerApp(attId);
+    FSAppAttempt app = scheduler.getSchedulerApp(attId);
     scheduler.update();
 
     NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
@@ -2023,6 +2152,8 @@ public class TestFairScheduler {
 
   @Test
   public void testBasicDRFAssignment() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
@@ -2031,13 +2162,13 @@ public class TestFairScheduler {
 
     ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1",
         "user1", 2);
-    FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1);
+    FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1);
     ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1",
         "user1", 2);
-    FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
+    FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2);
 
     DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
-    drfPolicy.initialize(scheduler.getClusterCapacity());
+    drfPolicy.initialize(scheduler.getClusterResource());
     scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
     scheduler.update();
 
@@ -2063,6 +2194,8 @@ public class TestFairScheduler {
    */
   @Test
   public void testBasicDRFWithQueues() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
@@ -2072,16 +2205,16 @@ public class TestFairScheduler {
 
     ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1",
         "user1", 2);
-    FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1);
+    FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1);
     ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1",
         "user1", 2);
-    FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
+    FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2);
     ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2",
         "user1", 2);
-    FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3);
+    FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3);
     
     DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
-    drfPolicy.initialize(scheduler.getClusterCapacity());
+    drfPolicy.initialize(scheduler.getClusterResource());
     scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
     scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
     scheduler.update();
@@ -2099,6 +2232,8 @@ public class TestFairScheduler {
   
   @Test
   public void testDRFHierarchicalQueues() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
@@ -2109,22 +2244,22 @@ public class TestFairScheduler {
     ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1",
         "user1", 2);
     Thread.sleep(3); // so that start times will be different
-    FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1);
+    FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1);
     ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1",
         "user1", 2);
     Thread.sleep(3); // so that start times will be different
-    FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
+    FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2);
     ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2",
         "user1", 2);
     Thread.sleep(3); // so that start times will be different
-    FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3);
+    FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3);
     ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2",
         "user1", 2);
     Thread.sleep(3); // so that start times will be different
-    FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4);
+    FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4);
     
     DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
-    drfPolicy.initialize(scheduler.getClusterCapacity());
+    drfPolicy.initialize(scheduler.getClusterResource());
     scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
     scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
     scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);
@@ -2167,7 +2302,9 @@ public class TestFairScheduler {
   public void testHostPortNodeName() throws Exception {
     conf.setBoolean(YarnConfiguration
         .RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
-    scheduler.reinitialize(conf, 
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf,
         resourceManager.getRMContext());
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024),
         1, "127.0.0.1", 1);
@@ -2201,7 +2338,7 @@ public class TestFairScheduler {
         NodeUpdateSchedulerEvent(node2);
 
     // no matter how many heartbeats, node2 should never get a container  
-    FSSchedulerApp app = scheduler.getSchedulerApp(attId1);
+    FSAppAttempt app = scheduler.getSchedulerApp(attId1);
     for (int i = 0; i < 10; i++) {
       scheduler.handle(node2UpdateEvent);
       assertEquals(0, app.getLiveContainers().size());
@@ -2213,14 +2350,14 @@ public class TestFairScheduler {
   }
 
   private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) {
-    FSSchedulerApp app = scheduler.getSchedulerApp(attId);
+    FSAppAttempt app = scheduler.getSchedulerApp(attId);
     FSLeafQueue queue = app.getQueue();
-    Collection<AppSchedulable> runnableApps =
+    Collection<FSAppAttempt> runnableApps =
         queue.getRunnableAppSchedulables();
-    Collection<AppSchedulable> nonRunnableApps =
+    Collection<FSAppAttempt> nonRunnableApps =
         queue.getNonRunnableAppSchedulables();
-    assertEquals(runnable, runnableApps.contains(app.getAppSchedulable()));
-    assertEquals(!runnable, nonRunnableApps.contains(app.getAppSchedulable()));
+    assertEquals(runnable, runnableApps.contains(app));
+    assertEquals(!runnable, nonRunnableApps.contains(app));
   }
   
   private void verifyQueueNumRunnable(String queueName, int numRunnableInQueue,
@@ -2247,9 +2384,11 @@ public class TestFairScheduler {
     out.println("</user>");
     out.println("</allocations>");
     out.close();
-    
+
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     // exceeds no limits
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1");
     verifyAppRunnable(attId1, true);
@@ -2283,6 +2422,284 @@ public class TestFairScheduler {
   }
   
   @Test
+  public void testQueueMaxAMShare() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queue1\">");
+    out.println("<maxAMShare>0.2</maxAMShare>"); // the limit under test
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    RMNode node =
+        MockNodes.newNodeInfo(1, Resources.createResource(20480, 20),
+            0, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+    scheduler.update();
+
+    FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true);
+    assertEquals("Queue queue1's fair share should be 0", 0, queue1
+        .getFairShare().getMemory());
+
+    createSchedulingRequest(1 * 1024, "root.default", "user1");
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    Resource amResource1 = Resource.newInstance(1024, 1);
+    Resource amResource2 = Resource.newInstance(2048, 2);
+    Resource amResource3 = Resource.newInstance(1860, 2);
+    int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
+    // app1: 1 GB AM in queue1; exceeds no limits, so the AM should start
+    ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
+    createApplicationWithAMResource(attId1, "queue1", "user1", amResource1);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1);
+    FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application1's AM requests 1024 MB memory",
+        1024, app1.getAMResource().getMemory());
+    assertEquals("Application1's AM should be running",
+        1, app1.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 1024 MB memory",
+        1024, queue1.getAmResourceUsage().getMemory());
+
+    // app2: second 1 GB AM; still exceeds no limits, AM usage grows to 2 GB
+    ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
+    createApplicationWithAMResource(attId2, "queue1", "user1", amResource1);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2);
+    FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application2's AM requests 1024 MB memory",
+        1024, app2.getAMResource().getMemory());
+    assertEquals("Application2's AM should be running",
+        1, app2.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    // app3: third 1 GB AM exceeds the queue limit, so its AM must not start
+    ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
+    createApplicationWithAMResource(attId3, "queue1", "user1", amResource1);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3);
+    FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application3's AM requests 1024 MB memory",
+        1024, app3.getAMResource().getMemory());
+    assertEquals("Application3's AM should not be running",
+        0, app3.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    // Still can run non-AM container: app1 gets one, AM usage is unchanged
+    createSchedulingRequestExistingApplication(1024, 1, attId1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application1 should have two running containers",
+        2, app1.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    // Remove app1; app3's AM should then start and AM usage stays at 2 GB
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
+        new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false);
+    scheduler.update();
+    scheduler.handle(appRemovedEvent1);
+    scheduler.handle(updateEvent);
+    assertEquals("Application1's AM should be finished",
+        0, app1.getLiveContainers().size());
+    assertEquals("Application3's AM should be running",
+        1, app3.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    // app4: a 2 GB AM exceeds the queue limit, so its AM must not start
+    ApplicationAttemptId attId4 = createAppAttemptId(4, 1);
+    createApplicationWithAMResource(attId4, "queue1", "user1", amResource2);
+    createSchedulingRequestExistingApplication(2048, 2, amPriority, attId4);
+    FSAppAttempt app4 = scheduler.getSchedulerApp(attId4);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application4's AM requests 2048 MB memory",
+        2048, app4.getAMResource().getMemory());
+    assertEquals("Application4's AM should not be running",
+        0, app4.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    // app5: same 2 GB AM request as app4; also exceeds the queue limit
+    ApplicationAttemptId attId5 = createAppAttemptId(5, 1);
+    createApplicationWithAMResource(attId5, "queue1", "user1", amResource2);
+    createSchedulingRequestExistingApplication(2048, 2, amPriority, attId5);
+    FSAppAttempt app5 = scheduler.getSchedulerApp(attId5);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application5's AM requests 2048 MB memory",
+        2048, app5.getAMResource().getMemory());
+    assertEquals("Application5's AM should not be running",
+        0, app5.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    // Removing app4, which never ran, must not start app5 or change AM usage
+    AppAttemptRemovedSchedulerEvent appRemovedEvent4 =
+        new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.KILLED, false);
+    scheduler.handle(appRemovedEvent4);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application5's AM should not be running",
+        0, app5.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    // Remove app2 and app3 (1 GB AM each); app5's 2 GB AM should then start
+    AppAttemptRemovedSchedulerEvent appRemovedEvent2 =
+        new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, false);
+    AppAttemptRemovedSchedulerEvent appRemovedEvent3 =
+        new AppAttemptRemovedSchedulerEvent(attId3, RMAppAttemptState.FINISHED, false);
+    scheduler.handle(appRemovedEvent2);
+    scheduler.handle(appRemovedEvent3);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application2's AM should be finished",
+        0, app2.getLiveContainers().size());
+    assertEquals("Application3's AM should be finished",
+        0, app3.getLiveContainers().size());
+    assertEquals("Application5's AM should be running",
+        1, app5.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    // Check amResource normalization: a 1860 MB AM is reported as 2048 MB
+    ApplicationAttemptId attId6 = createAppAttemptId(6, 1);
+    createApplicationWithAMResource(attId6, "queue1", "user1", amResource3);
+    createSchedulingRequestExistingApplication(1860, 2, amPriority, attId6);
+    FSAppAttempt app6 = scheduler.getSchedulerApp(attId6);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application6's AM should not be running",
+        0, app6.getLiveContainers().size());
+    assertEquals("Application6's AM requests 2048 MB memory",
+        2048, app6.getAMResource().getMemory());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    // Remove the remaining apps; queue1's AM usage should drop to 0
+    AppAttemptRemovedSchedulerEvent appRemovedEvent5 =
+        new AppAttemptRemovedSchedulerEvent(attId5, RMAppAttemptState.FINISHED, false);
+    AppAttemptRemovedSchedulerEvent appRemovedEvent6 =
+        new AppAttemptRemovedSchedulerEvent(attId6, RMAppAttemptState.FINISHED, false);
+    scheduler.handle(appRemovedEvent5);
+    scheduler.handle(appRemovedEvent6);
+    scheduler.update();
+    assertEquals("Queue1's AM resource usage should be 0",
+        0, queue1.getAmResourceUsage().getMemory());
+  }
+
+  @Test
+  public void testQueueMaxAMShareDefault() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queue1\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queue2\">");
+    out.println("<maxAMShare>1.0</maxAMShare>"); // only queue2 sets a value
+    out.println("</queue>");
+    out.println("<queue name=\"queue3\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queue4\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queue5\">");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    RMNode node =
+        MockNodes.newNodeInfo(1, Resources.createResource(8192, 20),
+            0, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+    scheduler.update();
+
+    FSLeafQueue queue1 =
+        scheduler.getQueueManager().getLeafQueue("queue1", true);
+    assertEquals("Queue queue1's fair share should be 0", 0, queue1
+        .getFairShare().getMemory());
+    FSLeafQueue queue2 =
+        scheduler.getQueueManager().getLeafQueue("queue2", true);
+    assertEquals("Queue queue2's fair share should be 0", 0, queue2
+        .getFairShare().getMemory());
+    FSLeafQueue queue3 =
+        scheduler.getQueueManager().getLeafQueue("queue3", true);
+    assertEquals("Queue queue3's fair share should be 0", 0, queue3
+        .getFairShare().getMemory());
+    FSLeafQueue queue4 =
+        scheduler.getQueueManager().getLeafQueue("queue4", true);
+    assertEquals("Queue queue4's fair share should be 0", 0, queue4
+        .getFairShare().getMemory());
+    FSLeafQueue queue5 =
+        scheduler.getQueueManager().getLeafQueue("queue5", true);
+    assertEquals("Queue queue5's fair share should be 0", 0, queue5
+        .getFairShare().getMemory());
+
+    List<String> queues = Arrays.asList("root.default", "root.queue3",
+        "root.queue4", "root.queue5");
+    for (String queue : queues) { // one 1 GB request per listed queue
+      createSchedulingRequest(1 * 1024, queue, "user1");
+      scheduler.update();
+      scheduler.handle(updateEvent);
+    }
+
+    Resource amResource1 = Resource.newInstance(2048, 1);
+    int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
+
+    // Exceeds queue limit, but default maxAMShare is -1.0 so it doesn't matter
+    ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
+    createApplicationWithAMResource(attId1, "queue1", "test1", amResource1);
+    createSchedulingRequestExistingApplication(2048, 1, amPriority, attId1);
+    FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application1's AM requests 2048 MB memory",
+        2048, app1.getAMResource().getMemory());
+    assertEquals("Application1's AM should be running",
+        1, app1.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    // Exceeds queue limit; queue2 sets maxAMShare 1.0, so its AM must not run
+    ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
+    createApplicationWithAMResource(attId2, "queue2", "test1", amResource1);
+    createSchedulingRequestExistingApplication(2048, 1, amPriority, attId2);
+    FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    assertEquals("Application2's AM requests 2048 MB memory",
+        2048, app2.getAMResource().getMemory());
+    assertEquals("Application2's AM should not be running",
+        0, app2.getLiveContainers().size());
+    assertEquals("Queue2's AM resource usage should be 0 MB memory",
+        0, queue2.getAmResourceUsage().getMemory());
+  }
+
+  @Test
   public void testMaxRunningAppsHierarchicalQueues() throws Exception {
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
     MockClock clock = new MockClock();
@@ -2301,9 +2718,11 @@ public class TestFairScheduler {
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
-    
+
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
+
     // exceeds no limits
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
     verifyAppRunnable(attId1, true);
@@ -2363,6 +2782,9 @@ public class TestFairScheduler {
     Configuration conf = createConfiguration();
     conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
             true);
+    fs.setRMContext(resourceManager.getRMContext());
+    fs.init(conf);
+    fs.start();
     fs.reinitialize(conf, resourceManager.getRMContext());
     Assert.assertTrue("Continuous scheduling should be enabled.",
             fs.isContinuousSchedulingEnabled());
@@ -2380,14 +2802,14 @@ public class TestFairScheduler {
     fs.handle(nodeEvent2);
 
     // available resource
-    Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 1024);
-    Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16);
+    Assert.assertEquals(fs.getClusterResource().getMemory(), 16 * 1024);
+    Assert.assertEquals(fs.getClusterResource().getVirtualCores(), 16);
 
     // send application request
     ApplicationAttemptId appAttemptId =
             createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
-    fs.addApplicationAttempt(appAttemptId, false);
+    fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
+    fs.addApplicationAttempt(appAttemptId, false, false);
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ResourceRequest request =
             createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
@@ -2398,7 +2820,7 @@ public class TestFairScheduler {
     // at least one pass
     Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500);
 
-    FSSchedulerApp app = fs.getSchedulerApp(appAttemptId);
+    FSAppAttempt app = fs.getSchedulerApp(appAttemptId);
     // Wait until app gets resources.
     while (app.getCurrentConsumption().equals(Resources.none())) { }
 
@@ -2429,7 +2851,43 @@ public class TestFairScheduler {
     Assert.assertEquals(2, nodes.size());
   }
 
-  
+  @Test
+  public void testContinuousSchedulingWithNodeRemoved() throws Exception {
+    // Leave continuous scheduling disabled; one attempt is invoked manually
+    scheduler.init(conf);
+    scheduler.start();
+    Assert.assertFalse("Continuous scheduling should be disabled.",
+        scheduler.isContinuousSchedulingEnabled());
+
+    // Add two nodes
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
+            "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+    Assert.assertEquals("We should have two alive nodes.",
+        2, scheduler.getNumClusterNodes());
+
+    // Remove one node
+    NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1);
+    scheduler.handle(removeNode1);
+    Assert.assertEquals("We should only have one alive node.",
+        1, scheduler.getNumClusterNodes());
+
+    // A scheduling attempt after the node removal must not throw
+    try {
+      scheduler.continuousSchedulingAttempt();
+    } catch (Exception e) {
+      fail("Exception happened when doing continuous scheduling. " +
+        e.toString());
+    }
+  }
+
   @Test
   public void testDontAllowUndeclaredPools() throws Exception{
     conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
@@ -2443,6 +2901,8 @@ public class TestFairScheduler {
     out.println("</allocations>");
     out.close();
 
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     QueueManager queueManager = scheduler.getQueueManager();
     
@@ -2470,9 +2930,123 @@ public class TestFairScheduler {
     assertEquals(2, defaultQueue.getRunnableAppSchedulables().size());
   }
 
+  @Test
+  public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured()
+      throws IOException {
+    // Verifies that the Default rule of the queue placement policy gets a
+    // non-null default queue name when no policy is configured and undeclared
+    // pools are disallowed. NOTE(review): vacuous if no Default rule exists.
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
+
+    // Create an alloc file with no queue placement policy (and no queues)
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    List<QueuePlacementRule> rules = scheduler.allocConf.placementPolicy
+        .getRules();
+
+    for (QueuePlacementRule rule : rules) {
+      if (rule instanceof Default) { // only Default rules are checked
+        Default defaultRule = (Default) rule;
+        assertNotNull(defaultRule.defaultQueueName);
+      }
+    }
+  }
+    
+  @Test(timeout=5000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
+    
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    Priority priority = Priority.newInstance(20);
+    String host = "127.0.0.1";
+    int GB = 1024;
+
+    // Create a node and raise a node-added event
+    RMNode node = MockNodes.newNodeInfo(1,
+        Resources.createResource(16 * 1024, 4), 0, host);
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    // Create 3 resource requests (node, rack, any) and place them in ask
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
+        priority.getPriority(), 1, true);
+    ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
+        node.getRackName(), priority.getPriority(), 1, true);
+    ResourceRequest offRackRequest = createResourceRequest(GB, 1,
+        ResourceRequest.ANY, priority.getPriority(), 1, true);
+    ask.add(nodeLocalRequest);
+    ask.add(rackLocalRequest);
+    ask.add(offRackRequest);
+
+    // Submit the requests for a new app attempt and update
+    ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
+        "user1", ask);
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeUpdate);
+
+    assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
+        .size());
+    FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
+
+    // ResourceRequest will be empty once NodeUpdate is completed
+    Assert.assertNull(app.getResourceRequest(priority, host));
+
+    ContainerId containerId1 = ContainerId.newInstance(appAttemptId, 1);
+    RMContainer rmContainer = app.getRMContainer(containerId1);
+
+    // Create a preempt event and register for preemption
+    scheduler.warnOrKillContainer(rmContainer);
+    
+    // Advance the mock clock by a few ticks
+    clock.tick(5);
+    
+    // preempt now
+    scheduler.warnOrKillContainer(rmContainer);
+
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    // Once recovered, resource request will be present again in app
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      Assert.assertEquals(1,
+          app.getResourceRequest(priority, request.getResourceName())
+              .getNumContainers());
+    }
+
+    // Send node heartbeat
+    scheduler.update();
+    scheduler.handle(nodeUpdate);
+
+    List<Container> containers = scheduler.allocate(appAttemptId,
+        Collections.<ResourceRequest> emptyList(),
+        Collections.<ContainerId> emptyList(), null, null).getContainers();
+
+    // Now with updated ResourceRequest, a container is allocated for AM.
+    Assert.assertEquals(1, containers.size());
+  }
+  
   @SuppressWarnings("resource")
   @Test
   public void testBlacklistNodes() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     final int GB = 1024;
@@ -2486,7 +3060,7 @@ public class TestFairScheduler {
 
     ApplicationAttemptId appAttemptId =
         createSchedulingRequest(GB, "root.default", "user", 1);
-    FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId);
+    FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
 
     // Verify the blacklist can be updated independent of requesting containers
     scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
@@ -2525,6 +3099,8 @@ public class TestFairScheduler {
   
   @Test
   public void testGetAppsInQueue() throws Exception {
+    scheduler.init(conf);
+    scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     ApplicationAttemptId appAttId1 =
@@ -2561,16 +3137,19 @@ public class TestFairScheduler {
 
   @Test

[... 222 lines stripped ...]