You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2012/07/13 02:43:02 UTC
svn commit: r1361020 [3/3] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/
hadoop-yarn/hadoop-yarn-server/had...
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,992 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFairScheduler {
+
+  /**
+   * Deterministic {@link Clock} for tests: time starts at 0 ms and only moves
+   * when {@link #tick(int)} is called. It does not use the enclosing test
+   * instance, so it is a static nested class.
+   */
+  private static class MockClock implements Clock {
+    /** Current time in milliseconds. */
+    private long time = 0;
+
+    @Override
+    public long getTime() {
+      return time;
+    }
+
+    /**
+     * Advances the clock.
+     *
+     * @param seconds number of seconds to move forward
+     */
+    public void tick(int seconds) {
+      // Multiply as long: seconds * 1000 in int arithmetic silently
+      // overflows once seconds exceeds Integer.MAX_VALUE / 1000.
+      time = time + seconds * 1000L;
+    }
+  }
+
+  /** Scratch directory: the test.build.data system property, else /tmp. */
+  static final String TEST_DIR = new File(System.getProperty("test.build.data",
+      "/tmp")).getAbsolutePath();
+
+  /** Allocation file that individual tests write and point the scheduler at. */
+  static final String ALLOC_FILE = new File(TEST_DIR,
+      "test-queues").getAbsolutePath();
+
+  private FairScheduler scheduler;
+  private ResourceManager resourceManager;
+  private static final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  private int APP_ID = 1; // Incrementing counter for scheduling apps
+  private int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
+
+ // HELPER METHODS
+  /**
+   * Builds a fresh FairScheduler wired to a newly initialized
+   * ResourceManager (with its dispatcher started) before every test.
+   */
+  @Before
+  public void setUp() throws IOException {
+    scheduler = new FairScheduler();
+    Configuration testConf = new Configuration();
+    // Every test expects a node update to hand out at most one assignment.
+    testConf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
+    Store backingStore = StoreFactory.getStore(testConf);
+    resourceManager = new ResourceManager(backingStore);
+    resourceManager.init(testConf);
+    AsyncDispatcher dispatcher =
+        (AsyncDispatcher) resourceManager.getRMContext().getDispatcher();
+    dispatcher.start();
+    scheduler.reinitialize(testConf, null, resourceManager.getRMContext());
+  }
+
+  /**
+   * Stops the dispatcher that {@link #setUp()} started and drops the
+   * references to the scheduler and resource manager.
+   */
+  @After
+  public void tearDown() {
+    // setUp() starts the RM's AsyncDispatcher but nothing ever stopped it,
+    // so each test left a running dispatcher behind (presumably including
+    // its event-handling thread). Guard against setUp() having failed early.
+    if (resourceManager != null && resourceManager.getRMContext() != null) {
+      ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).stop();
+    }
+    scheduler = null;
+    resourceManager = null;
+  }
+
+  /**
+   * Builds an ApplicationAttemptId record for application number
+   * {@code appId} and attempt number {@code attemptId}.
+   */
+  private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
+    ApplicationAttemptId attemptRecord =
+        recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    ApplicationId applicationRecord =
+        recordFactory.newRecordInstance(ApplicationId.class);
+    applicationRecord.setId(appId);
+
+    attemptRecord.setAttemptId(attemptId);
+    attemptRecord.setApplicationId(applicationRecord);
+    return attemptRecord;
+  }
+
+
+  /**
+   * Builds a ResourceRequest for {@code numContainers} containers, each of
+   * {@code memory}, on {@code host} at the given priority.
+   */
+  private ResourceRequest createResourceRequest(int memory, String host,
+      int priority, int numContainers) {
+    ResourceRequest req = recordFactory.newRecordInstance(ResourceRequest.class);
+    req.setCapability(Resources.createResource(memory));
+    req.setHostName(host);
+    req.setNumContainers(numContainers);
+
+    Priority requestPriority = recordFactory.newRecordInstance(Priority.class);
+    requestPriority.setPriority(priority);
+    req.setPriority(requestPriority);
+    return req;
+  }
+
+ /**
+ * Creates a single container priority-1 request and submits to
+ * scheduler.
+ */
+ private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId) {
+ return createSchedulingRequest(memory, queueId, userId, 1);
+ }
+
+ private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers) {
+ return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
+ }
+
+  /**
+   * Registers a new application attempt (taking the next APP_ID and
+   * ATTEMPT_ID values) in {@code queueId} for {@code userId}, then asks the
+   * scheduler for {@code numContainers} containers of {@code memory} at
+   * {@code priority} with host "*" (presumably the any-host wildcard).
+   *
+   * @return the attempt id that was registered
+   */
+  private ApplicationAttemptId createSchedulingRequest(int memory,
+      String queueId, String userId, int numContainers, int priority) {
+    int appNumber = this.APP_ID++;
+    int attemptNumber = this.ATTEMPT_ID++;
+    ApplicationAttemptId attemptId = createAppAttemptId(appNumber, attemptNumber);
+    scheduler.addApplication(attemptId, queueId, userId);
+
+    List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
+    requests.add(createResourceRequest(memory, "*", priority, numContainers));
+    scheduler.allocate(attemptId, requests, new ArrayList<ContainerId>());
+    return attemptId;
+  }
+
+ // TESTS
+
+  /**
+   * Cluster capacity follows node additions and removals.
+   */
+  @Test
+  public void testAggregateCapacityTracking() throws Exception {
+    // A single 1024 node makes up the whole cluster
+    RMNode firstNode = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    scheduler.handle(new NodeAddedSchedulerEvent(firstNode));
+    assertEquals(1024, scheduler.getClusterCapacity().getMemory());
+
+    // A second 512 node adds to the total
+    RMNode secondNode = MockNodes.newNodeInfo(1, Resources.createResource(512));
+    scheduler.handle(new NodeAddedSchedulerEvent(secondNode));
+    assertEquals(1536, scheduler.getClusterCapacity().getMemory());
+
+    // Dropping the first node leaves only the second node's capacity
+    scheduler.handle(new NodeRemovedSchedulerEvent(firstNode));
+    assertEquals(512, scheduler.getClusterCapacity().getMemory());
+  }
+
+  /**
+   * Two queues that each demand the whole cluster should split it evenly.
+   */
+  @Test
+  public void testSimpleFairShareCalculation() {
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Have two queues which want entire cluster capacity
+    createSchedulingRequest(10 * 1024, "queue1", "user1");
+    createSchedulingRequest(10 * 1024, "queue2", "user1");
+
+    scheduler.update();
+
+    Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
+    assertEquals(3, queues.size()); // queue1, queue2 and the default queue
+
+    for (FSQueue p : queues) {
+      // Compare by value: != on Strings tests reference identity and only
+      // works when both sides happen to be the same interned instance.
+      if (!"default".equals(p.getName())) {
+        assertEquals(5120, p.getQueueSchedulable().getFairShare().getMemory());
+      }
+    }
+  }
+
+  /**
+   * A queue asking for two 512 containers gets one on each node's update.
+   */
+  @Test
+  public void testSimpleContainerAllocation() {
+    // Two nodes: 1024 and 512
+    RMNode bigNode = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    scheduler.handle(new NodeAddedSchedulerEvent(bigNode));
+    RMNode smallNode = MockNodes.newNodeInfo(1, Resources.createResource(512));
+    scheduler.handle(new NodeAddedSchedulerEvent(smallNode));
+
+    createSchedulingRequest(512, "queue1", "user1", 2);
+    scheduler.update();
+
+    // The first node's update places one container
+    scheduler.handle(new NodeUpdateSchedulerEvent(bigNode,
+        new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>()));
+    assertEquals(512, scheduler.getQueueManager().getQueue("queue1")
+        .getQueueSchedulable().getResourceUsage().getMemory());
+
+    // The second node's update places the other
+    scheduler.handle(new NodeUpdateSchedulerEvent(smallNode,
+        new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>()));
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queue1")
+        .getQueueSchedulable().getResourceUsage().getMemory());
+  }
+
+  /**
+   * When the only node is full, a second queue's request waits with a
+   * reservation. A newly added node then serves that request, and the old
+   * reservation disappears on the first node's next update.
+   */
+  @Test
+  public void testSimpleContainerReservation() throws InterruptedException {
+    RMNode firstNode = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    scheduler.handle(new NodeAddedSchedulerEvent(firstNode));
+
+    // queue1 asks for the node's entire capacity and gets it
+    createSchedulingRequest(1024, "queue1", "user1", 1);
+    scheduler.update();
+    NodeUpdateSchedulerEvent firstNodeUpdate = new NodeUpdateSchedulerEvent(
+        firstNode, new ArrayList<ContainerStatus>(),
+        new ArrayList<ContainerStatus>());
+    scheduler.handle(firstNodeUpdate);
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queue1")
+        .getQueueSchedulable().getResourceUsage().getMemory());
+
+    // queue2 asks for the same amount; with no room left it must reserve
+    ApplicationAttemptId queue2App =
+        createSchedulingRequest(1024, "queue2", "user1", 1);
+    scheduler.update();
+    scheduler.handle(firstNodeUpdate);
+    assertEquals(0, scheduler.getQueueManager().getQueue("queue2")
+        .getQueueSchedulable().getResourceUsage().getMemory());
+    assertEquals(1024, scheduler.applications.get(queue2App)
+        .getCurrentReservation().getMemory());
+
+    // A second node joins and checks in; queue2's container goes there
+    RMNode secondNode = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    NodeAddedSchedulerEvent secondNodeAdded =
+        new NodeAddedSchedulerEvent(secondNode);
+    NodeUpdateSchedulerEvent secondNodeUpdate = new NodeUpdateSchedulerEvent(
+        secondNode, new ArrayList<ContainerStatus>(),
+        new ArrayList<ContainerStatus>());
+    scheduler.handle(secondNodeAdded);
+    scheduler.handle(secondNodeUpdate);
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queue2")
+        .getQueueSchedulable().getResourceUsage().getMemory());
+
+    // The old reservation lingers until the first node reports again
+    assertEquals(1024, scheduler.applications.get(queue2App)
+        .getCurrentReservation().getMemory());
+    scheduler.handle(firstNodeUpdate);
+    assertEquals(0, scheduler.applications.get(queue2App)
+        .getCurrentReservation().getMemory());
+  }
+
+  /**
+   * With USER_AS_DEFAULT_QUEUE set to "true", an app submitted to "default"
+   * lands in a queue named after its user. With it set to "false", the app
+   * stays in "default".
+   */
+  @Test
+  public void testUserAsDefaultQueue() throws Exception {
+    Configuration conf = new Configuration();
+
+    // Enabled: user1's app is routed to the "user1" queue
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+    scheduler.handle(new AppAddedSchedulerEvent(
+        createAppAttemptId(1, 1), "default", "user1"));
+    assertEquals(1, scheduler.getQueueManager().getQueue("user1")
+        .getApplications().size());
+    assertEquals(0, scheduler.getQueueManager().getQueue("default")
+        .getApplications().size());
+
+    // Disabled: user2's app stays in "default" and "user2" is empty
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+    scheduler.handle(new AppAddedSchedulerEvent(
+        createAppAttemptId(2, 1), "default", "user2"));
+    assertEquals(1, scheduler.getQueueManager().getQueue("user1")
+        .getApplications().size());
+    assertEquals(1, scheduler.getQueueManager().getQueue("default")
+        .getApplications().size());
+    assertEquals(0, scheduler.getQueueManager().getQueue("user2")
+        .getApplications().size());
+  }
+
+ @Test
+ public void testFairShareWithMinAlloc() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"queueA\">");
+ out.println("<minResources>1024</minResources>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueB\">");
+ out.println("<minResources>2048</minResources>");
+ out.println("</queue>");
+ out.println("</allocations>");
+ out.close();
+
+ QueueManager queueManager = scheduler.getQueueManager();
+ queueManager.initialize();
+
+ // Add one big node (only care about aggregate capacity)
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024));
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ createSchedulingRequest(2 * 1024, "queueA", "user1");
+ createSchedulingRequest(2 * 1024, "queueB", "user1");
+
+ scheduler.update();
+
+ Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
+ assertEquals(3, queues.size());
+
+ for (FSQueue p : queues) {
+ if (p.getName().equals("queueA")) {
+ assertEquals(1024, p.getQueueSchedulable().getFairShare().getMemory());
+ }
+ else if (p.getName().equals("queueB")) {
+ assertEquals(2048, p.getQueueSchedulable().getFairShare().getMemory());
+ }
+ }
+
+ }
+
+ /**
+ * Make allocation requests and ensure they are reflected in queue demand.
+ */
+ @Test
+ public void testQueueDemandCalculation() throws Exception {
+ ApplicationAttemptId id11 = createAppAttemptId(1, 1);
+ scheduler.addApplication(id11, "queue1", "user1");
+ ApplicationAttemptId id21 = createAppAttemptId(2, 1);
+ scheduler.addApplication(id21, "queue2", "user1");
+ ApplicationAttemptId id22 = createAppAttemptId(2, 2);
+ scheduler.addApplication(id22, "queue2", "user1");
+
+ // First ask, queue1 requests 1024
+ List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
+ ResourceRequest request1 = createResourceRequest(1024, "*", 1, 1);
+ ask1.add(request1);
+ scheduler.allocate(id11, ask1, new ArrayList<ContainerId>());
+
+ // Second ask, queue2 requests 1024 + (2 * 512)
+ List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
+ ResourceRequest request2 = createResourceRequest(1024, "foo", 1, 1);
+ ResourceRequest request3 = createResourceRequest(512, "bar", 1, 2);
+ ask2.add(request2);
+ ask2.add(request3);
+ scheduler.allocate(id21, ask2, new ArrayList<ContainerId>());
+
+ // Third ask, queue2 requests 1024
+ List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
+ ResourceRequest request4 = createResourceRequest(1024, "*", 1, 1);
+ ask3.add(request4);
+ scheduler.allocate(id22, ask3, new ArrayList<ContainerId>());
+
+ scheduler.update();
+
+ assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").getQueueSchedulable().getDemand().getMemory());
+ assertEquals(1024 + 1024 + (2 * 512), scheduler.getQueueManager().getQueue("queue2").getQueueSchedulable().getDemand().getMemory());
+
+ }
+
+  /**
+   * Adding an app to the default queue and then removing it leaves the
+   * default queue empty again.
+   */
+  @Test
+  public void testAppAdditionAndRemoval() throws Exception {
+    scheduler.handle(new AppAddedSchedulerEvent(
+        createAppAttemptId(1, 1), "default", "user1"));
+
+    // Only the default queue exists, and it holds the new app
+    assertEquals(1, scheduler.getQueueManager().getQueues().size());
+    assertEquals(1, scheduler.getQueueManager().getQueue("default")
+        .getApplications().size());
+
+    // Remove the app via a fresh id record with the same app/attempt numbers
+    AppRemovedSchedulerEvent removal = new AppRemovedSchedulerEvent(
+        createAppAttemptId(1, 1), RMAppAttemptState.FINISHED);
+    scheduler.handle(removal);
+
+    assertEquals(0, scheduler.getQueueManager().getQueue("default")
+        .getApplications().size());
+  }
+
+  /**
+   * Writes an allocation file using the {@code <queue>} element and checks
+   * that every per-queue, per-user and default setting in it is parsed.
+   */
+  @Test
+  public void testAllocationFileParsing() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give queue A a minimum of 1024 M
+    out.println("<queue name=\"queueA\">");
+    out.println("<minResources>1024</minResources>");
+    out.println("</queue>");
+    // Give queue B a minimum of 2048 M
+    out.println("<queue name=\"queueB\">");
+    out.println("<minResources>2048</minResources>");
+    out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
+    out.println("</queue>");
+    // Give queue C no minimum
+    out.println("<queue name=\"queueC\">");
+    out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
+    out.println("</queue>");
+    // Give queue D a limit of 3 running apps
+    out.println("<queue name=\"queueD\">");
+    out.println("<maxRunningApps>3</maxRunningApps>");
+    out.println("</queue>");
+    // Give queue E a preemption timeout of one minute
+    out.println("<queue name=\"queueE\">");
+    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    out.println("</queue>");
+    // Set default limit of apps per queue to 15
+    out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
+    // Set default limit of apps per user to 5
+    out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
+    // Give user1 a limit of 10 jobs
+    out.println("<user name=\"user1\">");
+    out.println("<maxRunningApps>10</maxRunningApps>");
+    out.println("</user>");
+    // Set default min share preemption timeout to 2 minutes
+    out.println("<defaultMinSharePreemptionTimeout>120"
+        + "</defaultMinSharePreemptionTimeout>");
+    // Set fair share preemption timeout to 5 minutes
+    out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    QueueManager queueManager = scheduler.getQueueManager();
+    queueManager.initialize();
+
+    assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue
+    // Each expectation below is asserted once; the copy-pasted duplicates of
+    // the default-queue min-resources and queueA timeout checks were dropped.
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
+
+    assertEquals(Resources.createResource(1024),
+        queueManager.getMinResources("queueA"));
+    assertEquals(Resources.createResource(2048),
+        queueManager.getMinResources("queueB"));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources("queueC"));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources("queueD"));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources("queueE"));
+
+    assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME));
+    assertEquals(15, queueManager.getQueueMaxApps("queueA"));
+    assertEquals(15, queueManager.getQueueMaxApps("queueB"));
+    assertEquals(15, queueManager.getQueueMaxApps("queueC"));
+    assertEquals(3, queueManager.getQueueMaxApps("queueD"));
+    assertEquals(15, queueManager.getQueueMaxApps("queueE"));
+    assertEquals(10, queueManager.getUserMaxApps("user1"));
+    assertEquals(5, queueManager.getUserMaxApps("user2"));
+
+    // Unspecified queues should get default ACL
+    Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("queueA");
+    assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE));
+    assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString());
+    assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS));
+    assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
+
+    // Queue B ACL
+    Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB");
+    assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE));
+    assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString());
+
+    // Queue C ACL
+    Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC");
+    assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS));
+    assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
+
+    // Timeouts are configured in seconds and reported in milliseconds
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout(
+        YarnConfiguration.DEFAULT_QUEUE_NAME));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD"));
+    assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE"));
+    assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
+  }
+
+  /**
+   * Same checks as the queue-based allocation file test, but the file uses
+   * the {@code <pool>} element instead of {@code <queue>}.
+   */
+  @Test
+  public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give queue A a minimum of 1024 M
+    out.println("<pool name=\"queueA\">");
+    out.println("<minResources>1024</minResources>");
+    out.println("</pool>");
+    // Give queue B a minimum of 2048 M
+    out.println("<pool name=\"queueB\">");
+    out.println("<minResources>2048</minResources>");
+    out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
+    out.println("</pool>");
+    // Give queue C no minimum
+    out.println("<pool name=\"queueC\">");
+    out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
+    out.println("</pool>");
+    // Give queue D a limit of 3 running apps
+    out.println("<pool name=\"queueD\">");
+    out.println("<maxRunningApps>3</maxRunningApps>");
+    out.println("</pool>");
+    // Give queue E a preemption timeout of one minute
+    out.println("<pool name=\"queueE\">");
+    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    out.println("</pool>");
+    // Set default limit of apps per queue to 15
+    out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
+    // Set default limit of apps per user to 5
+    out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
+    // Give user1 a limit of 10 jobs
+    out.println("<user name=\"user1\">");
+    out.println("<maxRunningApps>10</maxRunningApps>");
+    out.println("</user>");
+    // Set default min share preemption timeout to 2 minutes
+    out.println("<defaultMinSharePreemptionTimeout>120"
+        + "</defaultMinSharePreemptionTimeout>");
+    // Set fair share preemption timeout to 5 minutes
+    out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    QueueManager queueManager = scheduler.getQueueManager();
+    queueManager.initialize();
+
+    assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue
+    // Each expectation below is asserted once; the copy-pasted duplicates of
+    // the default-queue min-resources and queueA timeout checks were dropped.
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
+
+    assertEquals(Resources.createResource(1024),
+        queueManager.getMinResources("queueA"));
+    assertEquals(Resources.createResource(2048),
+        queueManager.getMinResources("queueB"));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources("queueC"));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources("queueD"));
+    assertEquals(Resources.createResource(0),
+        queueManager.getMinResources("queueE"));
+
+    assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME));
+    assertEquals(15, queueManager.getQueueMaxApps("queueA"));
+    assertEquals(15, queueManager.getQueueMaxApps("queueB"));
+    assertEquals(15, queueManager.getQueueMaxApps("queueC"));
+    assertEquals(3, queueManager.getQueueMaxApps("queueD"));
+    assertEquals(15, queueManager.getQueueMaxApps("queueE"));
+    assertEquals(10, queueManager.getUserMaxApps("user1"));
+    assertEquals(5, queueManager.getUserMaxApps("user2"));
+
+    // Unspecified queues should get default ACL
+    Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("queueA");
+    assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE));
+    assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString());
+    assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS));
+    assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
+
+    // Queue B ACL
+    Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB");
+    assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE));
+    assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString());
+
+    // Queue C ACL
+    Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC");
+    assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS));
+    assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
+
+    // Timeouts are configured in seconds and reported in milliseconds
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout(
+        YarnConfiguration.DEFAULT_QUEUE_NAME));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC"));
+    assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD"));
+    assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE"));
+    assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
+  }
+
+ @Test
+ public void testIsStarvedForMinShare() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"queueA\">");
+ out.println("<minResources>2048</minResources>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueB\">");
+ out.println("<minResources>2048</minResources>");
+ out.println("</queue>");
+ out.println("</allocations>");
+ out.close();
+
+ QueueManager queueManager = scheduler.getQueueManager();
+ queueManager.initialize();
+
+ // Add one big node (only care about aggregate capacity)
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024));
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Queue A wants 3 * 1024. Node update gives this all to A
+ createSchedulingRequest(3 * 1024, "queueA", "user1");
+ scheduler.update();
+ NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
+ new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ scheduler.handle(nodeEvent2);
+
+ // Queue B arrives and wants 1 * 1024
+ createSchedulingRequest(1 * 1024, "queueB", "user1");
+ scheduler.update();
+ Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
+ assertEquals(3, queues.size());
+
+ // Queue A should be above min share, B below.
+ for (FSQueue p : queues) {
+ if (p.getName().equals("queueA")) {
+ assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
+ }
+ else if (p.getName().equals("queueB")) {
+ assertEquals(true, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
+ }
+ }
+
+ // Node checks in again, should allocate for B
+ scheduler.handle(nodeEvent2);
+ // Now B should have min share ( = demand here)
+ for (FSQueue p : queues) {
+ if (p.getName().equals("queueB")) {
+ assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
+ }
+ }
+ }
+
+ @Test
+ public void testIsStarvedForFairShare() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"queueA\">");
+ out.println("<weight>.25</weight>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueB\">");
+ out.println("<weight>.75</weight>");
+ out.println("</queue>");
+ out.println("</allocations>");
+ out.close();
+
+ QueueManager queueManager = scheduler.getQueueManager();
+ queueManager.initialize();
+
+ // Add one big node (only care about aggregate capacity)
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024));
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Queue A wants 3 * 1024. Node update gives this all to A
+ createSchedulingRequest(3 * 1024, "queueA", "user1");
+ scheduler.update();
+ NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
+ new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ scheduler.handle(nodeEvent2);
+
+ // Queue B arrives and wants 1 * 1024
+ createSchedulingRequest(1 * 1024, "queueB", "user1");
+ scheduler.update();
+ Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
+ assertEquals(3, queues.size());
+
+ // Queue A should be above fair share, B below.
+ for (FSQueue p : queues) {
+ if (p.getName().equals("queueA")) {
+ assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
+ }
+ else if (p.getName().equals("queueB")) {
+ assertEquals(true, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
+ }
+ }
+
+ // Node checks in again, should allocate for B
+ scheduler.handle(nodeEvent2);
+ // B should not be starved for fair share, since entire demand is
+ // satisfied.
+ for (FSQueue p : queues) {
+ if (p.getName().equals("queueB")) {
+ assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
+ }
+ }
+ }
+
+ /**
+ * Make sure containers are chosen to be preempted in the correct order. Right
+ * now this means decreasing order of priority: within a queue, the app created
+ * with the largest value of the last createSchedulingRequest argument
+ * (presumably the request priority) is expected to lose its container first.
+ */
+ @Test
+ public void testChoiceOfPreemptedContainers() throws Exception {
+ Configuration conf = new Configuration();
+ // ALLOCATION_FILE is already the complete property key, exactly as used by
+ // the other tests; appending a suffix would configure an unrelated property.
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+ // Four queues with equal weight.
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"queueA\">");
+ out.println("<weight>.25</weight>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueB\">");
+ out.println("<weight>.25</weight>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueC\">");
+ out.println("<weight>.25</weight>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueD\">");
+ out.println("<weight>.25</weight>");
+ out.println("</queue>");
+ out.println("</allocations>");
+ out.close();
+
+ QueueManager queueManager = scheduler.getQueueManager();
+ queueManager.initialize();
+
+ // Create three nodes, each with 2 * 1024 of memory.
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+
+ RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
+ NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+ scheduler.handle(nodeEvent3);
+
+ // Queues A and B each request three containers, one per app.
+ ApplicationAttemptId app1 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+ ApplicationAttemptId app2 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
+ ApplicationAttemptId app3 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+
+ ApplicationAttemptId app4 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
+ ApplicationAttemptId app5 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
+ ApplicationAttemptId app6 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+
+ scheduler.update();
+
+ // Sufficient node check-ins to fully schedule containers
+ for (int i = 0; i < 2; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
+ new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ scheduler.handle(nodeUpdate1);
+
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
+ new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ scheduler.handle(nodeUpdate2);
+
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
+ new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ scheduler.handle(nodeUpdate3);
+ }
+
+ // Every app in queues A and B now holds exactly one container.
+ assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
+ assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
+ assertEquals(1, scheduler.applications.get(app3).getLiveContainers().size());
+ assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
+ assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
+ assertEquals(1, scheduler.applications.get(app6).getLiveContainers().size());
+
+ // Now new requests arrive from queues C and D. Their app IDs are never
+ // inspected; only the additional demand matters.
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
+
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
+
+ scheduler.update();
+
+ // We should be able to claw back one container from A and B each.
+ // Make sure it is the lowest priority container.
+ scheduler.preemptResources(scheduler.getQueueSchedulables(),
+ Resources.createResource(2 * 1024));
+ assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
+ assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+ assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
+ assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+
+ // We should be able to claw back another container from A and B each.
+ // Make sure it is the lowest priority container.
+ scheduler.preemptResources(scheduler.getQueueSchedulables(),
+ Resources.createResource(2 * 1024));
+ assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+ assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+
+ // Now A and B are below fair share, so preemption shouldn't do anything
+ scheduler.preemptResources(scheduler.getQueueSchedulables(),
+ Resources.createResource(2 * 1024));
+ assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+ assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+ }
+
+ /**
+ * Tests the timing of decision to preempt tasks. A starved queue should ask
+ * for nothing right after its demand appears, for its min share once the
+ * min-share preemption timeout has passed, and for its fair share once the
+ * fair-share preemption timeout has passed as well.
+ */
+ @Test
+ public void testPreemptionDecision() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ // The mock clock lets the test advance time explicitly via tick().
+ MockClock clock = new MockClock();
+ scheduler.setClock(clock);
+ scheduler.reinitialize(conf, null, resourceManager.getRMContext());
+
+ // Four equally weighted queues, each with a 1024 min share. Min-share
+ // preemption kicks in after 5, fair-share preemption after 10.
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"queueA\">");
+ out.println("<weight>.25</weight>");
+ out.println("<minResources>1024</minResources>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueB\">");
+ out.println("<weight>.25</weight>");
+ out.println("<minResources>1024</minResources>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueC\">");
+ out.println("<weight>.25</weight>");
+ out.println("<minResources>1024</minResources>");
+ out.println("</queue>");
+ out.println("<queue name=\"queueD\">");
+ out.println("<weight>.25</weight>");
+ out.println("<minResources>1024</minResources>");
+ out.println("</queue>");
+ out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+ out.println("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
+ out.println("</allocations>");
+ out.close();
+
+ QueueManager queueManager = scheduler.getQueueManager();
+ queueManager.initialize();
+
+ // Create three nodes, each with 2 * 1024 of memory (6 * 1024 in total).
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+
+ RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
+ NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+ scheduler.handle(nodeEvent3);
+
+ // Queues A and B each request three containers, one per app. The app IDs
+ // are never inspected; only the demand matters.
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+
+ scheduler.update();
+
+ // Sufficient node check-ins to fully schedule containers
+ for (int i = 0; i < 2; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
+ new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ scheduler.handle(nodeUpdate1);
+
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
+ new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ scheduler.handle(nodeUpdate2);
+
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
+ new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
+ scheduler.handle(nodeUpdate3);
+ }
+
+ // Now new requests arrive from queues C and D
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
+
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
+
+ scheduler.update();
+
+ FSQueueSchedulable schedC =
+ scheduler.getQueueManager().getQueue("queueC").getQueueSchedulable();
+ FSQueueSchedulable schedD =
+ scheduler.getQueueManager().getQueue("queueD").getQueueSchedulable();
+
+ // Immediately after the new demand arrives, nothing should be preempted.
+ assertTrue(Resources.equals(
+ Resources.none(), scheduler.resToPreempt(schedC, clock.getTime())));
+ assertTrue(Resources.equals(
+ Resources.none(), scheduler.resToPreempt(schedD, clock.getTime())));
+
+ // After minSharePreemptionTime (5) has passed, they should want to preempt
+ // their min share.
+ clock.tick(6);
+ assertTrue(Resources.equals(
+ Resources.createResource(1024), scheduler.resToPreempt(schedC, clock.getTime())));
+ assertTrue(Resources.equals(
+ Resources.createResource(1024), scheduler.resToPreempt(schedD, clock.getTime())));
+
+ // After fairSharePreemptionTime (10) has passed, they should want to
+ // preempt their fair share: a quarter of the 6 * 1024 cluster, i.e. 1536.
+ scheduler.update();
+ clock.tick(6);
+ assertTrue(Resources.equals(
+ Resources.createResource(1536), scheduler.resToPreempt(schedC, clock.getTime())));
+ assertTrue(Resources.equals(
+ Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
+ }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm Fri Jul 13 00:43:01 2012
@@ -0,0 +1,179 @@
+~~ Licensed under the Apache License, Version 2.0 (the "License");
+~~ you may not use this file except in compliance with the License.
+~~ You may obtain a copy of the License at
+~~
+~~ http://www.apache.org/licenses/LICENSE-2.0
+~~
+~~ Unless required by applicable law or agreed to in writing, software
+~~ distributed under the License is distributed on an "AS IS" BASIS,
+~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~~ See the License for the specific language governing permissions and
+~~ limitations under the License. See accompanying LICENSE file.
+
+ ---
+ Hadoop Map Reduce Next Generation-${project.version} - Fair Scheduler
+ ---
+ ---
+ ${maven.build.timestamp}
+
+Hadoop MapReduce Next Generation - Fair Scheduler
+
+ \[ {{{./index.html}Go Back}} \]
+
+%{toc|section=1|fromDepth=0}
+
+* {Purpose}
+
+ This document describes the <<<FairScheduler>>>, a pluggable scheduler for Hadoop
+ which provides a way to share large clusters. <<NOTE:>> The Fair Scheduler
+ implementation is currently under development and should be considered experimental.
+
+* {Introduction}
+
+ Fair scheduling is a method of assigning resources to applications such that
+ all apps get, on average, an equal share of resources over time.
+ Hadoop NextGen is capable of scheduling multiple resource types, such as
+ memory and CPU. Currently only memory is supported, so a "cluster share" is
+ a proportion of aggregate memory in the cluster. When there is a single app
+ running, that app uses the entire cluster. When other apps are submitted,
+ resources that free up are assigned to the new apps, so that each app gets
+ roughly the same amount of resources. Unlike the default Hadoop scheduler,
+ which forms a queue of apps, this lets short apps finish in reasonable time
+ while not starving long-lived apps. It is also a reasonable way to share a
+ cluster between a number of users. Finally, fair sharing can also work with
+ app priorities - the priorities are used as weights to determine the
+ fraction of total resources that each app should get.
+
+ The scheduler organizes apps further into "queues", and shares resources
+ fairly between these queues. By default, all users share a single queue,
+ called "default". If an app specifically lists a queue in a container
+ resource request, the request is submitted to that queue. It is also
+ possible to assign queues based on the user name included with the request
+ through configuration. Within each queue, fair sharing is used to share
+ capacity between the running apps. Queues can also be given weights in the
+ allocation file so that they share the cluster non-proportionally.
+
+ In addition to providing fair sharing, the Fair Scheduler allows assigning
+ guaranteed minimum shares to queues, which is useful for ensuring that
+ certain users, groups or production applications always get sufficient
+ resources. When a queue contains apps, it gets at least its minimum share,
+ but when the queue does not need its full guaranteed share, the excess is
+ split between other running apps. This lets the scheduler guarantee capacity
+ for queues while utilizing resources efficiently when these queues don't
+ contain applications.
+
+ The Fair Scheduler lets all apps run by default, but it is also possible to
+ limit the number of running apps per user and per queue through the config
+ file. This can be useful when a user must submit hundreds of apps at once,
+ or in general to improve performance if running too many apps at once would
+ cause too much intermediate data to be created or too much context-switching.
+ Limiting the apps does not cause any subsequently submitted apps to fail,
+ only to wait in the scheduler's queue until some of the user's earlier apps
+ finish. Apps to run from each user/queue are chosen in order of priority and
+ then submit time, as in the default FIFO scheduler in Hadoop.
+
+ Certain add-ons that existed in the original (MR1) Fair Scheduler are not
+ yet supported. Among them is the use of custom policies governing priority
+ "boosting" over certain apps.
+
+* {Installation}
+
+ To use the Fair Scheduler first assign the appropriate scheduler class in
+ yarn-site.xml:
+
+------
+<property>
+ <name>yarn.resourcemanager.scheduler.class</name>
+ <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
+</property>
+------
+
+* {Configuration}
+
+ Customizing the Fair Scheduler typically involves altering two files. First,
+ scheduler-wide options can be set by adding configuration properties in the
+ fair-scheduler.xml file in your existing configuration directory. Second, in
+ most cases users will want to create a manifest file listing which queues
+ exist and their respective weights and capacities. The location of this file
+ is flexible - but it must be declared in fair-scheduler.xml.
+
+ * <<<yarn.scheduler.fair.allocation.file>>>
+
+ * Path to allocation file. An allocation file is an XML manifest describing
+ queues and their properties, in addition to certain policy defaults. This file
+ must be in XML format as described in the next section.
+
+ * <<<yarn.scheduler.fair.minimum-allocation-mb>>>
+
+ * The smallest container size the scheduler can allocate, in MB of memory.
+
+ * <<<yarn.scheduler.fair.maximum-allocation-mb>>>
+
+ * The largest container the scheduler can allocate, in MB of memory.
+
+ * <<<yarn.scheduler.fair.user-as-default-queue>>>
+
+ * Whether to use the username associated with the allocation as the default
+ queue name, in the event that a queue name is not specified. If this is set
+ to "false" or unset, all apps share a single default queue, called "default".
+
+ * <<<yarn.scheduler.fair.preemption>>>
+
+ * Whether to use preemption. Note that preemption is experimental in the current
+ version.
+
+ * <<<yarn.scheduler.fair.sizebasedweight>>>
+
+ * Whether to assign shares to individual apps based on their size, rather than
+ providing an equal share to all apps regardless of size.
+
+ * <<<yarn.scheduler.fair.assignmultiple>>>
+
+ * Whether to allow multiple container assignments in one heartbeat.
+
+** Allocation file format
+
+ The allocation file must be in XML format. The format contains three types of
+ elements:
+
+ * <<Queue elements>>, which represent queues. Each may contain the following
+ properties:
+
+ * minResources: minimum amount of aggregate memory
+
+ * maxResources: maximum amount of aggregate memory
+
+ * maxRunningApps: limit the number of apps from the queue to run at once
+
+ * weight: to share the cluster non-proportionally with other queues
+
+ * schedulingMode: either "fifo" or "fair" depending on the in-queue scheduling
+ policy desired
+
+ * <<User elements>>, which represent settings governing the behavior of individual
+ users. They can contain a single property: maxRunningApps, a limit on the
+ number of running apps for a particular user.
+
+ * <<A userMaxAppsDefault element>>, which sets the default running app limit
+ for any users whose limit is not otherwise specified.
+
+ An example allocation file is given here:
+
+---
+<?xml version="1.0"?>
+<allocations>
+ <queue name="sample_queue">
+ <minResources>100000</minResources>
+ <maxResources>900000</maxResources>
+ <maxRunningApps>50</maxRunningApps>
+ <weight>2.0</weight>
+ <schedulingMode>fair</schedulingMode>
+ </queue>
+ <user name="sample_user">
+ <maxRunningApps>30</maxRunningApps>
+ </user>
+ <userMaxAppsDefault>5</userMaxAppsDefault>
+</allocations>
+---
+
+ Note that for backwards compatibility with the original FairScheduler, "queue" elements can instead be named as "pool" elements.