You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [23/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Tue Aug 19 23:49:39 2014
@@ -18,13 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileWriter;
@@ -34,6 +38,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -42,8 +47,6 @@ import java.util.Set;
import javax.xml.parsers.ParserConfigurationException;
-import org.junit.Assert;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -52,6 +55,7 @@ import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -64,23 +68,24 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -88,58 +93,28 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.xml.sax.SAXException;
import com.google.common.collect.Sets;
-public class TestFairScheduler {
-
- static class MockClock implements Clock {
- private long time = 0;
- @Override
- public long getTime() {
- return time;
- }
-
- public void tick(int seconds) {
- time = time + seconds * 1000;
- }
-
- }
-
- final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "/tmp")).getAbsolutePath();
-
- final static String ALLOC_FILE = new File(TEST_DIR,
- "test-queues").getAbsolutePath();
+@SuppressWarnings("unchecked")
+public class TestFairScheduler extends FairSchedulerTestBase {
+ private final static String ALLOC_FILE =
+ new File(TEST_DIR, "test-queues").getAbsolutePath();
- private FairScheduler scheduler;
- private ResourceManager resourceManager;
- private Configuration conf;
- private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
- private int APP_ID = 1; // Incrementing counter for schedling apps
- private int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
-
- // HELPER METHODS
@Before
public void setUp() throws IOException {
scheduler = new FairScheduler();
conf = createConfiguration();
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
- conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
- 1024);
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
- // All tests assume only one assignment per node update
- conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
resourceManager = new ResourceManager();
resourceManager.init(conf);
@@ -150,6 +125,8 @@ public class TestFairScheduler {
// to initialize the master key
resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+
+ scheduler.setRMContext(resourceManager.getRMContext());
}
@After
@@ -163,12 +140,12 @@ public class TestFairScheduler {
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
- ResourceScheduler scheduler = new FairScheduler();
+ FairScheduler scheduler = new FairScheduler();
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try {
- scheduler.reinitialize(conf, null);
+ scheduler.serviceInit(conf);
fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation.");
} catch (YarnRuntimeException e) {
@@ -182,7 +159,7 @@ public class TestFairScheduler {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
try {
- scheduler.reinitialize(conf, null);
+ scheduler.serviceInit(conf);
fail("Exception is expected because the min vcores allocation is" +
" larger than the max vcores allocation.");
} catch (YarnRuntimeException e) {
@@ -193,107 +170,6 @@ public class TestFairScheduler {
}
}
- private Configuration createConfiguration() {
- Configuration conf = new YarnConfiguration();
- conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
- ResourceScheduler.class);
- return conf;
- }
-
- private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
- ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
- ApplicationAttemptId attId =
- ApplicationAttemptId.newInstance(appIdImpl, attemptId);
- return attId;
- }
-
- private ResourceRequest createResourceRequest(int memory, String host,
- int priority, int numContainers, boolean relaxLocality) {
- return createResourceRequest(memory, 1, host, priority, numContainers,
- relaxLocality);
- }
-
- private ResourceRequest createResourceRequest(int memory, int vcores, String host,
- int priority, int numContainers, boolean relaxLocality) {
- ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
- request.setCapability(BuilderUtils.newResource(memory, vcores));
- request.setResourceName(host);
- request.setNumContainers(numContainers);
- Priority prio = recordFactory.newRecordInstance(Priority.class);
- prio.setPriority(priority);
- request.setPriority(prio);
- request.setRelaxLocality(relaxLocality);
- return request;
- }
-
- /**
- * Creates a single container priority-1 request and submits to
- * scheduler.
- */
- private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
- String userId) {
- return createSchedulingRequest(memory, queueId, userId, 1);
- }
-
- private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
- String queueId, String userId) {
- return createSchedulingRequest(memory, vcores, queueId, userId, 1);
- }
-
- private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
- String userId, int numContainers) {
- return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
- }
-
- private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
- String queueId, String userId, int numContainers) {
- return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
- }
-
- private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
- String userId, int numContainers, int priority) {
- return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
- priority);
- }
-
- private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
- String queueId, String userId, int numContainers, int priority) {
- ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
- scheduler.addApplication(id.getApplicationId(), queueId, userId);
- // This conditional is for testAclSubmitApplication where app is rejected
- // and no app is added.
- if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
- scheduler.addApplicationAttempt(id, false);
- }
- List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
- ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
- priority, numContainers, true);
- ask.add(request);
- scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
- return id;
- }
-
- private void createSchedulingRequestExistingApplication(int memory, int priority,
- ApplicationAttemptId attId) {
- ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
- priority, 1, true);
- createSchedulingRequestExistingApplication(request, attId);
- }
-
- private void createSchedulingRequestExistingApplication(int memory, int vcores,
- int priority, ApplicationAttemptId attId) {
- ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
- priority, 1, true);
- createSchedulingRequestExistingApplication(request, attId);
- }
-
- private void createSchedulingRequestExistingApplication(ResourceRequest request,
- ApplicationAttemptId attId) {
- List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
- ask.add(request);
- scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null);
- }
-
// TESTS
@Test(timeout=2000)
@@ -315,6 +191,8 @@ public class TestFairScheduler {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
128);
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
Assert.assertEquals(true, scheduler.assignMultiple);
Assert.assertEquals(3, scheduler.maxAssign);
@@ -342,6 +220,7 @@ public class TestFairScheduler {
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
+ fs.init(conf);
fs.reinitialize(conf, null);
Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory());
Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores());
@@ -359,8 +238,9 @@ public class TestFairScheduler {
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
- fs.reinitialize(conf, null);
- Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());
+ fs.init(conf);
+ fs.reinitialize(conf, null);
+ Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());
Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores());
Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
@@ -368,6 +248,8 @@ public class TestFairScheduler {
@Test
public void testAggregateCapacityTracking() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@@ -376,23 +258,25 @@ public class TestFairScheduler {
.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
- assertEquals(1024, scheduler.getClusterCapacity().getMemory());
+ assertEquals(1024, scheduler.getClusterResource().getMemory());
// Add another node
RMNode node2 =
MockNodes.newNodeInfo(1, Resources.createResource(512), 2, "127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
- assertEquals(1536, scheduler.getClusterCapacity().getMemory());
+ assertEquals(1536, scheduler.getClusterResource().getMemory());
// Remove the first node
NodeRemovedSchedulerEvent nodeEvent3 = new NodeRemovedSchedulerEvent(node1);
scheduler.handle(nodeEvent3);
- assertEquals(512, scheduler.getClusterCapacity().getMemory());
+ assertEquals(512, scheduler.getClusterResource().getMemory());
}
@Test
public void testSimpleFairShareCalculation() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
@@ -405,6 +289,7 @@ public class TestFairScheduler {
// Have two queues which want entire cluster capacity
createSchedulingRequest(10 * 1024, "queue1", "user1");
createSchedulingRequest(10 * 1024, "queue2", "user1");
+ createSchedulingRequest(10 * 1024, "root.default", "user1");
scheduler.update();
@@ -420,6 +305,8 @@ public class TestFairScheduler {
@Test
public void testSimpleHierarchicalFairShareCalculation() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
@@ -433,6 +320,7 @@ public class TestFairScheduler {
// Have two queues which want entire cluster capacity
createSchedulingRequest(10 * 1024, "parent.queue2", "user1");
createSchedulingRequest(10 * 1024, "parent.queue3", "user1");
+ createSchedulingRequest(10 * 1024, "root.default", "user1");
scheduler.update();
@@ -453,6 +341,8 @@ public class TestFairScheduler {
@Test
public void testHierarchicalQueuesSimilarParents() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueManager = scheduler.getQueueManager();
@@ -477,6 +367,8 @@ public class TestFairScheduler {
@Test
public void testSchedulerRootQueueMetrics() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@@ -516,6 +408,8 @@ public class TestFairScheduler {
@Test (timeout = 5000)
public void testSimpleContainerAllocation() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@@ -564,6 +458,8 @@ public class TestFairScheduler {
@Test (timeout = 5000)
public void testSimpleContainerReservation() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@@ -618,48 +514,27 @@ public class TestFairScheduler {
@Test
public void testUserAsDefaultQueue() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
- RMContext rmContext = resourceManager.getRMContext();
- Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
- RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
- null, null, null, ApplicationSubmissionContext.newInstance(null, null,
- null, null, null, false, false, 0, null, null), null, null, 0, null, null);
- appsMap.put(appAttemptId.getApplicationId(), rmApp);
-
- AppAddedSchedulerEvent appAddedEvent =
- new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default",
- "user1");
- scheduler.handle(appAddedEvent);
- AppAttemptAddedSchedulerEvent attempAddedEvent =
- new AppAttemptAddedSchedulerEvent(appAttemptId, false);
- scheduler.handle(attempAddedEvent);
+ createApplicationWithAMResource(appAttemptId, "default", "user1", null);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getRunnableAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
.getRunnableAppSchedulables().size());
- assertEquals("root.user1", rmApp.getQueue());
+ assertEquals("root.user1", resourceManager.getRMContext().getRMApps()
+ .get(appAttemptId.getApplicationId()).getQueue());
}
@Test
public void testNotUserAsDefaultQueue() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
- RMContext rmContext = resourceManager.getRMContext();
- Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps();
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
- RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf,
- null, null, null, ApplicationSubmissionContext.newInstance(null, null,
- null, null, null, false, false, 0, null, null), null, null, 0, null, null);
- appsMap.put(appAttemptId.getApplicationId(), rmApp);
-
- AppAddedSchedulerEvent appAddedEvent =
- new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default",
- "user2");
- scheduler.handle(appAddedEvent);
- AppAttemptAddedSchedulerEvent attempAddedEvent =
- new AppAttemptAddedSchedulerEvent(appAttemptId, false);
- scheduler.handle(attempAddedEvent);
+ createApplicationWithAMResource(appAttemptId, "default", "user2", null);
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
.getRunnableAppSchedulables().size());
assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true)
@@ -670,6 +545,8 @@ public class TestFairScheduler {
@Test
public void testEmptyQueueName() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// only default queue
@@ -690,8 +567,10 @@ public class TestFairScheduler {
@Test
public void testAssignToQueue() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
@@ -708,6 +587,8 @@ public class TestFairScheduler {
@Test
public void testAssignToNonLeafQueueReturnsNull() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
scheduler.getQueueManager().getLeafQueue("root.child1.granchild", true);
@@ -725,6 +606,8 @@ public class TestFairScheduler {
public void testQueuePlacementWithPolicy() throws Exception {
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId appId;
@@ -737,8 +620,11 @@ public class TestFairScheduler {
rules.add(new QueuePlacementRule.Default().initialize(true, null));
Set<String> queues = Sets.newHashSet("root.user1", "root.user3group",
"root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2");
+ Map<FSQueueType, Set<String>> configuredQueues = new HashMap<FSQueueType, Set<String>>();
+ configuredQueues.put(FSQueueType.LEAF, queues);
+ configuredQueues.put(FSQueueType.PARENT, new HashSet<String>());
scheduler.getAllocationConfiguration().placementPolicy =
- new QueuePlacementPolicy(rules, queues, conf);
+ new QueuePlacementPolicy(rules, configuredQueues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1");
assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user1");
@@ -758,7 +644,7 @@ public class TestFairScheduler {
rules.add(new QueuePlacementRule.Specified().initialize(true, null));
rules.add(new QueuePlacementRule.Default().initialize(true, null));
scheduler.getAllocationConfiguration().placementPolicy =
- new QueuePlacementPolicy(rules, queues, conf);
+ new QueuePlacementPolicy(rules, configuredQueues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1");
assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "somequeue", "otheruser");
@@ -782,7 +668,9 @@ public class TestFairScheduler {
out.println("</queue>");
out.println("</allocations>");
out.close();
-
+
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
@@ -809,23 +697,113 @@ public class TestFairScheduler {
}
}
}
+
+ @Test
+ public void testNestedUserQueue() throws IOException {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+ SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"user1group\" type=\"parent\">");
+ out.println("<minResources>1024mb,0vcores</minResources>");
+ out.println("</queue>");
+ out.println("<queuePlacementPolicy>");
+ out.println("<rule name=\"specified\" create=\"false\" />");
+ out.println("<rule name=\"nestedUserQueue\">");
+ out.println(" <rule name=\"primaryGroup\" create=\"false\" />");
+ out.println("</rule>");
+ out.println("<rule name=\"default\" />");
+ out.println("</queuePlacementPolicy>");
+ out.println("</allocations>");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+ RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
+
+ FSLeafQueue user1Leaf = scheduler.assignToQueue(rmApp1, "root.default",
+ "user1");
+
+ assertEquals("root.user1group.user1", user1Leaf.getName());
+ }
+
+ @Test
+ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"parentq\" type=\"parent\">");
+ out.println("<minResources>1024mb,0vcores</minResources>");
+ out.println("</queue>");
+ out.println("<queuePlacementPolicy>");
+ out.println("<rule name=\"nestedUserQueue\">");
+ out.println(" <rule name=\"specified\" create=\"false\" />");
+ out.println("</rule>");
+ out.println("<rule name=\"default\" />");
+ out.println("</queuePlacementPolicy>");
+ out.println("</allocations>");
+ out.close();
+
+ RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
+ RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ int capacity = 16 * 1024;
+ // create node with 16 G
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(capacity),
+ 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+ // user1,user2 submit their apps to parentq and create user queues
+ createSchedulingRequest(10 * 1024, "root.parentq", "user1");
+ createSchedulingRequest(10 * 1024, "root.parentq", "user2");
+ // user3 submits app in default queue
+ createSchedulingRequest(10 * 1024, "root.default", "user3");
+
+ scheduler.update();
+
+ Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
+ .getLeafQueues();
+
+ for (FSLeafQueue leaf : leafQueues) {
+ if (leaf.getName().equals("root.parentq.user1")
+ || leaf.getName().equals("root.parentq.user2")) {
+ // assert that the fair share is 1/4th node1's capacity
+ assertEquals(capacity / 4, leaf.getFairShare().getMemory());
+ // assert weights are equal for both the user queues
+ assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0);
+ }
+ }
+ }
+
/**
* Make allocation requests and ensure they are reflected in queue demand.
*/
@Test
public void testQueueDemandCalculation() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
- scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
- scheduler.addApplicationAttempt(id11, false);
+ scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", false);
+ scheduler.addApplicationAttempt(id11, false, false);
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
- scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
- scheduler.addApplicationAttempt(id21, false);
+ scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", false);
+ scheduler.addApplicationAttempt(id21, false, false);
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
- scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
- scheduler.addApplicationAttempt(id22, false);
+ scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", false);
+ scheduler.addApplicationAttempt(id22, false, false);
int minReqSize =
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
@@ -865,6 +843,8 @@ public class TestFairScheduler {
@Test
public void testAppAdditionAndRemoval() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId attemptId =createAppAttemptId(1, 1);
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default",
@@ -915,6 +895,8 @@ public class TestFairScheduler {
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueManager = scheduler.getQueueManager();
@@ -947,7 +929,9 @@ public class TestFairScheduler {
out.println("</queue>");
out.println("</allocations>");
out.close();
-
+
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueManager = scheduler.getQueueManager();
@@ -974,6 +958,8 @@ public class TestFairScheduler {
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
@@ -1031,8 +1017,10 @@ public class TestFairScheduler {
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
@@ -1075,13 +1063,13 @@ public class TestFairScheduler {
@Test (timeout = 5000)
/**
- * Make sure containers are chosen to be preempted in the correct order. Right
- * now this means decreasing order of priority.
+ * Make sure containers are chosen to be preempted in the correct order.
*/
public void testChoiceOfPreemptedContainers() throws Exception {
conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
- conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
+ conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
+ conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
MockClock clock = new MockClock();
scheduler.setClock(clock);
@@ -1098,141 +1086,215 @@ public class TestFairScheduler {
out.println("<queue name=\"queueC\">");
out.println("<weight>.25</weight>");
out.println("</queue>");
- out.println("<queue name=\"queueD\">");
+ out.println("<queue name=\"default\">");
out.println("<weight>.25</weight>");
out.println("</queue>");
out.println("</allocations>");
out.close();
-
+
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
- // Create four nodes
+ // Create two nodes
RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
+ MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
+ MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
- RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
- "127.0.0.3");
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
- scheduler.handle(nodeEvent3);
-
-
- // Queue A and B each request three containers
+ // Queues A and B each submit two applications
ApplicationAttemptId app1 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1);
+ createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
ApplicationAttemptId app2 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
- ApplicationAttemptId app3 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+ createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3);
+ createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2);
+ ApplicationAttemptId app3 =
+ createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1);
+ createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3);
ApplicationAttemptId app4 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
- ApplicationAttemptId app5 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
- ApplicationAttemptId app6 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+ createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3);
+ createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4);
scheduler.update();
+ scheduler.getQueueManager().getLeafQueue("queueA", true)
+ .setPolicy(SchedulingPolicy.parse("fifo"));
+ scheduler.getQueueManager().getLeafQueue("queueB", true)
+ .setPolicy(SchedulingPolicy.parse("fair"));
+
// Sufficient node check-ins to fully schedule containers
- for (int i = 0; i < 2; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ for (int i = 0; i < 4; i++) {
scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
}
- assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app6).getLiveContainers().size());
-
- // Now new requests arrive from queues C and D
- ApplicationAttemptId app7 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
- ApplicationAttemptId app8 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
- ApplicationAttemptId app9 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
- ApplicationAttemptId app10 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
- ApplicationAttemptId app11 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
- ApplicationAttemptId app12 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
-
- scheduler.update();
-
- // We should be able to claw back one container from A and B each.
- // Make sure it is lowest priority container.
- scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
- Resources.createResource(2 * 1024));
- assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size());
-
- // First verify we are adding containers to preemption list for the application
- assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app3).getLiveContainers(),
- scheduler.getSchedulerApp(app3).getPreemptionContainers()));
- assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app6).getLiveContainers(),
- scheduler.getSchedulerApp(app6).getPreemptionContainers()));
+ assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+ assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+
+ // Now new requests arrive from queueC and default
+ createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
+ scheduler.update();
+
+ // We should be able to claw back one container from queueA and queueB each.
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+ // First verify we are adding containers to preemption list for the app.
+ // For queueA (fifo), app2 is selected.
+ // For queueB (fair), app4 is selected.
+ assertTrue("App2 should have container to be preempted",
+ !Collections.disjoint(
+ scheduler.getSchedulerApp(app2).getLiveContainers(),
+ scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+ assertTrue("App4 should have container to be preempted",
+ !Collections.disjoint(
+ scheduler.getSchedulerApp(app4).getLiveContainers(),
+ scheduler.getSchedulerApp(app4).getPreemptionContainers()));
// Pretend 15 seconds have passed
clock.tick(15);
// Trigger a kill by insisting we want containers back
- scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
- Resources.createResource(2 * 1024));
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
// At this point the containers should have been killed (since we are not simulating AM)
- assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
- assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+ assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+ // Inside each app, containers are sorted according to their priorities.
+ // Containers with priority 4 are preempted for app2 and app4.
+ Set<RMContainer> set = new HashSet<RMContainer>();
+ for (RMContainer container :
+ scheduler.getSchedulerApp(app2).getLiveContainers()) {
+ if (container.getAllocatedPriority().getPriority() == 4) {
+ set.add(container);
+ }
+ }
+ for (RMContainer container :
+ scheduler.getSchedulerApp(app4).getLiveContainers()) {
+ if (container.getAllocatedPriority().getPriority() == 4) {
+ set.add(container);
+ }
+ }
+ assertTrue("Containers with priority=4 in app2 and app4 should be " +
+ "preempted.", set.isEmpty());
// Trigger a kill by insisting we want containers back
- scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
- Resources.createResource(2 * 1024));
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
// Pretend 15 seconds have passed
clock.tick(15);
// We should be able to claw back another container from A and B each.
- // Make sure it is lowest priority container.
- scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
- Resources.createResource(2 * 1024));
-
- assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ // For queueA (fifo), continue preempting from app2.
+ // For queueB (fair), even app4 has a lowest priority container with p=4, it
+ // still preempts from app3 as app3 is most over fair share.
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+
+ assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+ assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
- assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size());
- assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
// Now A and B are below fair share, so preemption shouldn't do anything
- scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
- Resources.createResource(2 * 1024));
- assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
- assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size());
- assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ assertTrue("App1 should have no container to be preempted",
+ scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
+ assertTrue("App2 should have no container to be preempted",
+ scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty());
+ assertTrue("App3 should have no container to be preempted",
+ scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
+ assertTrue("App4 should have no container to be preempted",
+ scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
+ }
+
+ @Test
+ public void testPreemptionIsNotDelayedToNextRound() throws Exception {
+ conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+ conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+
+ MockClock clock = new MockClock();
+ scheduler.setClock(clock);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"queueA\">");
+ out.println("<weight>8</weight>");
+ out.println("<queue name=\"queueA1\" />");
+ out.println("<queue name=\"queueA2\" />");
+ out.println("</queue>");
+ out.println("<queue name=\"queueB\">");
+ out.println("<weight>2</weight>");
+ out.println("</queue>");
+ out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
+ out.println("</allocations>");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node of 8G
+ RMNode node1 = MockNodes.newNodeInfo(1,
+ Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Run apps in queueA.A1 and queueB
+ ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1,
+ "queueA.queueA1", "user1", 7, 1);
+ // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
+ ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB",
+ "user2", 1, 1);
+
+ scheduler.update();
+
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ for (int i = 0; i < 8; i++) {
+ scheduler.handle(nodeUpdate1);
+ }
+
+ // verify if the apps got the containers they requested
+ assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+
+ // Now submit an app in queueA.queueA2
+ ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1,
+ "queueA.queueA2", "user3", 7, 1);
+ scheduler.update();
+
+ // Let 11 sec pass
+ clock.tick(11);
+
+ scheduler.update();
+ Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager()
+ .getLeafQueue("queueA.queueA2", false), clock.getTime());
+ assertEquals(3277, toPreempt.getMemory());
+
+ // verify if the 3 containers required by queueA2 are preempted in the same
+ // round
+ scheduler.preemptResources(toPreempt);
+ assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
+ .size());
}
@Test (timeout = 5000)
@@ -1271,6 +1333,8 @@ public class TestFairScheduler {
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Create four nodes
@@ -1365,9 +1429,11 @@ public class TestFairScheduler {
assertEquals(
1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
}
-
+
@Test (timeout = 5000)
public void testMultipleContainersWaitingForReservation() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@@ -1410,8 +1476,10 @@ public class TestFairScheduler {
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
// Add a node
RMNode node1 =
MockNodes
@@ -1451,6 +1519,8 @@ public class TestFairScheduler {
@Test (timeout = 5000)
public void testReservationWhileMultiplePriorities() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
@@ -1466,7 +1536,7 @@ public class TestFairScheduler {
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
- FSSchedulerApp app = scheduler.getSchedulerApp(attId);
+ FSAppAttempt app = scheduler.getSchedulerApp(attId);
assertEquals(1, app.getLiveContainers().size());
ContainerId containerId = scheduler.getSchedulerApp(attId)
@@ -1531,21 +1601,25 @@ public class TestFairScheduler {
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
"norealuserhasthisname", 1);
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
"norealuserhasthisname2", 1);
- FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
+ FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
assertNotNull("The application was not allowed", app1);
- FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
+ FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
assertNull("The application was allowed", app2);
}
@Test (timeout = 5000)
public void testMultipleNodesSingleRackRequest() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 =
@@ -1563,8 +1637,8 @@ public class TestFairScheduler {
scheduler.handle(nodeEvent2);
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
- scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
- scheduler.addApplicationAttempt(appId, false);
+ scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", false);
+ scheduler.addApplicationAttempt(appId, false, false);
// 1 request with 2 nodes on the same rack. another request with 1 node on
// a different rack
@@ -1595,6 +1669,8 @@ public class TestFairScheduler {
@Test (timeout = 5000)
public void testFifoWithinQueue() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 =
@@ -1609,8 +1685,8 @@ public class TestFairScheduler {
"user1", 2);
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
"user1", 2);
- FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
- FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
+ FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
+ FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true);
queue1.setPolicy(new FifoPolicy());
@@ -1639,6 +1715,8 @@ public class TestFairScheduler {
@Test(timeout = 3000)
public void testMaxAssign() throws Exception {
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node =
@@ -1650,7 +1728,42 @@ public class TestFairScheduler {
ApplicationAttemptId attId =
createSchedulingRequest(1024, "root.default", "user", 8);
- FSSchedulerApp app = scheduler.getSchedulerApp(attId);
+ FSAppAttempt app = scheduler.getSchedulerApp(attId);
+
+ // set maxAssign to 2: only 2 containers should be allocated
+ scheduler.maxAssign = 2;
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Incorrect number of containers allocated", 2, app
+ .getLiveContainers().size());
+
+ // set maxAssign to -1: all remaining containers should be allocated
+ scheduler.maxAssign = -1;
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Incorrect number of containers allocated", 8, app
+ .getLiveContainers().size());
+ }
+
+ @Test(timeout = 3000)
+ public void testMaxAssignWithZeroMemoryContainers() throws Exception {
+ conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ RMNode node =
+ MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+
+ ApplicationAttemptId attId =
+ createSchedulingRequest(0, 1, "root.default", "user", 8);
+ FSAppAttempt app = scheduler.getSchedulerApp(attId);
// set maxAssign to 2: only 2 containers should be allocated
scheduler.maxAssign = 2;
@@ -1682,6 +1795,8 @@ public class TestFairScheduler {
*/
@Test(timeout = 5000)
public void testAssignContainer() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
final String user = "user1";
@@ -1712,10 +1827,10 @@ public class TestFairScheduler {
ApplicationAttemptId attId4 =
createSchedulingRequest(1024, fifoQueue, user, 4);
- FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
- FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
- FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3);
- FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4);
+ FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
+ FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
+ FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
+ FSAppAttempt app4 = scheduler.getSchedulerApp(attId4);
scheduler.getQueueManager().getLeafQueue(fifoQueue, true)
.setPolicy(SchedulingPolicy.parse("fifo"));
@@ -1765,9 +1880,11 @@ public class TestFairScheduler {
out.println("</queue>");
out.println("</allocations>");
out.close();
-
+
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
int appId = this.APP_ID++;
String user = "usernotallow";
String queue = "queue1";
@@ -1802,7 +1919,7 @@ public class TestFairScheduler {
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
- scheduler.addApplication(attId.getApplicationId(), queue, user);
+ scheduler.addApplication(attId.getApplicationId(), queue, user, false);
numTries = 0;
while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
@@ -1816,6 +1933,8 @@ public class TestFairScheduler {
@Test
public void testReservationThatDoesntFit() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 =
@@ -1830,7 +1949,7 @@ public class TestFairScheduler {
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
- FSSchedulerApp app = scheduler.getSchedulerApp(attId);
+ FSAppAttempt app = scheduler.getSchedulerApp(attId);
assertEquals(0, app.getLiveContainers().size());
assertEquals(0, app.getReservedContainers().size());
@@ -1844,6 +1963,8 @@ public class TestFairScheduler {
@Test
public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
@@ -1872,6 +1993,8 @@ public class TestFairScheduler {
@Test
public void testStrictLocality() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1899,7 +2022,7 @@ public class TestFairScheduler {
NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
// no matter how many heartbeats, node2 should never get a container
- FSSchedulerApp app = scheduler.getSchedulerApp(attId1);
+ FSAppAttempt app = scheduler.getSchedulerApp(attId1);
for (int i = 0; i < 10; i++) {
scheduler.handle(node2UpdateEvent);
assertEquals(0, app.getLiveContainers().size());
@@ -1912,6 +2035,8 @@ public class TestFairScheduler {
@Test
public void testCancelStrictLocality() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1938,7 +2063,7 @@ public class TestFairScheduler {
NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
// no matter how many heartbeats, node2 should never get a container
- FSSchedulerApp app = scheduler.getSchedulerApp(attId1);
+ FSAppAttempt app = scheduler.getSchedulerApp(attId1);
for (int i = 0; i < 10; i++) {
scheduler.handle(node2UpdateEvent);
assertEquals(0, app.getLiveContainers().size());
@@ -1962,6 +2087,8 @@ public class TestFairScheduler {
*/
@Test
public void testReservationsStrictLocality() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
@@ -1971,7 +2098,7 @@ public class TestFairScheduler {
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
"user1", 0);
- FSSchedulerApp app = scheduler.getSchedulerApp(attId);
+ FSAppAttempt app = scheduler.getSchedulerApp(attId);
ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true);
ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true);
@@ -2002,6 +2129,8 @@ public class TestFairScheduler {
@Test
public void testNoMoreCpuOnNode() throws IOException {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
@@ -2011,7 +2140,7 @@ public class TestFairScheduler {
ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default",
"user1", 2);
- FSSchedulerApp app = scheduler.getSchedulerApp(attId);
+ FSAppAttempt app = scheduler.getSchedulerApp(attId);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
@@ -2023,6 +2152,8 @@ public class TestFairScheduler {
@Test
public void testBasicDRFAssignment() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
@@ -2031,13 +2162,13 @@ public class TestFairScheduler {
ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1",
"user1", 2);
- FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1);
+ FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1",
"user1", 2);
- FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
+ FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
- drfPolicy.initialize(scheduler.getClusterCapacity());
+ drfPolicy.initialize(scheduler.getClusterResource());
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update();
@@ -2063,6 +2194,8 @@ public class TestFairScheduler {
*/
@Test
public void testBasicDRFWithQueues() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
@@ -2072,16 +2205,16 @@ public class TestFairScheduler {
ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1",
"user1", 2);
- FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1);
+ FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1",
"user1", 2);
- FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
+ FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2);
ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2",
"user1", 2);
- FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3);
+ FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
- drfPolicy.initialize(scheduler.getClusterCapacity());
+ drfPolicy.initialize(scheduler.getClusterResource());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update();
@@ -2099,6 +2232,8 @@ public class TestFairScheduler {
@Test
public void testDRFHierarchicalQueues() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
@@ -2109,22 +2244,22 @@ public class TestFairScheduler {
ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1",
"user1", 2);
Thread.sleep(3); // so that start times will be different
- FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1);
+ FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1",
"user1", 2);
Thread.sleep(3); // so that start times will be different
- FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
+ FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2);
ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2",
"user1", 2);
Thread.sleep(3); // so that start times will be different
- FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3);
+ FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3);
ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2",
"user1", 2);
Thread.sleep(3); // so that start times will be different
- FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4);
+ FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
- drfPolicy.initialize(scheduler.getClusterCapacity());
+ drfPolicy.initialize(scheduler.getClusterResource());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);
@@ -2167,7 +2302,9 @@ public class TestFairScheduler {
public void testHostPortNodeName() throws Exception {
conf.setBoolean(YarnConfiguration
.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
- scheduler.reinitialize(conf,
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf,
resourceManager.getRMContext());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024),
1, "127.0.0.1", 1);
@@ -2201,7 +2338,7 @@ public class TestFairScheduler {
NodeUpdateSchedulerEvent(node2);
// no matter how many heartbeats, node2 should never get a container
- FSSchedulerApp app = scheduler.getSchedulerApp(attId1);
+ FSAppAttempt app = scheduler.getSchedulerApp(attId1);
for (int i = 0; i < 10; i++) {
scheduler.handle(node2UpdateEvent);
assertEquals(0, app.getLiveContainers().size());
@@ -2213,14 +2350,14 @@ public class TestFairScheduler {
}
private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) {
- FSSchedulerApp app = scheduler.getSchedulerApp(attId);
+ FSAppAttempt app = scheduler.getSchedulerApp(attId);
FSLeafQueue queue = app.getQueue();
- Collection<AppSchedulable> runnableApps =
+ Collection<FSAppAttempt> runnableApps =
queue.getRunnableAppSchedulables();
- Collection<AppSchedulable> nonRunnableApps =
+ Collection<FSAppAttempt> nonRunnableApps =
queue.getNonRunnableAppSchedulables();
- assertEquals(runnable, runnableApps.contains(app.getAppSchedulable()));
- assertEquals(!runnable, nonRunnableApps.contains(app.getAppSchedulable()));
+ assertEquals(runnable, runnableApps.contains(app));
+ assertEquals(!runnable, nonRunnableApps.contains(app));
}
private void verifyQueueNumRunnable(String queueName, int numRunnableInQueue,
@@ -2247,9 +2384,11 @@ public class TestFairScheduler {
out.println("</user>");
out.println("</allocations>");
out.close();
-
+
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
// exceeds no limits
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1");
verifyAppRunnable(attId1, true);
@@ -2283,6 +2422,284 @@ public class TestFairScheduler {
}
@Test
+ public void testQueueMaxAMShare() throws Exception {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"queue1\">");
+ out.println("<maxAMShare>0.2</maxAMShare>");
+ out.println("</queue>");
+ out.println("</allocations>");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ RMNode node =
+ MockNodes.newNodeInfo(1, Resources.createResource(20480, 20),
+ 0, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+ scheduler.update();
+
+ FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true);
+ assertEquals("Queue queue1's fair share should be 0", 0, queue1
+ .getFairShare().getMemory());
+
+ createSchedulingRequest(1 * 1024, "root.default", "user1");
+ scheduler.update();
+ scheduler.handle(updateEvent);
+
+ Resource amResource1 = Resource.newInstance(1024, 1);
+ Resource amResource2 = Resource.newInstance(2048, 2);
+ Resource amResource3 = Resource.newInstance(1860, 2);
+ int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
+ // Exceeds no limits
+ ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
+ createApplicationWithAMResource(attId1, "queue1", "user1", amResource1);
+ createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1);
+ FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Application1's AM requests 1024 MB memory",
+ 1024, app1.getAMResource().getMemory());
+ assertEquals("Application1's AM should be running",
+ 1, app1.getLiveContainers().size());
+ assertEquals("Queue1's AM resource usage should be 1024 MB memory",
+ 1024, queue1.getAmResourceUsage().getMemory());
+
+ // Exceeds no limits
+ ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
+ createApplicationWithAMResource(attId2, "queue1", "user1", amResource1);
+ createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2);
+ FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Application2's AM requests 1024 MB memory",
+ 1024, app2.getAMResource().getMemory());
+ assertEquals("Application2's AM should be running",
+ 1, app2.getLiveContainers().size());
+ assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+ 2048, queue1.getAmResourceUsage().getMemory());
+
+ // Exceeds queue limit
+ ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
+ createApplicationWithAMResource(attId3, "queue1", "user1", amResource1);
+ createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3);
+ FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Application3's AM requests 1024 MB memory",
+ 1024, app3.getAMResource().getMemory());
+ assertEquals("Application3's AM should not be running",
+ 0, app3.getLiveContainers().size());
+ assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+ 2048, queue1.getAmResourceUsage().getMemory());
+
+ // Still can run non-AM container
+ createSchedulingRequestExistingApplication(1024, 1, attId1);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Application1 should have two running containers",
+ 2, app1.getLiveContainers().size());
+ assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+ 2048, queue1.getAmResourceUsage().getMemory());
+
+ // Remove app1, app3's AM should become running
+ AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
+ new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false);
+ scheduler.update();
+ scheduler.handle(appRemovedEvent1);
+ scheduler.handle(updateEvent);
+ assertEquals("Application1's AM should be finished",
+ 0, app1.getLiveContainers().size());
+ assertEquals("Application3's AM should be running",
+ 1, app3.getLiveContainers().size());
+ assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+ 2048, queue1.getAmResourceUsage().getMemory());
+
+ // Exceeds queue limit
+ ApplicationAttemptId attId4 = createAppAttemptId(4, 1);
+ createApplicationWithAMResource(attId4, "queue1", "user1", amResource2);
+ createSchedulingRequestExistingApplication(2048, 2, amPriority, attId4);
+ FSAppAttempt app4 = scheduler.getSchedulerApp(attId4);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Application4's AM requests 2048 MB memory",
+ 2048, app4.getAMResource().getMemory());
+ assertEquals("Application4's AM should not be running",
+ 0, app4.getLiveContainers().size());
+ assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+ 2048, queue1.getAmResourceUsage().getMemory());
+
+ // Exceeds queue limit
+ ApplicationAttemptId attId5 = createAppAttemptId(5, 1);
+ createApplicationWithAMResource(attId5, "queue1", "user1", amResource2);
+ createSchedulingRequestExistingApplication(2048, 2, amPriority, attId5);
+ FSAppAttempt app5 = scheduler.getSchedulerApp(attId5);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Application5's AM requests 2048 MB memory",
+ 2048, app5.getAMResource().getMemory());
+ assertEquals("Application5's AM should not be running",
+ 0, app5.getLiveContainers().size());
+ assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+ 2048, queue1.getAmResourceUsage().getMemory());
+
+ // Remove un-running app doesn't affect others
+ AppAttemptRemovedSchedulerEvent appRemovedEvent4 =
+ new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.KILLED, false);
+ scheduler.handle(appRemovedEvent4);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Application5's AM should not be running",
+ 0, app5.getLiveContainers().size());
+ assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+ 2048, queue1.getAmResourceUsage().getMemory());
+
+ // Remove app2 and app3, app5's AM should become running
+ AppAttemptRemovedSchedulerEvent appRemovedEvent2 =
+ new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, false);
+ AppAttemptRemovedSchedulerEvent appRemovedEvent3 =
+ new AppAttemptRemovedSchedulerEvent(attId3, RMAppAttemptState.FINISHED, false);
+ scheduler.handle(appRemovedEvent2);
+ scheduler.handle(appRemovedEvent3);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Application2's AM should be finished",
+ 0, app2.getLiveContainers().size());
+ assertEquals("Application3's AM should be finished",
+ 0, app3.getLiveContainers().size());
+ assertEquals("Application5's AM should be running",
+ 1, app5.getLiveContainers().size());
+ assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+ 2048, queue1.getAmResourceUsage().getMemory());
+
+ // Check amResource normalization
+ ApplicationAttemptId attId6 = createAppAttemptId(6, 1);
+ createApplicationWithAMResource(attId6, "queue1", "user1", amResource3);
+ createSchedulingRequestExistingApplication(1860, 2, amPriority, attId6);
+ FSAppAttempt app6 = scheduler.getSchedulerApp(attId6);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Application6's AM should not be running",
+ 0, app6.getLiveContainers().size());
+ assertEquals("Application6's AM requests 2048 MB memory",
+ 2048, app6.getAMResource().getMemory());
+ assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+ 2048, queue1.getAmResourceUsage().getMemory());
+
+ // Remove all apps
+ AppAttemptRemovedSchedulerEvent appRemovedEvent5 =
+ new AppAttemptRemovedSchedulerEvent(attId5, RMAppAttemptState.FINISHED, false);
+ AppAttemptRemovedSchedulerEvent appRemovedEvent6 =
+ new AppAttemptRemovedSchedulerEvent(attId6, RMAppAttemptState.FINISHED, false);
+ scheduler.handle(appRemovedEvent5);
+ scheduler.handle(appRemovedEvent6);
+ scheduler.update();
+ assertEquals("Queue1's AM resource usage should be 0",
+ 0, queue1.getAmResourceUsage().getMemory());
+ }
+
+ @Test
+ public void testQueueMaxAMShareDefault() throws Exception {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"queue1\">");
+ out.println("</queue>");
+ out.println("<queue name=\"queue2\">");
+ out.println("<maxAMShare>1.0</maxAMShare>");
+ out.println("</queue>");
+ out.println("<queue name=\"queue3\">");
+ out.println("</queue>");
+ out.println("<queue name=\"queue4\">");
+ out.println("</queue>");
+ out.println("<queue name=\"queue5\">");
+ out.println("</queue>");
+ out.println("</allocations>");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ RMNode node =
+ MockNodes.newNodeInfo(1, Resources.createResource(8192, 20),
+ 0, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+ scheduler.update();
+
+ FSLeafQueue queue1 =
+ scheduler.getQueueManager().getLeafQueue("queue1", true);
+ assertEquals("Queue queue1's fair share should be 0", 0, queue1
+ .getFairShare().getMemory());
+ FSLeafQueue queue2 =
+ scheduler.getQueueManager().getLeafQueue("queue2", true);
+ assertEquals("Queue queue2's fair share should be 0", 0, queue2
+ .getFairShare().getMemory());
+ FSLeafQueue queue3 =
+ scheduler.getQueueManager().getLeafQueue("queue3", true);
+ assertEquals("Queue queue3's fair share should be 0", 0, queue3
+ .getFairShare().getMemory());
+ FSLeafQueue queue4 =
+ scheduler.getQueueManager().getLeafQueue("queue4", true);
+ assertEquals("Queue queue4's fair share should be 0", 0, queue4
+ .getFairShare().getMemory());
+ FSLeafQueue queue5 =
+ scheduler.getQueueManager().getLeafQueue("queue5", true);
+ assertEquals("Queue queue5's fair share should be 0", 0, queue5
+ .getFairShare().getMemory());
+
+ List<String> queues = Arrays.asList("root.default", "root.queue3",
+ "root.queue4", "root.queue5");
+ for (String queue : queues) {
+ createSchedulingRequest(1 * 1024, queue, "user1");
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ }
+
+ Resource amResource1 = Resource.newInstance(2048, 1);
+ int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
+
+ // Exceeds queue limit, but default maxAMShare is -1.0 so it doesn't matter
+ ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
+ createApplicationWithAMResource(attId1, "queue1", "test1", amResource1);
+ createSchedulingRequestExistingApplication(2048, 1, amPriority, attId1);
+ FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Application1's AM requests 2048 MB memory",
+ 2048, app1.getAMResource().getMemory());
+ assertEquals("Application1's AM should be running",
+ 1, app1.getLiveContainers().size());
+ assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+ 2048, queue1.getAmResourceUsage().getMemory());
+
+ // Exceeds queue limit, and maxAMShare is 1.0
+ ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
+ createApplicationWithAMResource(attId2, "queue2", "test1", amResource1);
+ createSchedulingRequestExistingApplication(2048, 1, amPriority, attId2);
+ FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Application2's AM requests 2048 MB memory",
+ 2048, app2.getAMResource().getMemory());
+ assertEquals("Application2's AM should not be running",
+ 0, app2.getLiveContainers().size());
+ assertEquals("Queue2's AM resource usage should be 0 MB memory",
+ 0, queue2.getAmResourceUsage().getMemory());
+ }
+
+ @Test
public void testMaxRunningAppsHierarchicalQueues() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
MockClock clock = new MockClock();
@@ -2301,9 +2718,11 @@ public class TestFairScheduler {
out.println("</queue>");
out.println("</allocations>");
out.close();
-
+
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
// exceeds no limits
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
verifyAppRunnable(attId1, true);
@@ -2363,6 +2782,9 @@ public class TestFairScheduler {
Configuration conf = createConfiguration();
conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
true);
+ fs.setRMContext(resourceManager.getRMContext());
+ fs.init(conf);
+ fs.start();
fs.reinitialize(conf, resourceManager.getRMContext());
Assert.assertTrue("Continuous scheduling should be enabled.",
fs.isContinuousSchedulingEnabled());
@@ -2380,14 +2802,14 @@ public class TestFairScheduler {
fs.handle(nodeEvent2);
// available resource
- Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 1024);
- Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16);
+ Assert.assertEquals(fs.getClusterResource().getMemory(), 16 * 1024);
+ Assert.assertEquals(fs.getClusterResource().getVirtualCores(), 16);
// send application request
ApplicationAttemptId appAttemptId =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
- fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
- fs.addApplicationAttempt(appAttemptId, false);
+ fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
+ fs.addApplicationAttempt(appAttemptId, false, false);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request =
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
@@ -2398,7 +2820,7 @@ public class TestFairScheduler {
// at least one pass
Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500);
- FSSchedulerApp app = fs.getSchedulerApp(appAttemptId);
+ FSAppAttempt app = fs.getSchedulerApp(appAttemptId);
// Wait until app gets resources.
while (app.getCurrentConsumption().equals(Resources.none())) { }
@@ -2429,7 +2851,43 @@ public class TestFairScheduler {
Assert.assertEquals(2, nodes.size());
}
-
+ @Test
+ public void testContinuousSchedulingWithNodeRemoved() throws Exception {
+ // Disable continuous scheduling, will invoke continuous scheduling once manually
+ scheduler.init(conf);
+ scheduler.start();
+ Assert.assertTrue("Continuous scheduling should be disabled.",
+ !scheduler.isContinuousSchedulingEnabled());
+
+ // Add two nodes
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+ RMNode node2 =
+ MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
+ "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+ Assert.assertEquals("We should have two alive nodes.",
+ 2, scheduler.getNumClusterNodes());
+
+ // Remove one node
+ NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1);
+ scheduler.handle(removeNode1);
+ Assert.assertEquals("We should only have one alive node.",
+ 1, scheduler.getNumClusterNodes());
+
+ // Invoke the continuous scheduling once
+ try {
+ scheduler.continuousSchedulingAttempt();
+ } catch (Exception e) {
+ fail("Exception happened when doing continuous scheduling. " +
+ e.toString());
+ }
+ }
+
@Test
public void testDontAllowUndeclaredPools() throws Exception{
conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
@@ -2443,6 +2901,8 @@ public class TestFairScheduler {
out.println("</allocations>");
out.close();
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueManager = scheduler.getQueueManager();
@@ -2470,9 +2930,123 @@ public class TestFairScheduler {
assertEquals(2, defaultQueue.getRunnableAppSchedulables().size());
}
+ @Test
+ public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured()
+ throws IOException {
+ // This test verifies if default rule in queue placement policy
+ // initializes properly when policy is not configured and
+ // undeclared pools is not allowed.
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
+
+ // Create an alloc file with no queue placement policy
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("</allocations>");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ List<QueuePlacementRule> rules = scheduler.allocConf.placementPolicy
+ .getRules();
+
+ for (QueuePlacementRule rule : rules) {
+ if (rule instanceof Default) {
+ Default defaultRule = (Default) rule;
+ assertNotNull(defaultRule.defaultQueueName);
+ }
+ }
+ }
+
+ @Test(timeout=5000)
+ public void testRecoverRequestAfterPreemption() throws Exception {
+ conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
+
+ MockClock clock = new MockClock();
+ scheduler.setClock(clock);
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ Priority priority = Priority.newInstance(20);
+ String host = "127.0.0.1";
+ int GB = 1024;
+
+ // Create Node and raised Node Added event
+ RMNode node = MockNodes.newNodeInfo(1,
+ Resources.createResource(16 * 1024, 4), 0, host);
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+
+ // Create 3 container requests and place it in ask
+ List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+ ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
+ priority.getPriority(), 1, true);
+ ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
+ node.getRackName(), priority.getPriority(), 1, true);
+ ResourceRequest offRackRequest = createResourceRequest(GB, 1,
+ ResourceRequest.ANY, priority.getPriority(), 1, true);
+ ask.add(nodeLocalRequest);
+ ask.add(rackLocalRequest);
+ ask.add(offRackRequest);
+
+ // Create Request and update
+ ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
+ "user1", ask);
+ scheduler.update();
+
+ // Sufficient node check-ins to fully schedule containers
+ NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(nodeUpdate);
+
+ assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
+ .size());
+ FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
+
+ // ResourceRequest will be empty once NodeUpdate is completed
+ Assert.assertNull(app.getResourceRequest(priority, host));
+
+ ContainerId containerId1 = ContainerId.newInstance(appAttemptId, 1);
+ RMContainer rmContainer = app.getRMContainer(containerId1);
+
+ // Create a preempt event and register for preemption
+ scheduler.warnOrKillContainer(rmContainer);
+
+ // Wait for few clock ticks
+ clock.tick(5);
+
+ // preempt now
+ scheduler.warnOrKillContainer(rmContainer);
+
+ List<ResourceRequest> requests = rmContainer.getResourceRequests();
+ // Once recovered, resource request will be present again in app
+ Assert.assertEquals(3, requests.size());
+ for (ResourceRequest request : requests) {
+ Assert.assertEquals(1,
+ app.getResourceRequest(priority, request.getResourceName())
+ .getNumContainers());
+ }
+
+ // Send node heartbeat
+ scheduler.update();
+ scheduler.handle(nodeUpdate);
+
+ List<Container> containers = scheduler.allocate(appAttemptId,
+ Collections.<ResourceRequest> emptyList(),
+ Collections.<ContainerId> emptyList(), null, null).getContainers();
+
+ // Now with updated ResourceRequest, a container is allocated for AM.
+ Assert.assertTrue(containers.size() == 1);
+ }
+
@SuppressWarnings("resource")
@Test
public void testBlacklistNodes() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
final int GB = 1024;
@@ -2486,7 +3060,7 @@ public class TestFairScheduler {
ApplicationAttemptId appAttemptId =
createSchedulingRequest(GB, "root.default", "user", 1);
- FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId);
+ FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
// Verify the blacklist can be updated independent of requesting containers
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
@@ -2525,6 +3099,8 @@ public class TestFairScheduler {
@Test
public void testGetAppsInQueue() throws Exception {
+ scheduler.init(conf);
+ scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId appAttId1 =
@@ -2561,16 +3137,19 @@ public class TestFairScheduler {
@Test
[... 222 lines stripped ...]