You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2012/07/13 02:43:02 UTC

svn commit: r1361020 [3/3] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ hadoop-yarn/hadoop-yarn-server/had...

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,992 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFairScheduler {
+
+  /**
+   * A manually driven {@link Clock} for tests. Time starts at 0 ms and only
+   * advances when {@link #tick(int)} is called.
+   */
+  private static class MockClock implements Clock {
+    private long time = 0;
+
+    @Override
+    public long getTime() {
+      return time;
+    }
+
+    /**
+     * Moves the clock forward.
+     *
+     * @param seconds number of seconds to advance
+     */
+    public void tick(int seconds) {
+      // Multiply in long arithmetic: seconds * 1000 as int overflows once
+      // seconds exceeds Integer.MAX_VALUE / 1000 (about 24.8 days).
+      time = time + seconds * 1000L;
+    }
+  }
+
+  // Scratch directory for test files: the "test.build.data" system property,
+  // falling back to /tmp.
+  static final String TEST_DIR = new File(System.getProperty("test.build.data",
+      "/tmp")).getAbsolutePath();
+
+  // Allocation file that the allocation-file tests write and point the
+  // scheduler at via FairSchedulerConfiguration.ALLOCATION_FILE.
+  static final String ALLOC_FILE = new File(TEST_DIR,
+      "test-queues").getAbsolutePath();
+
+  private FairScheduler scheduler;
+  private ResourceManager resourceManager;
+  private static final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  // NOTE(review): despite their constant-style names these are mutable
+  // counters, advanced by createSchedulingRequest to hand out fresh ids.
+  private int APP_ID = 1; // Incrementing counter for scheduling apps
+  private int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
+
+  // HELPER METHODS
+
+  /**
+   * Creates a fresh FairScheduler and ResourceManager, starts the RM
+   * context's AsyncDispatcher, and initialises the scheduler against that
+   * context with multiple-assignment disabled.
+   */
+  @Before
+  public void setUp() throws IOException {
+    scheduler = new FairScheduler();
+    Configuration schedulerConf = new Configuration();
+    // Every test expects at most one container assignment per node update.
+    schedulerConf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
+
+    Store rmStore = StoreFactory.getStore(schedulerConf);
+    resourceManager = new ResourceManager(rmStore);
+    resourceManager.init(schedulerConf);
+
+    AsyncDispatcher dispatcher =
+        (AsyncDispatcher) resourceManager.getRMContext().getDispatcher();
+    dispatcher.start();
+
+    scheduler.reinitialize(schedulerConf, null, resourceManager.getRMContext());
+  }
+
+  /**
+   * Stops the AsyncDispatcher that setUp() starts explicitly, so it does not
+   * keep running after the test, then drops the per-test references.
+   */
+  @After
+  public void tearDown() {
+    // Guard against setUp() having failed before the RM or its context existed,
+    // so a setup failure is not masked by an NPE here.
+    if (resourceManager != null && resourceManager.getRMContext() != null) {
+      ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).stop();
+    }
+    scheduler = null;
+    resourceManager = null;
+  }
+
+  /**
+   * Builds an application attempt id record for application number
+   * {@code appId} and attempt number {@code attemptId}.
+   */
+  private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
+    ApplicationId application = recordFactory.newRecordInstance(ApplicationId.class);
+    application.setId(appId);
+
+    ApplicationAttemptId attempt =
+        recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    attempt.setAttemptId(attemptId);
+    attempt.setApplicationId(application);
+    return attempt;
+  }
+
+
+  /**
+   * Builds a request for {@code numContainers} containers, each with a
+   * capability of {@code memory}, on {@code host} at {@code priority}.
+   */
+  private ResourceRequest createResourceRequest(int memory, String host, int priority, int numContainers) {
+    Priority requestPriority = recordFactory.newRecordInstance(Priority.class);
+    requestPriority.setPriority(priority);
+
+    ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
+    request.setCapability(Resources.createResource(memory));
+    request.setHostName(host);
+    request.setNumContainers(numContainers);
+    request.setPriority(requestPriority);
+    return request;
+  }
+
+  /**
+   * Creates a single container priority-1 request and submits to
+   * scheduler.
+   */
+  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId) {
+    return createSchedulingRequest(memory, queueId, userId, 1);
+  }
+
+  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers) {
+    return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
+  }
+
+  /**
+   * Registers a new application attempt with the scheduler under
+   * {@code queueId}/{@code userId}. It then submits one "*" (any host) request
+   * for {@code numContainers} containers of {@code memory} at
+   * {@code priority}. Each call consumes the next APP_ID and ATTEMPT_ID values.
+   *
+   * @return the attempt id that was registered
+   */
+  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers, int priority) {
+    ApplicationAttemptId attemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
+    scheduler.addApplication(attemptId, queueId, userId);
+
+    List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
+    asks.add(createResourceRequest(memory, "*", priority, numContainers));
+    List<ContainerId> releases = new ArrayList<ContainerId>();
+    scheduler.allocate(attemptId, asks, releases);
+    return attemptId;
+  }
+
+  // TESTS
+
+  /**
+   * Cluster capacity should track the sum of registered nodes as nodes are
+   * added and removed.
+   */
+  @Test
+  public void testAggregateCapacityTracking() throws Exception {
+    RMNode largeNode = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    scheduler.handle(new NodeAddedSchedulerEvent(largeNode));
+    assertEquals(1024, scheduler.getClusterCapacity().getMemory());
+
+    RMNode smallNode = MockNodes.newNodeInfo(1, Resources.createResource(512));
+    scheduler.handle(new NodeAddedSchedulerEvent(smallNode));
+    assertEquals(1024 + 512, scheduler.getClusterCapacity().getMemory());
+
+    // Dropping the larger node leaves only the smaller one's capacity
+    scheduler.handle(new NodeRemovedSchedulerEvent(largeNode));
+    assertEquals(512, scheduler.getClusterCapacity().getMemory());
+  }
+
+  /**
+   * Two queues that each demand the whole cluster should receive equal fair
+   * shares of it.
+   */
+  @Test
+  public void testSimpleFairShareCalculation() {
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Have two queues which want entire cluster capacity
+    createSchedulingRequest(10 * 1024, "queue1", "user1");
+    createSchedulingRequest(10 * 1024, "queue2", "user1");
+
+    scheduler.update();
+
+    // queue1, queue2 and the default queue
+    Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
+    assertEquals(3, queues.size());
+
+    for (FSQueue p : queues) {
+      // Compare names by value. "!=" compares String references, so it would
+      // skip the default queue only if its name were the same interned instance.
+      if (!"default".equals(p.getName())) {
+        // Each demanding queue gets half of the 10 * 1024 cluster
+        assertEquals(5120, p.getQueueSchedulable().getFairShare().getMemory());
+      }
+    }
+  }
+
+  /**
+   * A two-container request from queue1 is satisfied one container per node
+   * update, first on the 1024 node and then on the 512 node.
+   */
+  @Test
+  public void testSimpleContainerAllocation() {
+    RMNode firstNode = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    scheduler.handle(new NodeAddedSchedulerEvent(firstNode));
+
+    RMNode secondNode = MockNodes.newNodeInfo(1, Resources.createResource(512));
+    scheduler.handle(new NodeAddedSchedulerEvent(secondNode));
+
+    // queue1 asks for two containers of 512
+    createSchedulingRequest(512, "queue1", "user1", 2);
+    scheduler.update();
+
+    // An update from the first node places one of them...
+    scheduler.handle(new NodeUpdateSchedulerEvent(firstNode,
+        new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>()));
+    assertEquals(512, scheduler.getQueueManager().getQueue("queue1")
+        .getQueueSchedulable().getResourceUsage().getMemory());
+
+    // ...and an update from the second node places the other.
+    scheduler.handle(new NodeUpdateSchedulerEvent(secondNode,
+        new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>()));
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queue1")
+        .getQueueSchedulable().getResourceUsage().getMemory());
+  }
+
+  /**
+   * When a node is full, a second queue's request becomes a reservation. The
+   * request is then served by a newly added node, and the stale reservation
+   * disappears on the original node's next update.
+   */
+  @Test
+  public void testSimpleContainerReservation() throws InterruptedException {
+    RMNode firstNode = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    scheduler.handle(new NodeAddedSchedulerEvent(firstNode));
+
+    // queue1 takes the first node's entire capacity
+    createSchedulingRequest(1024, "queue1", "user1", 1);
+    scheduler.update();
+    NodeUpdateSchedulerEvent firstNodeUpdate = new NodeUpdateSchedulerEvent(firstNode,
+        new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    scheduler.handle(firstNodeUpdate);
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queue1")
+        .getQueueSchedulable().getResourceUsage().getMemory());
+
+    // queue2 asks for the same amount; with the node full it only reserves
+    ApplicationAttemptId reservingApp = createSchedulingRequest(1024, "queue2", "user1", 1);
+    scheduler.update();
+    scheduler.handle(firstNodeUpdate);
+    assertEquals(0, scheduler.getQueueManager().getQueue("queue2")
+        .getQueueSchedulable().getResourceUsage().getMemory());
+    assertEquals(1024,
+        scheduler.applications.get(reservingApp).getCurrentReservation().getMemory());
+
+    // A second, empty node joins and reports in: queue2 is served there
+    RMNode secondNode = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    NodeAddedSchedulerEvent secondNodeAdded = new NodeAddedSchedulerEvent(secondNode);
+    NodeUpdateSchedulerEvent secondNodeUpdate = new NodeUpdateSchedulerEvent(secondNode,
+        new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
+    scheduler.handle(secondNodeAdded);
+    scheduler.handle(secondNodeUpdate);
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queue2")
+        .getQueueSchedulable().getResourceUsage().getMemory());
+
+    // The reservation on the first node survives until that node updates
+    // again, at which point it is released.
+    assertEquals(1024,
+        scheduler.applications.get(reservingApp).getCurrentReservation().getMemory());
+    scheduler.handle(firstNodeUpdate);
+    assertEquals(0,
+        scheduler.applications.get(reservingApp).getCurrentReservation().getMemory());
+  }
+
+  /**
+   * With USER_AS_DEFAULT_QUEUE enabled, an app submitted to "default" is
+   * placed in a queue named after its user. With it disabled, the app stays
+   * in "default".
+   */
+  @Test
+  public void testUserAsDefaultQueue() throws Exception {
+    Configuration conf = new Configuration();
+
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+    scheduler.handle(new AppAddedSchedulerEvent(
+        createAppAttemptId(1, 1), "default", "user1"));
+    assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size());
+    assertEquals(0, scheduler.getQueueManager().getQueue("default").getApplications().size());
+
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+    scheduler.handle(new AppAddedSchedulerEvent(
+        createAppAttemptId(2, 1), "default", "user2"));
+    QueueManager queueManager = scheduler.getQueueManager();
+    assertEquals(1, queueManager.getQueue("user1").getApplications().size());
+    assertEquals(1, queueManager.getQueue("default").getApplications().size());
+    assertEquals(0, queueManager.getQueue("user2").getApplications().size());
+  }
+
+  @Test
+  public void testFairShareWithMinAlloc() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<minResources>1024</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<minResources>2048</minResources>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    QueueManager queueManager = scheduler.getQueueManager();
+    queueManager.initialize();
+
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    createSchedulingRequest(2 * 1024, "queueA", "user1");
+    createSchedulingRequest(2 * 1024, "queueB", "user1");
+
+    scheduler.update();
+
+    Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
+    assertEquals(3, queues.size());
+
+    for (FSQueue p : queues) {
+      if (p.getName().equals("queueA")) {
+        assertEquals(1024, p.getQueueSchedulable().getFairShare().getMemory());
+      }
+      else if (p.getName().equals("queueB")) {
+        assertEquals(2048, p.getQueueSchedulable().getFairShare().getMemory());
+      }
+    }
+
+  }
+
+  /**
+   * Outstanding resource requests should be summed into each queue's demand,
+   * across several applications and several requests per application.
+   */
+  @Test
+  public void testQueueDemandCalculation() throws Exception {
+    ApplicationAttemptId app1Attempt1 = createAppAttemptId(1, 1);
+    scheduler.addApplication(app1Attempt1, "queue1", "user1");
+    ApplicationAttemptId app2Attempt1 = createAppAttemptId(2, 1);
+    scheduler.addApplication(app2Attempt1, "queue2", "user1");
+    ApplicationAttemptId app2Attempt2 = createAppAttemptId(2, 2);
+    scheduler.addApplication(app2Attempt2, "queue2", "user1");
+
+    // queue1: one 1024 container on any host
+    List<ResourceRequest> queue1Asks = new ArrayList<ResourceRequest>();
+    queue1Asks.add(createResourceRequest(1024, "*", 1, 1));
+    scheduler.allocate(app1Attempt1, queue1Asks, new ArrayList<ContainerId>());
+
+    // queue2, first attempt: one 1024 on "foo" plus two 512s on "bar"
+    List<ResourceRequest> queue2FirstAsks = new ArrayList<ResourceRequest>();
+    queue2FirstAsks.add(createResourceRequest(1024, "foo", 1, 1));
+    queue2FirstAsks.add(createResourceRequest(512, "bar", 1, 2));
+    scheduler.allocate(app2Attempt1, queue2FirstAsks, new ArrayList<ContainerId>());
+
+    // queue2, second attempt: one more 1024 on any host
+    List<ResourceRequest> queue2SecondAsks = new ArrayList<ResourceRequest>();
+    queue2SecondAsks.add(createResourceRequest(1024, "*", 1, 1));
+    scheduler.allocate(app2Attempt2, queue2SecondAsks, new ArrayList<ContainerId>());
+
+    scheduler.update();
+
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queue1")
+        .getQueueSchedulable().getDemand().getMemory());
+    assertEquals(1024 + 1024 + (2 * 512), scheduler.getQueueManager().getQueue("queue2")
+        .getQueueSchedulable().getDemand().getMemory());
+  }
+
+  /**
+   * Adding an app to "default" creates that queue and places the app in it.
+   * Removing the app (via an equal, separately built attempt id) empties the
+   * queue again.
+   */
+  @Test
+  public void testAppAdditionAndRemoval() throws Exception {
+    scheduler.handle(new AppAddedSchedulerEvent(
+        createAppAttemptId(1, 1), "default", "user1"));
+
+    // Only the default queue exists, and it holds exactly the new app
+    assertEquals(1, scheduler.getQueueManager().getQueues().size());
+    assertEquals(1, scheduler.getQueueManager().getQueue("default").getApplications().size());
+
+    scheduler.handle(new AppRemovedSchedulerEvent(
+        createAppAttemptId(1, 1), RMAppAttemptState.FINISHED));
+
+    assertEquals(0, scheduler.getQueueManager().getQueue("default").getApplications().size());
+  }
+
+  @Test
+  public void testAllocationFileParsing() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give queue A a minimum of 1024 M
+    out.println("<queue name=\"queueA\">");
+    out.println("<minResources>1024</minResources>");
+    out.println("</queue>");
+    // Give queue B a minimum of 2048 M
+    out.println("<queue name=\"queueB\">");
+    out.println("<minResources>2048</minResources>");
+    out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
+    out.println("</queue>");
+    // Give queue C no minimum
+    out.println("<queue name=\"queueC\">");
+    out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
+    out.println("</queue>");
+    // Give queue D a limit of 3 running apps
+    out.println("<queue name=\"queueD\">");
+    out.println("<maxRunningApps>3</maxRunningApps>");
+    out.println("</queue>");
+    // Give queue E a preemption timeout of one minute
+    out.println("<queue name=\"queueE\">");
+    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    out.println("</queue>");
+    // Set default limit of apps per queue to 15
+    out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
+    // Set default limit of apps per user to 5
+    out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
+    // Give user1 a limit of 10 jobs
+    out.println("<user name=\"user1\">");
+    out.println("<maxRunningApps>10</maxRunningApps>");
+    out.println("</user>");
+    // Set default min share preemption timeout to 2 minutes
+    out.println("<defaultMinSharePreemptionTimeout>120"
+        + "</defaultMinSharePreemptionTimeout>");
+    // Set fair share preemption timeout to 5 minutes
+    out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    QueueManager queueManager = scheduler.getQueueManager();
+    queueManager.initialize();
+
+    assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
+
+    assertEquals(Resources.createResource(1024),
+        queueManager.getMinResources("queueA"));
+    assertEquals(Resources.createResource(2048),
+        queueManager.getMinResources("queueB"));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources("queueC"));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources("queueD"));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources("queueE"));
+
+    assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME));
+    assertEquals(15, queueManager.getQueueMaxApps("queueA"));
+    assertEquals(15, queueManager.getQueueMaxApps("queueB"));
+    assertEquals(15, queueManager.getQueueMaxApps("queueC"));
+    assertEquals(3, queueManager.getQueueMaxApps("queueD"));
+    assertEquals(15, queueManager.getQueueMaxApps("queueE"));
+    assertEquals(10, queueManager.getUserMaxApps("user1"));
+    assertEquals(5, queueManager.getUserMaxApps("user2"));
+
+    // Unspecified queues should get default ACL
+    Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("queueA");
+    assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE));
+    assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString());
+    assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS));
+    assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
+
+    // Queue B ACL
+    Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB");
+    assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE));
+    assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString());
+
+    // Queue c ACL
+    Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC");
+    assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS));
+    assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
+
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout(
+        YarnConfiguration.DEFAULT_QUEUE_NAME));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
+    assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE"));
+    assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
+  }
+
+  /**
+   * Same checks as testAllocationFileParsing. The allocation file here uses
+   * the older {@code <pool>} element name instead of {@code <queue>}, and the
+   * parsed settings must be identical.
+   */
+  @Test
+  public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give queue A a minimum of 1024 M
+    out.println("<pool name=\"queueA\">");
+    out.println("<minResources>1024</minResources>");
+    out.println("</pool>");
+    // Give queue B a minimum of 2048 M
+    out.println("<pool name=\"queueB\">");
+    out.println("<minResources>2048</minResources>");
+    out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
+    out.println("</pool>");
+    // Give queue C no minimum
+    out.println("<pool name=\"queueC\">");
+    out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
+    out.println("</pool>");
+    // Give queue D a limit of 3 running apps
+    out.println("<pool name=\"queueD\">");
+    out.println("<maxRunningApps>3</maxRunningApps>");
+    out.println("</pool>");
+    // Give queue E a preemption timeout of one minute
+    out.println("<pool name=\"queueE\">");
+    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    out.println("</pool>");
+    // Set default limit of apps per queue to 15
+    out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
+    // Set default limit of apps per user to 5
+    out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
+    // Give user1 a limit of 10 jobs
+    out.println("<user name=\"user1\">");
+    out.println("<maxRunningApps>10</maxRunningApps>");
+    out.println("</user>");
+    // Set default min share preemption timeout to 2 minutes
+    out.println("<defaultMinSharePreemptionTimeout>120"
+        + "</defaultMinSharePreemptionTimeout>");
+    // Set fair share preemption timeout to 5 minutes
+    out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    QueueManager queueManager = scheduler.getQueueManager();
+    queueManager.initialize();
+
+    assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
+
+    assertEquals(Resources.createResource(1024),
+        queueManager.getMinResources("queueA"));
+    assertEquals(Resources.createResource(2048),
+        queueManager.getMinResources("queueB"));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources("queueC"));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources("queueD"));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources("queueE"));
+
+    assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME));
+    assertEquals(15, queueManager.getQueueMaxApps("queueA"));
+    assertEquals(15, queueManager.getQueueMaxApps("queueB"));
+    assertEquals(15, queueManager.getQueueMaxApps("queueC"));
+    assertEquals(3, queueManager.getQueueMaxApps("queueD"));
+    assertEquals(15, queueManager.getQueueMaxApps("queueE"));
+    assertEquals(10, queueManager.getUserMaxApps("user1"));
+    assertEquals(5, queueManager.getUserMaxApps("user2"));
+
+    // Unspecified queues should get default ACL
+    Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("queueA");
+    assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE));
+    assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString());
+    assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS));
+    assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
+
+    // Queue B ACL
+    Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB");
+    assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE));
+    assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString());
+
+    // Queue C ACL
+    Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC");
+    assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS));
+    assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
+
+    // Timeouts are written in seconds above and reported in milliseconds.
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout(
+        YarnConfiguration.DEFAULT_QUEUE_NAME));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD"));
+    assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE"));
+    assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
+  }
+
+  @Test
+  public void testIsStarvedForMinShare() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<minResources>2048</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<minResources>2048</minResources>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    QueueManager queueManager = scheduler.getQueueManager();
+    queueManager.initialize();
+
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Queue A wants 3 * 1024. Node update gives this all to A
+    createSchedulingRequest(3 * 1024, "queueA", "user1");
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
+        new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+    scheduler.handle(nodeEvent2);
+
+    // Queue B arrives and wants 1 * 1024
+    createSchedulingRequest(1 * 1024, "queueB", "user1");
+    scheduler.update();
+    Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
+    assertEquals(3, queues.size());
+
+    // Queue A should be above min share, B below.
+    for (FSQueue p : queues) {
+      if (p.getName().equals("queueA")) {
+        assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
+      }
+      else if (p.getName().equals("queueB")) {
+        assertEquals(true, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
+      }
+    }
+
+    // Node checks in again, should allocate for B
+    scheduler.handle(nodeEvent2);
+    // Now B should have min share ( = demand here)
+    for (FSQueue p : queues) {
+      if (p.getName().equals("queueB")) {
+        assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
+      }
+    }
+  }
+
+  @Test
+  public void testIsStarvedForFairShare() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.75</weight>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    QueueManager queueManager = scheduler.getQueueManager();
+    queueManager.initialize();
+
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Queue A wants 3 * 1024. Node update gives this all to A
+    createSchedulingRequest(3 * 1024, "queueA", "user1");
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
+        new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+    scheduler.handle(nodeEvent2);
+
+    // Queue B arrives and wants 1 * 1024
+    createSchedulingRequest(1 * 1024, "queueB", "user1");
+    scheduler.update();
+    Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
+    assertEquals(3, queues.size());
+
+    // Queue A should be above fair share, B below.
+    for (FSQueue p : queues) {
+      if (p.getName().equals("queueA")) {
+        assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
+      }
+      else if (p.getName().equals("queueB")) {
+        assertEquals(true, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
+      }
+    }
+
+    // Node checks in again, should allocate for B
+    scheduler.handle(nodeEvent2);
+    // B should not be starved for fair share, since entire demand is
+    // satisfied.
+    for (FSQueue p : queues) {
+      if (p.getName().equals("queueB")) {
+        assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
+      }
+    }
+  }
+
+  /**
+   * Makes sure containers are chosen to be preempted in the correct order:
+   * within each queue that is above its fair share, the lowest-priority
+   * container is preempted first.
+   */
+  @Test
+  public void testChoiceOfPreemptedContainers() throws Exception {
+    Configuration conf = new Configuration();
+    // Point the scheduler at the allocation file written below, using the same
+    // configuration key as the other tests in this class.
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+    // Four queues with equal weights.
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.25</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>.25</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueD\">");
+    out.println("<weight>.25</weight>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    QueueManager queueManager = scheduler.getQueueManager();
+    queueManager.initialize();
+
+    // Create three nodes of 2 * 1024 each (6 * 1024 in total).
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    // Queue A and B each request three containers
+    ApplicationAttemptId app1 =
+        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+    ApplicationAttemptId app2 =
+        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
+    ApplicationAttemptId app3 =
+        createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+
+    ApplicationAttemptId app4 =
+        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
+    ApplicationAttemptId app5 =
+        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
+    ApplicationAttemptId app6 =
+        createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 2; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
+          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
+          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
+          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+      scheduler.handle(nodeUpdate3);
+    }
+
+    assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
+    assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
+    assertEquals(1, scheduler.applications.get(app3).getLiveContainers().size());
+    assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
+    assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
+    assertEquals(1, scheduler.applications.get(app6).getLiveContainers().size());
+
+    // Now new requests arrive from queues C and D. The returned attempt IDs
+    // are not needed; these requests only create demand in C and D.
+    createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
+    createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
+
+    createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
+    createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
+
+    scheduler.update();
+
+    // We should be able to claw back one container from A and B each.
+    // Make sure it is lowest priority container.
+    scheduler.preemptResources(scheduler.getQueueSchedulables(),
+        Resources.createResource(2 * 1024));
+    assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
+    assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+    assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
+    assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+
+    // We should be able to claw back another container from A and B each.
+    // Make sure it is lowest priority container.
+    scheduler.preemptResources(scheduler.getQueueSchedulables(),
+        Resources.createResource(2 * 1024));
+    assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+    assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+
+    // Now A and B are below fair share, so preemption shouldn't do anything
+    scheduler.preemptResources(scheduler.getQueueSchedulables(),
+        Resources.createResource(2 * 1024));
+    assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+    assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size());
+    assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+  }
+
+  /**
+   * Tests the timing of the decision to preempt. No resources are requested
+   * before the min-share preemption timeout. Each queue's min share is
+   * requested once that timeout has passed. Its fair share is requested once
+   * the fair-share preemption timeout has passed.
+   */
+  @Test
+  public void testPreemptionDecision() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+    // Four equally weighted queues, each with a 1024 min share. The min-share
+    // preemption timeout is 5 and the fair-share preemption timeout is 10.
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueD\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024</minResources>");
+    out.println("</queue>");
+    out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    QueueManager queueManager = scheduler.getQueueManager();
+    queueManager.initialize();
+
+    // Create three nodes of 2 * 1024 each (6 * 1024 in total).
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    // Queue A and B each request three containers. The returned attempt IDs
+    // are not needed in this test.
+    createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
+    createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+
+    createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
+    createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 2; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
+          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
+          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
+          new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+      scheduler.handle(nodeUpdate3);
+    }
+
+    // Now new requests arrive from queues C and D
+    createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
+    createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
+
+    createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
+    createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
+    createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
+
+    scheduler.update();
+
+    FSQueueSchedulable schedC =
+        scheduler.getQueueManager().getQueue("queueC").getQueueSchedulable();
+    FSQueueSchedulable schedD =
+        scheduler.getQueueManager().getQueue("queueD").getQueueSchedulable();
+
+    assertTrue(Resources.equals(
+        Resources.none(), scheduler.resToPreempt(schedC, clock.getTime())));
+    assertTrue(Resources.equals(
+        Resources.none(), scheduler.resToPreempt(schedD, clock.getTime())));
+    // After minSharePreemptionTime has passed, they should want to preempt min
+    // share.
+    clock.tick(6);
+    assertTrue(Resources.equals(
+        Resources.createResource(1024), scheduler.resToPreempt(schedC, clock.getTime())));
+    assertTrue(Resources.equals(
+        Resources.createResource(1024), scheduler.resToPreempt(schedD, clock.getTime())));
+
+    // After fairSharePreemptionTime has passed, they should want to preempt
+    // fair share (6 * 1024 split evenly over four queues = 1536).
+    scheduler.update();
+    clock.tick(6);
+    assertTrue(Resources.equals(
+        Resources.createResource(1536), scheduler.resToPreempt(schedC, clock.getTime())));
+    assertTrue(Resources.equals(
+        Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm Fri Jul 13 00:43:01 2012
@@ -0,0 +1,179 @@
+~~ Licensed under the Apache License, Version 2.0 (the "License");
+~~ you may not use this file except in compliance with the License.
+~~ You may obtain a copy of the License at
+~~
+~~   http://www.apache.org/licenses/LICENSE-2.0
+~~
+~~ Unless required by applicable law or agreed to in writing, software
+~~ distributed under the License is distributed on an "AS IS" BASIS,
+~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~~ See the License for the specific language governing permissions and
+~~ limitations under the License. See accompanying LICENSE file.
+
+  ---
+  Hadoop Map Reduce Next Generation-${project.version} - Fair Scheduler
+  ---
+  ---
+  ${maven.build.timestamp}
+
+Hadoop MapReduce Next Generation - Fair Scheduler
+
+  \[ {{{./index.html}Go Back}} \]
+
+%{toc|section=1|fromDepth=0}
+
+* {Purpose} 
+
+  This document describes the <<<FairScheduler>>>, a pluggable scheduler for Hadoop 
+  which provides a way to share large clusters. <<NOTE:>> The Fair Scheduler 
+  implementation is currently under development and should be considered experimental.
+
+* {Introduction}
+
+  Fair scheduling is a method of assigning resources to applications such that 
+  all apps get, on average, an equal share of resources over time. 
+  Hadoop NextGen is capable of scheduling multiple resource types, such as 
+  Memory and CPU. Currently only memory is supported, so a "cluster share" is 
+  a proportion of aggregate memory in the cluster. When there is a single app 
+  running, that app uses the entire cluster. When other apps are submitted, 
+  resources that free up are assigned to the new apps, so that each app gets 
+  roughly the same amount of resources. Unlike the default Hadoop scheduler, 
+  which forms a queue of apps, this lets short apps finish in reasonable time
+  while not starving long-lived apps. It is also a reasonable way to share a 
+  cluster between a number of users. Finally, fair sharing can also work with 
+  app priorities - the priorities are used as weights to determine the 
+  fraction of total resources that each app should get.
+
+  The scheduler organizes apps further into "queues", and shares resources
+  fairly between these queues. By default, all users share a single queue,
+  called "default". If an app specifically lists a queue in a container
+  resource request, the request is submitted to that queue. It is also
+  possible to assign queues based on the user name included with the request
+  through configuration. Within each queue, fair sharing is used to share
+  capacity between the running apps. Queues can also be given weights in the
+  config file to share the cluster non-proportionally.
+
+  In addition to providing fair sharing, the Fair Scheduler allows assigning 
+  guaranteed minimum shares to queues, which is useful for ensuring that 
+  certain users, groups or production applications always get sufficient 
+  resources. When a queue contains apps, it gets at least its minimum share, 
+  but when the queue does not need its full guaranteed share, the excess is 
+  split between other running apps. This lets the scheduler guarantee capacity 
+  for queues while utilizing resources efficiently when these queues don't
+  contain applications.
+
+  The Fair Scheduler lets all apps run by default, but it is also possible to 
+  limit the number of running apps per user and per queue through the config 
+  file. This can be useful when a user must submit hundreds of apps at once, 
+  or in general to improve performance if running too many apps at once would 
+  cause too much intermediate data to be created or too much context-switching.
+  Limiting the apps does not cause any subsequently submitted apps to fail,
+  only to wait in the scheduler's queue until some of the user's earlier apps
+  finish. Apps to run from each user/queue are chosen in order of priority and
+  then submit time, as in the default FIFO scheduler in Hadoop.
+
+  Certain add-ons that existed in the original (MR1) Fair Scheduler are not
+  yet supported. Among them is the use of custom policies governing priority
+  "boosting" over certain apps.
+
+* {Installation}
+
+  To use the Fair Scheduler first assign the appropriate scheduler class in 
+  yarn-site.xml:
+
+------
+<property>
+  <name>yarn.resourcemanager.scheduler.class</name>
+  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
+</property>
+------
+
+* {Configuration}
+
+  Customizing the Fair Scheduler typically involves altering two files. First,
+  scheduler-wide options can be set by adding configuration properties in the
+  fair-scheduler.xml file in your existing configuration directory. Second, in
+  most cases users will want to create a manifest file listing which queues
+  exist and their respective weights and capacities. The location of this file
+  is flexible, but it must be declared in fair-scheduler.xml (see
+  <<<yarn.scheduler.fair.allocation.file>>> below).
+
+ * <<<yarn.scheduler.fair.allocation.file>>>
+
+   * Path to allocation file. An allocation file is an XML manifest describing
+     queues and their properties, in addition to certain policy defaults. This file
+     must be in XML format as described in the next section.
+
+ * <<<yarn.scheduler.fair.minimum-allocation-mb>>>
+
+    * The smallest container size the scheduler can allocate, in MB of memory.
+
+ * <<<yarn.scheduler.fair.maximum-allocation-mb>>>
+
+    * The largest container the scheduler can allocate, in MB of memory.
+
+ * <<<yarn.scheduler.fair.user-as-default-queue>>>
+
+    * Whether to use the username associated with the allocation as the default 
+      queue name, in the event that a queue name is not specified. If this is set 
+      to "false" or unset, all jobs have a shared default queue, called "default".
+
+ * <<<yarn.scheduler.fair.preemption>>>
+
+    * Whether to use preemption. Note that preemption is experimental in the current
+      version.
+
+ * <<<yarn.scheduler.fair.sizebasedweight>>>
+  
+    * Whether to assign shares to individual apps based on their size, rather than
+      providing an equal share to all apps regardless of size.
+
+ * <<<yarn.scheduler.fair.assignmultiple>>>
+
+    * Whether to allow multiple container assignments in one heartbeat.
+
+** {Allocation file format}
+
+  The allocation file must be in XML format. The format contains three types of
+  elements:
+
+ * <<Queue elements>>, which represent queues. Each may contain the following
+     properties:
+
+   * minResources: minimum amount of aggregate memory
+
+   * maxResources: maximum amount of aggregate memory
+
+   * maxRunningApps: limit the number of apps from the queue to run at once
+
+   * weight: to share the cluster non-proportionally with other queues
+
+   * schedulingMode: either "fifo" or "fair" depending on the in-queue scheduling
+     policy desired
+
+ * <<User elements>>, which represent settings governing the behavior of individual 
+     users. They can contain a single property: maxRunningApps, a limit on the 
+     number of running apps for a particular user.
+
+ * <<A userMaxAppsDefault element>>, which sets the default running app limit 
+   for any users whose limit is not otherwise specified.
+
+  An example allocation file is given here:
+
+---
+<?xml version="1.0"?>
+<allocations>
+  <queue name="sample_queue">
+    <minResources>100000</minResources>
+    <maxResources>900000</maxResources>
+    <maxRunningApps>50</maxRunningApps>
+    <weight>2.0</weight>
+    <schedulingMode>fair</schedulingMode>
+  </queue>
+  <user name="sample_user">
+    <maxRunningApps>30</maxRunningApps>
+  </user>
+  <userMaxAppsDefault>5</userMaxAppsDefault>
+</allocations>
+---
+
+  Note that for backwards compatibility with the original FairScheduler, "queue" elements can instead be named as "pool" elements.