You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2021/12/16 22:39:44 UTC
[hadoop] branch trunk updated: YARN-10963. Split TestCapacityScheduler by test categories. Contributed by Tamas Domok
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new aec9cdb YARN-10963. Split TestCapacityScheduler by test categories. Contributed by Tamas Domok
aec9cdb is described below
commit aec9cdb467d81f99ca74a4cf3e076d9e1abec728
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Thu Dec 16 23:39:18 2021 +0100
YARN-10963. Split TestCapacityScheduler by test categories. Contributed by Tamas Domok
---
.../CapacitySchedulerConfigGeneratorForTest.java | 52 +
.../capacity/CapacitySchedulerTestUtilities.java | 149 +
.../scheduler/capacity/TestCapacityScheduler.java | 2889 +-------------------
.../capacity/TestCapacitySchedulerApps.java | 1499 ++++++++++
.../capacity/TestCapacitySchedulerNodes.java | 387 +++
.../capacity/TestCapacitySchedulerQueues.java | 888 ++++++
6 files changed, 3046 insertions(+), 2818 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java
index 873e3b9..087b797 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java
@@ -18,7 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.util.HashMap;
import java.util.Map;
@@ -50,4 +54,52 @@ public final class CapacitySchedulerConfigGeneratorForTest {
return createConfiguration(conf);
}
+ public static void setMinAllocMb(Configuration conf, int minAllocMb) {
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ minAllocMb);
+ }
+
+ public static void setMaxAllocMb(Configuration conf, int maxAllocMb) {
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ maxAllocMb);
+ }
+
+ public static void setMaxAllocMb(CapacitySchedulerConfiguration conf,
+ String queueName, int maxAllocMb) {
+ String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ + MAXIMUM_ALLOCATION_MB;
+ conf.setInt(propName, maxAllocMb);
+ }
+
+ public static void setMinAllocVcores(Configuration conf, int minAllocVcores) {
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ minAllocVcores);
+ }
+
+ public static void setMaxAllocVcores(Configuration conf, int maxAllocVcores) {
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ maxAllocVcores);
+ }
+
+ public static void setMaxAllocVcores(CapacitySchedulerConfiguration conf,
+ String queueName, int maxAllocVcores) {
+ String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ + CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
+ conf.setInt(propName, maxAllocVcores);
+ }
+
+ public static void setMaxAllocation(CapacitySchedulerConfiguration conf,
+ String queueName, String maxAllocation) {
+ String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ + MAXIMUM_ALLOCATION;
+ conf.set(propName, maxAllocation);
+ }
+
+ public static void unsetMaxAllocation(CapacitySchedulerConfiguration conf,
+ String queueName) {
+ String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ + MAXIMUM_ALLOCATION;
+ conf.unset(propName);
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestUtilities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestUtilities.java
index 3d098f8..b2c6548 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestUtilities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestUtilities.java
@@ -18,14 +18,48 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.util.Sets;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert;
+import java.io.IOException;
import java.util.Set;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public final class CapacitySchedulerTestUtilities {
public static final int GB = 1024;
@@ -69,4 +103,119 @@ public final class CapacitySchedulerTestUtilities {
}
}
}
+
+ public static ResourceManager createResourceManager() throws Exception {
+ ResourceUtils.resetResourceTypes(new Configuration());
+ DefaultMetricsSystem.setMiniClusterMode(true);
+ ResourceManager resourceManager = new ResourceManager() {
+ @Override
+ protected RMNodeLabelsManager createNodeLabelManager() {
+ RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(getConfig());
+ return mgr;
+ }
+ };
+ CapacitySchedulerConfiguration csConf
+ = new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(csConf);
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER,
+ CapacityScheduler.class, ResourceScheduler.class);
+ resourceManager.init(conf);
+ resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+ resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
+ ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
+ return resourceManager;
+ }
+
+ public static RMContext createMockRMContext() {
+ RMContext mockContext = mock(RMContext.class);
+ when(mockContext.getConfigurationProvider()).thenReturn(
+ new LocalConfigurationProvider());
+ return mockContext;
+ }
+
+ public static void stopResourceManager(ResourceManager resourceManager) throws Exception {
+ if (resourceManager != null) {
+ QueueMetrics.clearQueueMetrics();
+ DefaultMetricsSystem.shutdown();
+ resourceManager.stop();
+ }
+ }
+
+ public static ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs,
+ int clusterTs, int appId, String queue,
+ String user) {
+ ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId);
+ ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
+ appId1, appId);
+
+ RMAppAttemptMetrics attemptMetric1 =
+ new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext());
+ RMAppImpl app1 = mock(RMAppImpl.class);
+ when(app1.getApplicationId()).thenReturn(appId1);
+ RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
+ Container container = mock(Container.class);
+ when(attempt1.getMasterContainer()).thenReturn(container);
+ ApplicationSubmissionContext submissionContext = mock(
+ ApplicationSubmissionContext.class);
+ when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
+ when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
+ when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
+ when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
+ rm.getRMContext().getRMApps().put(appId1, app1);
+
+ SchedulerEvent addAppEvent1 =
+ new AppAddedSchedulerEvent(appId1, queue, user);
+ cs.handle(addAppEvent1);
+ SchedulerEvent addAttemptEvent1 =
+ new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
+ cs.handle(addAttemptEvent1);
+ return appAttemptId1;
+ }
+
+ public static MockRM setUpMove() {
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ return setUpMove(conf);
+ }
+
+ public static MockRM setUpMove(Configuration config) {
+ CapacitySchedulerConfiguration conf =
+ new CapacitySchedulerConfiguration(config);
+ setupQueueConfiguration(conf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ MockRM rm = new MockRM(conf);
+ rm.start();
+ return rm;
+ }
+
+ public static void nodeUpdate(ResourceManager rm, NodeManager nm) {
+ RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId());
+ // Send a heartbeat to kick the tires on the Scheduler
+ NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+ rm.getResourceScheduler().handle(nodeUpdate);
+ }
+
+ public static NodeManager registerNode(ResourceManager rm, String hostName,
+ int containerManagerPort, int httpPort, String rackName,
+ Resource capability, NodeStatus nodeStatus)
+ throws IOException, YarnException {
+ NodeManager nm = new NodeManager(hostName,
+ containerManagerPort, httpPort, rackName, capability, rm, nodeStatus);
+ NodeAddedSchedulerEvent nodeAddEvent1 =
+ new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
+ .get(nm.getNodeId()));
+ rm.getResourceScheduler().handle(nodeAddEvent1);
+ return nm;
+ }
+
+ public static void checkApplicationResourceUsage(int expected, Application application) {
+ Assert.assertEquals(expected, application.getUsedResources().getMemorySize());
+ }
+
+ public static void checkNodeResourceUsage(int expected, NodeManager node) {
+ Assert.assertEquals(expected, node.getUsed().getMemorySize());
+ node.checkResourceUsage();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index c3548cc..4a9e45e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -19,13 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.checkQueueStructureCapacities;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocMb;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocVcores;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocMb;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocVcores;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.getDefaultCapacities;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ExpectedCapacities;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupBlockedQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupOtherBlockedQueueConfiguration;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfWithoutChildrenOfB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
@@ -35,22 +35,22 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.C
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2_CAPACITY;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B_CAPACITY;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithB1AsParentQueue;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.appHelper;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkApplicationResourceUsage;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkNodeResourceUsage;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkPendingResource;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkPendingResourceGreaterThanZero;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createMockRMContext;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.nodeUpdate;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.registerNode;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.setUpMove;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.toSet;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.waitforNMRegistered;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertContainerKilled;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertMemory;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertNoPreemption;
@@ -61,16 +61,11 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.T
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@@ -90,7 +85,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.Groups;
@@ -109,7 +103,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -119,11 +112,9 @@ import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
@@ -131,10 +122,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -159,7 +146,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfil
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
@@ -167,24 +153,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.CSQueueMetricsForCustomResources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
@@ -199,16 +176,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -239,51 +211,13 @@ public class TestCapacityScheduler {
@Before
public void setUp() throws Exception {
- ResourceUtils.resetResourceTypes(new Configuration());
- DefaultMetricsSystem.setMiniClusterMode(true);
- resourceManager = new ResourceManager() {
- @Override
- protected RMNodeLabelsManager createNodeLabelManager() {
- RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
- mgr.init(getConfig());
- return mgr;
- }
- };
- CapacitySchedulerConfiguration csConf
- = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(csConf);
- YarnConfiguration conf = new YarnConfiguration(csConf);
- conf.setClass(YarnConfiguration.RM_SCHEDULER,
- CapacityScheduler.class, ResourceScheduler.class);
- resourceManager.init(conf);
- resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
- resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
- ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
- mockContext = mock(RMContext.class);
- when(mockContext.getConfigurationProvider()).thenReturn(
- new LocalConfigurationProvider());
+ resourceManager = createResourceManager();
+ mockContext = createMockRMContext();
}
@After
public void tearDown() throws Exception {
- if (resourceManager != null) {
- QueueMetrics.clearQueueMetrics();
- DefaultMetricsSystem.shutdown();
- resourceManager.stop();
- }
- }
-
- private NodeManager registerNode(ResourceManager rm, String hostName,
- int containerManagerPort, int httpPort, String rackName,
- Resource capability, NodeStatus nodeStatus)
- throws IOException, YarnException {
- NodeManager nm = new NodeManager(hostName,
- containerManagerPort, httpPort, rackName, capability, rm, nodeStatus);
- NodeAddedSchedulerEvent nodeAddEvent1 =
- new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
- .get(nm.getNodeId()));
- rm.getResourceScheduler().handle(nodeAddEvent1);
- return nm;
+ stopResourceManager(resourceManager);
}
@Test (timeout = 30000)
@@ -291,8 +225,9 @@ public class TestCapacityScheduler {
CapacityScheduler scheduler = new CapacityScheduler();
scheduler.setRMContext(resourceManager.getRMContext());
Configuration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
+
+ setMinAllocMb(conf, 2048);
+ setMaxAllocMb(conf, 1024);
try {
scheduler.init(conf);
fail("Exception is expected because the min memory allocation is" +
@@ -305,8 +240,8 @@ public class TestCapacityScheduler {
}
conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
+ setMinAllocVcores(conf, 2);
+ setMaxAllocVcores(conf, 1);
try {
scheduler.reinitialize(conf, mockContext);
fail("Exception is expected because the min vcores allocation is" +
@@ -319,19 +254,6 @@ public class TestCapacityScheduler {
}
}
- private NodeManager registerNode(String hostName, int containerManagerPort,
- int httpPort, String rackName,
- Resource capability, NodeStatus nodeStatus)
- throws IOException, YarnException {
- NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
- rackName, capability, resourceManager, nodeStatus);
- NodeAddedSchedulerEvent nodeAddEvent1 =
- new NodeAddedSchedulerEvent(resourceManager.getRMContext()
- .getRMNodes().get(nm.getNodeId()));
- resourceManager.getResourceScheduler().handle(nodeAddEvent1);
- return nm;
- }
-
@Test
public void testCapacityScheduler() throws Exception {
@@ -342,13 +264,13 @@ public class TestCapacityScheduler {
// Register node1
String host_0 = "host_0";
NodeManager nm_0 =
- registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+ registerNode(resourceManager, host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(4 * GB, 1), mockNodeStatus);
// Register node2
String host_1 = "host_1";
NodeManager nm_1 =
- registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+ registerNode(resourceManager, host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(2 * GB, 1), mockNodeStatus);
// ResourceRequest priorities
@@ -397,10 +319,10 @@ public class TestCapacityScheduler {
LOG.info("Kick!");
// task_0_0 and task_1_0 allocated, used=4G
- nodeUpdate(nm_0);
+ nodeUpdate(resourceManager, nm_0);
// nothing allocated
- nodeUpdate(nm_1);
+ nodeUpdate(resourceManager, nm_1);
// Get allocations from the scheduler
application_0.schedule(); // task_0_0
@@ -429,11 +351,11 @@ public class TestCapacityScheduler {
// Send a heartbeat to kick the tires on the Scheduler
LOG.info("Sending hb from " + nm_0.getHostName());
// nothing new, used=4G
- nodeUpdate(nm_0);
+ nodeUpdate(resourceManager, nm_0);
LOG.info("Sending hb from " + nm_1.getHostName());
// task_0_1 is prefer as locality, used=2G
- nodeUpdate(nm_1);
+ nodeUpdate(resourceManager, nm_1);
// Get allocations from the scheduler
LOG.info("Trying to allocate...");
@@ -443,8 +365,8 @@ public class TestCapacityScheduler {
application_1.schedule();
checkApplicationResourceUsage(5 * GB, application_1);
- nodeUpdate(nm_0);
- nodeUpdate(nm_1);
+ nodeUpdate(resourceManager, nm_0);
+ nodeUpdate(resourceManager, nm_1);
checkNodeResourceUsage(4*GB, nm_0);
checkNodeResourceUsage(2*GB, nm_1);
@@ -658,21 +580,6 @@ public class TestCapacityScheduler {
LOG.info("--- END: testAssignMultiple ---");
}
- private void nodeUpdate(ResourceManager rm, NodeManager nm) {
- RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId());
- // Send a heartbeat to kick the tires on the Scheduler
- NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
- rm.getResourceScheduler().handle(nodeUpdate);
- }
-
- private void nodeUpdate(NodeManager nm) {
- RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
- // Send a heartbeat to kick the tires on the Scheduler
- NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
- resourceManager.getResourceScheduler().handle(nodeUpdate);
- }
-
-
@Test
public void testMaximumCapacitySetup() {
float delta = 0.0000001f;
@@ -695,10 +602,8 @@ public class TestCapacityScheduler {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
- conf.set(CapacitySchedulerConfiguration.getQueuePrefix(A1)
- + MAXIMUM_ALLOCATION_MB, "1024");
- conf.set(CapacitySchedulerConfiguration.getQueuePrefix(A1)
- + MAXIMUM_ALLOCATION_VCORES, "1");
+ setMaxAllocMb(conf, A1, 1024);
+ setMaxAllocVcores(conf, A1, 1);
scheduler.init(conf);
scheduler.start();
@@ -722,64 +627,6 @@ public class TestCapacityScheduler {
Assert.assertEquals(1, maxAllocationForQueue.getVirtualCores());
}
-
- @Test
- public void testRefreshQueues() throws Exception {
- CapacityScheduler cs = new CapacityScheduler();
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
- null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null);
- setupQueueConfiguration(conf);
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- cs.init(conf);
- cs.start();
- cs.reinitialize(conf, rmContext);
- checkQueueStructureCapacities(cs);
-
- conf.setCapacity(A, 80f);
- conf.setCapacity(B, 20f);
- cs.reinitialize(conf, mockContext);
- checkQueueStructureCapacities(cs, getDefaultCapacities(80f / 100.0f, 20f / 100.0f));
- cs.stop();
- }
-
- private void checkApplicationResourceUsage(int expected,
- Application application) {
- Assert.assertEquals(expected, application.getUsedResources().getMemorySize());
- }
-
- private void checkNodeResourceUsage(int expected, NodeManager node) {
- Assert.assertEquals(expected, node.getUsed().getMemorySize());
- node.checkResourceUsage();
- }
-
- /** Test that parseQueue throws an exception when two leaf queues have the
- * same name
- * @throws IOException
- */
- @Test(expected=IOException.class)
- public void testParseQueue() throws IOException {
- CapacityScheduler cs = new CapacityScheduler();
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- cs.init(conf);
- cs.start();
-
- conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[] {"b1"} );
- conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
- conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
-
- cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
- null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null));
- }
-
@Test
public void testParseQueueWithAbsoluteResource() {
String childQueue = "testQueue";
@@ -820,85 +667,6 @@ public class TestCapacityScheduler {
}
@Test
- public void testReconnectedNode() throws Exception {
- CapacitySchedulerConfiguration csConf =
- new CapacitySchedulerConfiguration();
- setupQueueConfiguration(csConf);
- CapacityScheduler cs = new CapacityScheduler();
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- cs.init(csConf);
- cs.start();
- cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
- null, null, new RMContainerTokenSecretManager(csConf),
- new NMTokenSecretManagerInRM(csConf),
- new ClientToAMTokenSecretManagerInRM(), null));
-
- RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
- RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
-
- cs.handle(new NodeAddedSchedulerEvent(n1));
- cs.handle(new NodeAddedSchedulerEvent(n2));
-
- Assert.assertEquals(6 * GB, cs.getClusterResource().getMemorySize());
-
- // reconnect n1 with downgraded memory
- n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
- cs.handle(new NodeRemovedSchedulerEvent(n1));
- cs.handle(new NodeAddedSchedulerEvent(n1));
-
- Assert.assertEquals(4 * GB, cs.getClusterResource().getMemorySize());
- cs.stop();
- }
-
- @Test
- public void testRefreshQueuesWithNewQueue() throws Exception {
- CapacityScheduler cs = new CapacityScheduler();
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- cs.init(conf);
- cs.start();
- cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
- null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null));
- checkQueueStructureCapacities(cs);
-
- // Add a new queue b4
- final String b4 = B + ".b4";
- final float b4Capacity = 10;
- final float modifiedB3Capacity = B3_CAPACITY - b4Capacity;
-
- try {
- conf.setCapacity(A, 80f);
- conf.setCapacity(B, 20f);
- conf.setQueues(B, new String[]{"b1", "b2", "b3", "b4"});
- conf.setCapacity(B1, B1_CAPACITY);
- conf.setCapacity(B2, B2_CAPACITY);
- conf.setCapacity(B3, modifiedB3Capacity);
- conf.setCapacity(b4, b4Capacity);
- cs.reinitialize(conf, mockContext);
-
- final float capA = 80f / 100.0f;
- final float capB = 20f / 100.0f;
- Map<String, ExpectedCapacities> expectedCapacities = getDefaultCapacities(capA, capB);
- expectedCapacities.put(B3, new ExpectedCapacities(modifiedB3Capacity / 100.0f, capB));
- expectedCapacities.put(b4, new ExpectedCapacities(b4Capacity / 100.0f, capB));
- checkQueueStructureCapacities(cs, expectedCapacities);
-
- // Verify parent for B4
- CSQueue rootQueue = cs.getRootQueue();
- CSQueue queueB = findQueue(rootQueue, B);
- CSQueue queueB4 = findQueue(queueB, b4);
-
- assertEquals(queueB, queueB4.getParent());
- } finally {
- cs.stop();
- }
- }
- @Test
public void testCapacitySchedulerInfo() throws Exception {
QueueInfo queueInfo = resourceManager.getResourceScheduler().getQueueInfo("a", true, true);
Assert.assertEquals("Queue Name should be a", "a",
@@ -928,36 +696,6 @@ public class TestCapacityScheduler {
}
@Test
- public void testBlackListNodes() throws Exception {
- Configuration conf = new Configuration();
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- MockRM rm = new MockRM(conf);
- rm.start();
- CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-
- String host = "127.0.0.1";
- RMNode node =
- MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
- cs.handle(new NodeAddedSchedulerEvent(node));
-
- ApplicationAttemptId appAttemptId = appHelper(rm, cs, 100, 1, "default", "user");
-
- // Verify the blacklist can be updated independent of requesting containers
- cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
- Collections.<ContainerId>emptyList(),
- Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
- Assert.assertTrue(cs.getApplicationAttempt(appAttemptId)
- .isPlaceBlacklisted(host));
- cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
- Collections.<ContainerId>emptyList(), null,
- Collections.singletonList(host), NULL_UPDATE_REQUESTS);
- Assert.assertFalse(cs.getApplicationAttempt(appAttemptId)
- .isPlaceBlacklisted(host));
- rm.stop();
- }
-
- @Test
public void testAllocateReorder() throws Exception {
//Confirm that allocation (resource request) alone will trigger a change in
@@ -1163,53 +901,6 @@ public class TestCapacityScheduler {
}
@Test
- public void testGetAppsInQueue() throws Exception {
- Application application_0 = new Application("user_0", "a1", resourceManager);
- application_0.submit();
-
- Application application_1 = new Application("user_0", "a2", resourceManager);
- application_1.submit();
-
- Application application_2 = new Application("user_0", "b2", resourceManager);
- application_2.submit();
-
- ResourceScheduler scheduler = resourceManager.getResourceScheduler();
-
- List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(1, appsInA1.size());
-
- List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(application_0.getApplicationAttemptId()));
- assertTrue(appsInA.contains(application_1.getApplicationAttemptId()));
- assertEquals(2, appsInA.size());
-
- List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId()));
- assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId()));
- assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId()));
- assertEquals(3, appsInRoot.size());
-
- Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
- }
-
- @Test
- public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- MockRM rm = new MockRM(conf);
- @SuppressWarnings("unchecked")
- AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> cs =
- (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
- .getResourceScheduler();
- SchedulerApplication<SchedulerApplicationAttempt> app =
- TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
- cs.getSchedulerApplications(), cs, "a1");
- Assert.assertEquals("a1", app.getQueue().getQueueName());
- }
-
- @Test
public void testAsyncScheduling() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
@@ -1381,37 +1072,6 @@ public class TestCapacityScheduler {
rm.stop();
}
- @Test
- public void testNumClusterNodes() throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
- CapacityScheduler cs = new CapacityScheduler();
- cs.setConf(conf);
- RMContext rmContext = TestUtils.getMockRMContext();
- cs.setRMContext(rmContext);
- CapacitySchedulerConfiguration csConf =
- new CapacitySchedulerConfiguration();
- setupQueueConfiguration(csConf);
- cs.init(csConf);
- cs.start();
- assertEquals(0, cs.getNumClusterNodes());
-
- RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
- RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
- cs.handle(new NodeAddedSchedulerEvent(n1));
- cs.handle(new NodeAddedSchedulerEvent(n2));
- assertEquals(2, cs.getNumClusterNodes());
-
- cs.handle(new NodeRemovedSchedulerEvent(n1));
- assertEquals(1, cs.getNumClusterNodes());
- cs.handle(new NodeAddedSchedulerEvent(n1));
- assertEquals(2, cs.getNumClusterNodes());
- cs.handle(new NodeRemovedSchedulerEvent(n2));
- cs.handle(new NodeRemovedSchedulerEvent(n1));
- assertEquals(0, cs.getNumClusterNodes());
-
- cs.stop();
- }
-
@Test(timeout = 120000)
public void testPreemptionInfo() throws Exception {
Configuration conf = new Configuration();
@@ -1553,1790 +1213,56 @@ public class TestCapacityScheduler {
Assert.assertTrue(containers.size() == 1);
}
- private MockRM setUpMove() {
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- return setUpMove(conf);
- }
-
- private MockRM setUpMove(Configuration config) {
- CapacitySchedulerConfiguration conf =
- new CapacitySchedulerConfiguration(config);
- setupQueueConfiguration(conf);
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- MockRM rm = new MockRM(conf);
- rm.start();
- return rm;
- }
-
@Test
- public void testAppSubmission() throws Exception {
+ public void testPreemptionDisabled() throws Exception {
+ CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+ RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+ null, new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM(), null);
setupQueueConfiguration(conf);
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- conf.setQueues(A, new String[] {"a1", "a2", "b"});
- conf.setCapacity(A1, 20);
- conf.setCapacity("root.a.b", 10);
- MockRM rm = new MockRM(conf);
- rm.start();
-
- RMApp noParentQueueApp = submitAppAndWaitForState(rm, "q", RMAppState.FAILED);
- Assert.assertEquals(RMAppState.FAILED, noParentQueueApp.getState());
-
- RMApp ambiguousQueueApp = submitAppAndWaitForState(rm, "b", RMAppState.FAILED);
- Assert.assertEquals(RMAppState.FAILED, ambiguousQueueApp.getState());
-
- RMApp emptyPartQueueApp = submitAppAndWaitForState(rm, "root..a1", RMAppState.FAILED);
- Assert.assertEquals(RMAppState.FAILED, emptyPartQueueApp.getState());
-
- RMApp failedAutoQueue = submitAppAndWaitForState(rm, "root.a.b.c.d", RMAppState.FAILED);
- Assert.assertEquals(RMAppState.FAILED, failedAutoQueue.getState());
- }
-
- private RMApp submitAppAndWaitForState(MockRM rm, String b, RMAppState state) throws Exception {
- MockRMAppSubmissionData ambiguousQueueAppData =
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withWaitForAppAcceptedState(false)
- .withAppName("app")
- .withUser("user")
- .withAcls(null)
- .withQueue(b)
- .withUnmanagedAM(false)
- .build();
- RMApp app1 = MockRMAppSubmitter.submit(rm, ambiguousQueueAppData);
- rm.waitForState(app1.getApplicationId(), state);
- return app1;
- }
-
- @Test
- public void testMoveAppBasic() throws Exception {
- MockRM rm = setUpMove();
- AbstractYarnScheduler scheduler =
- (AbstractYarnScheduler) rm.getResourceScheduler();
- QueueMetrics metrics = scheduler.getRootQueueMetrics();
- Assert.assertEquals(0, metrics.getAppsPending());
- // submit an app
- MockRMAppSubmissionData data =
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-1")
- .withUser("user_0")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app = MockRMAppSubmitter.submit(rm, data);
- ApplicationAttemptId appAttemptId =
- rm.getApplicationReport(app.getApplicationId())
- .getCurrentApplicationAttemptId();
- // check preconditions
- List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(1, appsInA1.size());
- String queue =
- scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
- .getQueueName();
- Assert.assertEquals("a1", queue);
-
- List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(appAttemptId));
- assertEquals(1, appsInA.size());
-
- List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- assertEquals(1, metrics.getAppsPending());
-
- List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
- assertTrue(appsInB1.isEmpty());
-
- List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
- assertTrue(appsInB.isEmpty());
-
- // now move the app
- scheduler.moveApplication(app.getApplicationId(), "b1");
-
- // check postconditions
- appsInB1 = scheduler.getAppsInQueue("b1");
- assertEquals(1, appsInB1.size());
- queue =
- scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
- .getQueueName();
- Assert.assertEquals("b1", queue);
-
- appsInB = scheduler.getAppsInQueue("b");
- assertTrue(appsInB.contains(appAttemptId));
- assertEquals(1, appsInB.size());
-
- appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- assertEquals(1, metrics.getAppsPending());
-
- appsInA1 = scheduler.getAppsInQueue("a1");
- assertTrue(appsInA1.isEmpty());
-
- appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.isEmpty());
-
- rm.stop();
- }
-
- @Test
- public void testMoveAppPendingMetrics() throws Exception {
- MockRM rm = setUpMove();
- AbstractYarnScheduler scheduler =
- (AbstractYarnScheduler) rm.getResourceScheduler();
- QueueMetrics metrics = scheduler.getRootQueueMetrics();
- List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
- List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
-
- assertEquals(0, appsInA1.size());
- assertEquals(0, appsInB1.size());
- Assert.assertEquals(0, metrics.getAppsPending());
-
- // submit two apps in a1
- RMApp app1 = MockRMAppSubmitter.submit(rm,
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-1")
- .withUser("user_0")
- .withAcls(null)
- .withQueue("a1")
- .build());
- RMApp app2 = MockRMAppSubmitter.submit(rm,
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-2")
- .withUser("user_0")
- .withAcls(null)
- .withQueue("a1")
- .build());
-
- appsInA1 = scheduler.getAppsInQueue("a1");
- appsInB1 = scheduler.getAppsInQueue("b1");
- assertEquals(2, appsInA1.size());
- assertEquals(0, appsInB1.size());
- assertEquals(2, metrics.getAppsPending());
-
- // submit one app in b1
- RMApp app3 = MockRMAppSubmitter.submit(rm,
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-2")
- .withUser("user_0")
- .withAcls(null)
- .withQueue("b1")
- .build());
-
- appsInA1 = scheduler.getAppsInQueue("a1");
- appsInB1 = scheduler.getAppsInQueue("b1");
- assertEquals(2, appsInA1.size());
- assertEquals(1, appsInB1.size());
- assertEquals(3, metrics.getAppsPending());
-
- // now move the app1 from a1 to b1
- scheduler.moveApplication(app1.getApplicationId(), "b1");
-
- appsInA1 = scheduler.getAppsInQueue("a1");
- appsInB1 = scheduler.getAppsInQueue("b1");
- assertEquals(1, appsInA1.size());
- assertEquals(2, appsInB1.size());
- assertEquals(3, metrics.getAppsPending());
-
- // now move the app2 from a1 to b1
- scheduler.moveApplication(app2.getApplicationId(), "b1");
-
- appsInA1 = scheduler.getAppsInQueue("a1");
- appsInB1 = scheduler.getAppsInQueue("b1");
- assertEquals(0, appsInA1.size());
- assertEquals(3, appsInB1.size());
- assertEquals(3, metrics.getAppsPending());
-
- // now move the app3 from b1 to a1
- scheduler.moveApplication(app3.getApplicationId(), "a1");
+ cs.setConf(new YarnConfiguration());
+ cs.setRMContext(resourceManager.getRMContext());
+ cs.init(conf);
+ cs.start();
+ cs.reinitialize(conf, rmContext);
- appsInA1 = scheduler.getAppsInQueue("a1");
- appsInB1 = scheduler.getAppsInQueue("b1");
- assertEquals(1, appsInA1.size());
- assertEquals(2, appsInB1.size());
- assertEquals(3, metrics.getAppsPending());
- rm.stop();
- }
+ CSQueue rootQueue = cs.getRootQueue();
+ CSQueue queueB = findQueue(rootQueue, B);
+ CSQueue queueB2 = findQueue(queueB, B2);
- @Test
- public void testMoveAppSameParent() throws Exception {
- MockRM rm = setUpMove();
- AbstractYarnScheduler scheduler =
- (AbstractYarnScheduler) rm.getResourceScheduler();
-
- // submit an app
- MockRMAppSubmissionData data =
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-1")
- .withUser("user_0")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app = MockRMAppSubmitter.submit(rm, data);
- ApplicationAttemptId appAttemptId =
- rm.getApplicationReport(app.getApplicationId())
- .getCurrentApplicationAttemptId();
-
- // check preconditions
- List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(1, appsInA1.size());
- String queue =
- scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
- .getQueueName();
- Assert.assertEquals("a1", queue);
-
- List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(appAttemptId));
- assertEquals(1, appsInA.size());
-
- List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- List<ApplicationAttemptId> appsInA2 = scheduler.getAppsInQueue("a2");
- assertTrue(appsInA2.isEmpty());
-
- // now move the app
- scheduler.moveApplication(app.getApplicationId(), "a2");
-
- // check postconditions
- appsInA2 = scheduler.getAppsInQueue("a2");
- assertEquals(1, appsInA2.size());
- queue =
- scheduler.getApplicationAttempt(appsInA2.get(0)).getQueue()
- .getQueueName();
- Assert.assertEquals("a2", queue);
-
- appsInA1 = scheduler.getAppsInQueue("a1");
- assertTrue(appsInA1.isEmpty());
-
- appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(appAttemptId));
- assertEquals(1, appsInA.size());
-
- appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- rm.stop();
- }
-
- @Test
- public void testMoveAppForMoveToQueueWithFreeCap() throws Exception {
-
- ResourceScheduler scheduler = resourceManager.getResourceScheduler();
-
- NodeStatus mockNodeStatus = createMockNodeStatus();
-
- // Register node1
- String host_0 = "host_0";
- NodeManager nm_0 =
- registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
- Resources.createResource(4 * GB, 1), mockNodeStatus);
-
- // Register node2
- String host_1 = "host_1";
- NodeManager nm_1 =
- registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
- Resources.createResource(2 * GB, 1), mockNodeStatus);
-
- // ResourceRequest priorities
- Priority priority_0 = Priority.newInstance(0);
- Priority priority_1 = Priority.newInstance(1);
-
- // Submit application_0
- Application application_0 =
- new Application("user_0", "a1", resourceManager);
- application_0.submit(); // app + app attempt event sent to scheduler
-
- application_0.addNodeManager(host_0, 1234, nm_0);
- application_0.addNodeManager(host_1, 1234, nm_1);
-
- Resource capability_0_0 = Resources.createResource(1 * GB, 1);
- application_0.addResourceRequestSpec(priority_1, capability_0_0);
-
- Resource capability_0_1 = Resources.createResource(2 * GB, 1);
- application_0.addResourceRequestSpec(priority_0, capability_0_1);
-
- Task task_0_0 =
- new Task(application_0, priority_1, new String[] { host_0, host_1 });
- application_0.addTask(task_0_0);
-
- // Submit application_1
- Application application_1 =
- new Application("user_1", "b2", resourceManager);
- application_1.submit(); // app + app attempt event sent to scheduler
-
- application_1.addNodeManager(host_0, 1234, nm_0);
- application_1.addNodeManager(host_1, 1234, nm_1);
-
- Resource capability_1_0 = Resources.createResource(1 * GB, 1);
- application_1.addResourceRequestSpec(priority_1, capability_1_0);
-
- Resource capability_1_1 = Resources.createResource(2 * GB, 1);
- application_1.addResourceRequestSpec(priority_0, capability_1_1);
-
- Task task_1_0 =
- new Task(application_1, priority_1, new String[] { host_0, host_1 });
- application_1.addTask(task_1_0);
-
- // Send resource requests to the scheduler
- application_0.schedule(); // allocate
- application_1.schedule(); // allocate
-
- // task_0_0 task_1_0 allocated, used=2G
- nodeUpdate(nm_0);
-
- // nothing allocated
- nodeUpdate(nm_1);
-
- // Get allocations from the scheduler
- application_0.schedule(); // task_0_0
- checkApplicationResourceUsage(1 * GB, application_0);
-
- application_1.schedule(); // task_1_0
- checkApplicationResourceUsage(1 * GB, application_1);
-
- checkNodeResourceUsage(2 * GB, nm_0); // task_0_0 (1G) and task_1_0 (1G) 2G
- // available
- checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available
-
- // move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5%
- // total cap)
- scheduler.moveApplication(application_0.getApplicationId(), "b1");
-
- // 2GB 1C
- Task task_1_1 =
- new Task(application_1, priority_0,
- new String[] { ResourceRequest.ANY });
- application_1.addTask(task_1_1);
-
- application_1.schedule();
-
- // 2GB 1C
- Task task_0_1 =
- new Task(application_0, priority_0, new String[] { host_0, host_1 });
- application_0.addTask(task_0_1);
-
- application_0.schedule();
-
- // prev 2G used free 2G
- nodeUpdate(nm_0);
-
- // prev 0G used free 2G
- nodeUpdate(nm_1);
-
- // Get allocations from the scheduler
- application_1.schedule();
- checkApplicationResourceUsage(3 * GB, application_1);
-
- // Get allocations from the scheduler
- application_0.schedule();
- checkApplicationResourceUsage(3 * GB, application_0);
-
- checkNodeResourceUsage(4 * GB, nm_0);
- checkNodeResourceUsage(2 * GB, nm_1);
-
- }
-
- @Test
- public void testMoveAppSuccess() throws Exception {
-
- ResourceScheduler scheduler = resourceManager.getResourceScheduler();
-
- NodeStatus mockNodeStatus = createMockNodeStatus();
-
- // Register node1
- String host_0 = "host_0";
- NodeManager nm_0 =
- registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
- Resources.createResource(5 * GB, 1), mockNodeStatus);
-
- // Register node2
- String host_1 = "host_1";
- NodeManager nm_1 =
- registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
- Resources.createResource(5 * GB, 1), mockNodeStatus);
-
- // ResourceRequest priorities
- Priority priority_0 = Priority.newInstance(0);
- Priority priority_1 = Priority.newInstance(1);
-
- // Submit application_0
- Application application_0 =
- new Application("user_0", "a1", resourceManager);
- application_0.submit(); // app + app attempt event sent to scheduler
-
- application_0.addNodeManager(host_0, 1234, nm_0);
- application_0.addNodeManager(host_1, 1234, nm_1);
-
- Resource capability_0_0 = Resources.createResource(3 * GB, 1);
- application_0.addResourceRequestSpec(priority_1, capability_0_0);
-
- Resource capability_0_1 = Resources.createResource(2 * GB, 1);
- application_0.addResourceRequestSpec(priority_0, capability_0_1);
-
- Task task_0_0 =
- new Task(application_0, priority_1, new String[] { host_0, host_1 });
- application_0.addTask(task_0_0);
-
- // Submit application_1
- Application application_1 =
- new Application("user_1", "b2", resourceManager);
- application_1.submit(); // app + app attempt event sent to scheduler
-
- application_1.addNodeManager(host_0, 1234, nm_0);
- application_1.addNodeManager(host_1, 1234, nm_1);
-
- Resource capability_1_0 = Resources.createResource(1 * GB, 1);
- application_1.addResourceRequestSpec(priority_1, capability_1_0);
-
- Resource capability_1_1 = Resources.createResource(2 * GB, 1);
- application_1.addResourceRequestSpec(priority_0, capability_1_1);
-
- Task task_1_0 =
- new Task(application_1, priority_1, new String[] { host_0, host_1 });
- application_1.addTask(task_1_0);
-
- // Send resource requests to the scheduler
- application_0.schedule(); // allocate
- application_1.schedule(); // allocate
-
- // b2 can only run 1 app at a time
- scheduler.moveApplication(application_0.getApplicationId(), "b2");
-
- nodeUpdate(nm_0);
-
- nodeUpdate(nm_1);
-
- // Get allocations from the scheduler
- application_0.schedule(); // task_0_0
- checkApplicationResourceUsage(0 * GB, application_0);
-
- application_1.schedule(); // task_1_0
- checkApplicationResourceUsage(1 * GB, application_1);
-
- // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is
- // not scheduled
- checkNodeResourceUsage(1 * GB, nm_0);
- checkNodeResourceUsage(0 * GB, nm_1);
-
- // lets move application_0 to a queue where it can run
- scheduler.moveApplication(application_0.getApplicationId(), "a2");
- application_0.schedule();
-
- nodeUpdate(nm_1);
-
- // Get allocations from the scheduler
- application_0.schedule(); // task_0_0
- checkApplicationResourceUsage(3 * GB, application_0);
-
- checkNodeResourceUsage(1 * GB, nm_0);
- checkNodeResourceUsage(3 * GB, nm_1);
-
- }
-
- @Test(expected = YarnException.class)
- public void testMoveAppViolateQueueState() throws Exception {
- resourceManager = new ResourceManager() {
- @Override
- protected RMNodeLabelsManager createNodeLabelManager() {
- RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
- mgr.init(getConfig());
- return mgr;
- }
- };
- CapacitySchedulerConfiguration csConf =
- new CapacitySchedulerConfiguration();
- setupQueueConfiguration(csConf);
- StringBuilder qState = new StringBuilder();
- qState.append(CapacitySchedulerConfiguration.PREFIX).append(B)
- .append(CapacitySchedulerConfiguration.DOT)
- .append(CapacitySchedulerConfiguration.STATE);
- csConf.set(qState.toString(), QueueState.STOPPED.name());
- YarnConfiguration conf = new YarnConfiguration(csConf);
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- resourceManager.init(conf);
- resourceManager.getRMContext().getContainerTokenSecretManager()
- .rollMasterKey();
- resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
- ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
- mockContext = mock(RMContext.class);
- when(mockContext.getConfigurationProvider()).thenReturn(
- new LocalConfigurationProvider());
-
- ResourceScheduler scheduler = resourceManager.getResourceScheduler();
-
- NodeStatus mockNodeStatus = createMockNodeStatus();
-
- // Register node1
- String host_0 = "host_0";
- NodeManager nm_0 =
- registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
- Resources.createResource(6 * GB, 1), mockNodeStatus);
-
- // ResourceRequest priorities
- Priority priority_0 = Priority.newInstance(0);
- Priority priority_1 = Priority.newInstance(1);
-
- // Submit application_0
- Application application_0 =
- new Application("user_0", "a1", resourceManager);
- application_0.submit(); // app + app attempt event sent to scheduler
-
- application_0.addNodeManager(host_0, 1234, nm_0);
-
- Resource capability_0_0 = Resources.createResource(3 * GB, 1);
- application_0.addResourceRequestSpec(priority_1, capability_0_0);
-
- Resource capability_0_1 = Resources.createResource(2 * GB, 1);
- application_0.addResourceRequestSpec(priority_0, capability_0_1);
-
- Task task_0_0 =
- new Task(application_0, priority_1, new String[] { host_0 });
- application_0.addTask(task_0_0);
-
- // Send resource requests to the scheduler
- application_0.schedule(); // allocate
-
- // task_0_0 allocated
- nodeUpdate(nm_0);
-
- // Get allocations from the scheduler
- application_0.schedule(); // task_0_0
- checkApplicationResourceUsage(3 * GB, application_0);
-
- checkNodeResourceUsage(3 * GB, nm_0);
- // b2 queue contains 3GB consumption app,
- // add another 3GB will hit max capacity limit on queue b
- scheduler.moveApplication(application_0.getApplicationId(), "b1");
-
- }
-
- @Test
- public void testMoveAppQueueMetricsCheck() throws Exception {
- ResourceScheduler scheduler = resourceManager.getResourceScheduler();
-
- NodeStatus mockNodeStatus = createMockNodeStatus();
-
- // Register node1
- String host_0 = "host_0";
- NodeManager nm_0 =
- registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
- Resources.createResource(5 * GB, 1), mockNodeStatus);
-
- // Register node2
- String host_1 = "host_1";
- NodeManager nm_1 =
- registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
- Resources.createResource(5 * GB, 1), mockNodeStatus);
-
- // ResourceRequest priorities
- Priority priority_0 = Priority.newInstance(0);
- Priority priority_1 = Priority.newInstance(1);
-
- // Submit application_0
- Application application_0 =
- new Application("user_0", "a1", resourceManager);
- application_0.submit(); // app + app attempt event sent to scheduler
-
- application_0.addNodeManager(host_0, 1234, nm_0);
- application_0.addNodeManager(host_1, 1234, nm_1);
-
- Resource capability_0_0 = Resources.createResource(3 * GB, 1);
- application_0.addResourceRequestSpec(priority_1, capability_0_0);
-
- Resource capability_0_1 = Resources.createResource(2 * GB, 1);
- application_0.addResourceRequestSpec(priority_0, capability_0_1);
-
- Task task_0_0 =
- new Task(application_0, priority_1, new String[] { host_0, host_1 });
- application_0.addTask(task_0_0);
-
- // Submit application_1
- Application application_1 =
- new Application("user_1", "b2", resourceManager);
- application_1.submit(); // app + app attempt event sent to scheduler
-
- application_1.addNodeManager(host_0, 1234, nm_0);
- application_1.addNodeManager(host_1, 1234, nm_1);
-
- Resource capability_1_0 = Resources.createResource(1 * GB, 1);
- application_1.addResourceRequestSpec(priority_1, capability_1_0);
-
- Resource capability_1_1 = Resources.createResource(2 * GB, 1);
- application_1.addResourceRequestSpec(priority_0, capability_1_1);
-
- Task task_1_0 =
- new Task(application_1, priority_1, new String[] { host_0, host_1 });
- application_1.addTask(task_1_0);
-
- // Send resource requests to the scheduler
- application_0.schedule(); // allocate
- application_1.schedule(); // allocate
-
- nodeUpdate(nm_0);
-
- nodeUpdate(nm_1);
-
- CapacityScheduler cs =
- (CapacityScheduler) resourceManager.getResourceScheduler();
- CSQueue origRootQ = cs.getRootQueue();
- CapacitySchedulerInfo oldInfo =
- new CapacitySchedulerInfo(origRootQ, cs);
- int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues());
- int origNumAppsRoot = origRootQ.getNumApplications();
-
- scheduler.moveApplication(application_0.getApplicationId(), "a2");
-
- CSQueue newRootQ = cs.getRootQueue();
- int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues());
- int newNumAppsRoot = newRootQ.getNumApplications();
- CapacitySchedulerInfo newInfo =
- new CapacitySchedulerInfo(newRootQ, cs);
- CapacitySchedulerLeafQueueInfo origOldA1 =
- (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues());
- CapacitySchedulerLeafQueueInfo origNewA1 =
- (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues());
- CapacitySchedulerLeafQueueInfo targetOldA2 =
- (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues());
- CapacitySchedulerLeafQueueInfo targetNewA2 =
- (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues());
- // originally submitted here
- assertEquals(1, origOldA1.getNumApplications());
- assertEquals(1, origNumAppsA);
- assertEquals(2, origNumAppsRoot);
- // after the move
- assertEquals(0, origNewA1.getNumApplications());
- assertEquals(1, newNumAppsA);
- assertEquals(2, newNumAppsRoot);
- // original consumption on a1
- assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemorySize());
- assertEquals(1, origOldA1.getResourcesUsed().getvCores());
- assertEquals(0, origNewA1.getResourcesUsed().getMemorySize()); // after the move
- assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move
- // app moved here with live containers
- assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemorySize());
- assertEquals(1, targetNewA2.getResourcesUsed().getvCores());
- // it was empty before the move
- assertEquals(0, targetOldA2.getNumApplications());
- assertEquals(0, targetOldA2.getResourcesUsed().getMemorySize());
- assertEquals(0, targetOldA2.getResourcesUsed().getvCores());
- // after the app moved here
- assertEquals(1, targetNewA2.getNumApplications());
- // 1 container on original queue before move
- assertEquals(1, origOldA1.getNumContainers());
- // after the move the resource released
- assertEquals(0, origNewA1.getNumContainers());
- // and moved to the new queue
- assertEquals(1, targetNewA2.getNumContainers());
- // which originally didn't have any
- assertEquals(0, targetOldA2.getNumContainers());
- // 1 user with 3GB
- assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0)
- .getResourcesUsed().getMemorySize());
- // 1 user with 1 core
- assertEquals(1, origOldA1.getUsers().getUsersList().get(0)
- .getResourcesUsed().getvCores());
- // user ha no more running app in the orig queue
- assertEquals(0, origNewA1.getUsers().getUsersList().size());
- // 1 user with 3GB
- assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0)
- .getResourcesUsed().getMemorySize());
- // 1 user with 1 core
- assertEquals(1, targetNewA2.getUsers().getUsersList().get(0)
- .getResourcesUsed().getvCores());
-
- // Get allocations from the scheduler
- application_0.schedule(); // task_0_0
- checkApplicationResourceUsage(3 * GB, application_0);
-
- application_1.schedule(); // task_1_0
- checkApplicationResourceUsage(1 * GB, application_1);
-
- // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is
- // not scheduled
- checkNodeResourceUsage(4 * GB, nm_0);
- checkNodeResourceUsage(0 * GB, nm_1);
-
- }
-
- private int getNumAppsInQueue(String name, List<CSQueue> queues) {
- for (CSQueue queue : queues) {
- if (queue.getQueueShortName().equals(name)) {
- return queue.getNumApplications();
- }
- }
- return -1;
- }
-
- private CapacitySchedulerQueueInfo getQueueInfo(String name,
- CapacitySchedulerQueueInfoList info) {
- if (info != null) {
- for (CapacitySchedulerQueueInfo queueInfo : info.getQueueInfoList()) {
- if (queueInfo.getQueueName().equals(name)) {
- return queueInfo;
- } else {
- CapacitySchedulerQueueInfo result =
- getQueueInfo(name, queueInfo.getQueues());
- if (result == null) {
- continue;
- }
- return result;
- }
- }
- }
- return null;
- }
-
- @Test
- public void testMoveAllApps() throws Exception {
- MockRM rm = setUpMove();
- AbstractYarnScheduler scheduler =
- (AbstractYarnScheduler) rm.getResourceScheduler();
-
- // submit an app
- MockRMAppSubmissionData data =
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-1")
- .withUser("user_0")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app = MockRMAppSubmitter.submit(rm, data);
- ApplicationAttemptId appAttemptId =
- rm.getApplicationReport(app.getApplicationId())
- .getCurrentApplicationAttemptId();
-
- // check preconditions
- List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(1, appsInA1.size());
-
- List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(appAttemptId));
- assertEquals(1, appsInA.size());
- String queue =
- scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
- .getQueueName();
- Assert.assertEquals("a1", queue);
-
- List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
- assertTrue(appsInB1.isEmpty());
-
- List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
- assertTrue(appsInB.isEmpty());
-
- // now move the app
- scheduler.moveAllApps("a1", "b1");
-
- // check postconditions
- Thread.sleep(1000);
- appsInB1 = scheduler.getAppsInQueue("b1");
- assertEquals(1, appsInB1.size());
- queue =
- scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
- .getQueueName();
- Assert.assertEquals("b1", queue);
-
- appsInB = scheduler.getAppsInQueue("b");
- assertTrue(appsInB.contains(appAttemptId));
- assertEquals(1, appsInB.size());
-
- appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- appsInA1 = scheduler.getAppsInQueue("a1");
- assertTrue(appsInA1.isEmpty());
-
- appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.isEmpty());
-
- rm.stop();
- }
-
- @Test
- public void testMoveAllAppsInvalidDestination() throws Exception {
- MockRM rm = setUpMove();
- YarnScheduler scheduler = rm.getResourceScheduler();
-
- // submit an app
- MockRMAppSubmissionData data =
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-1")
- .withUser("user_0")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app = MockRMAppSubmitter.submit(rm, data);
- ApplicationAttemptId appAttemptId =
- rm.getApplicationReport(app.getApplicationId())
- .getCurrentApplicationAttemptId();
-
- // check preconditions
- List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(1, appsInA1.size());
-
- List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(appAttemptId));
- assertEquals(1, appsInA.size());
-
- List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
- assertTrue(appsInB1.isEmpty());
-
- List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
- assertTrue(appsInB.isEmpty());
-
- // now move the app
- try {
- scheduler.moveAllApps("a1", "DOES_NOT_EXIST");
- Assert.fail();
- } catch (YarnException e) {
- // expected
- }
-
- // check postconditions, app should still be in a1
- appsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(1, appsInA1.size());
-
- appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(appAttemptId));
- assertEquals(1, appsInA.size());
-
- appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- appsInB1 = scheduler.getAppsInQueue("b1");
- assertTrue(appsInB1.isEmpty());
-
- appsInB = scheduler.getAppsInQueue("b");
- assertTrue(appsInB.isEmpty());
-
- rm.stop();
- }
-
- @Test
- public void testMoveAllAppsInvalidSource() throws Exception {
- MockRM rm = setUpMove();
- YarnScheduler scheduler = rm.getResourceScheduler();
-
- // submit an app
- MockRMAppSubmissionData data =
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-1")
- .withUser("user_0")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app = MockRMAppSubmitter.submit(rm, data);
- ApplicationAttemptId appAttemptId =
- rm.getApplicationReport(app.getApplicationId())
- .getCurrentApplicationAttemptId();
-
- // check preconditions
- List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(1, appsInA1.size());
-
- List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(appAttemptId));
- assertEquals(1, appsInA.size());
-
- List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
- assertTrue(appsInB1.isEmpty());
-
- List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
- assertTrue(appsInB.isEmpty());
-
- // now move the app
- try {
- scheduler.moveAllApps("DOES_NOT_EXIST", "b1");
- Assert.fail();
- } catch (YarnException e) {
- // expected
- }
-
- // check postconditions, app should still be in a1
- appsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(1, appsInA1.size());
-
- appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(appAttemptId));
- assertEquals(1, appsInA.size());
-
- appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- appsInB1 = scheduler.getAppsInQueue("b1");
- assertTrue(appsInB1.isEmpty());
-
- appsInB = scheduler.getAppsInQueue("b");
- assertTrue(appsInB.isEmpty());
-
- rm.stop();
- }
-
- @Test(timeout = 60000)
- public void testMoveAttemptNotAdded() throws Exception {
- Configuration conf = new Configuration();
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- MockRM rm = new MockRM(getCapacityConfiguration(conf));
- rm.start();
- CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-
- ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
- ApplicationAttemptId appAttemptId =
- BuilderUtils.newApplicationAttemptId(appId, 1);
-
- RMAppAttemptMetrics attemptMetric =
- new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
- RMAppImpl app = mock(RMAppImpl.class);
- when(app.getApplicationId()).thenReturn(appId);
- RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
- Container container = mock(Container.class);
- when(attempt.getMasterContainer()).thenReturn(container);
- ApplicationSubmissionContext submissionContext =
- mock(ApplicationSubmissionContext.class);
- when(attempt.getSubmissionContext()).thenReturn(submissionContext);
- when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
- when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
- when(app.getCurrentAppAttempt()).thenReturn(attempt);
-
- rm.getRMContext().getRMApps().put(appId, app);
-
- SchedulerEvent addAppEvent =
- new AppAddedSchedulerEvent(appId, "a1", "user");
- try {
- cs.moveApplication(appId, "b1");
- fail("Move should throw exception app not available");
- } catch (YarnException e) {
- assertEquals("App to be moved application_100_0001 not found.",
- e.getMessage());
- }
- cs.handle(addAppEvent);
- cs.moveApplication(appId, "b1");
- SchedulerEvent addAttemptEvent =
- new AppAttemptAddedSchedulerEvent(appAttemptId, false);
- cs.handle(addAttemptEvent);
- CSQueue rootQ = cs.getRootQueue();
- CSQueue queueB = cs.getQueue("b");
- CSQueue queueA = cs.getQueue("a");
- CSQueue queueA1 = cs.getQueue("a1");
- CSQueue queueB1 = cs.getQueue("b1");
- Assert.assertEquals(1, rootQ.getNumApplications());
- Assert.assertEquals(0, queueA.getNumApplications());
- Assert.assertEquals(1, queueB.getNumApplications());
- Assert.assertEquals(0, queueA1.getNumApplications());
- Assert.assertEquals(1, queueB1.getNumApplications());
-
- rm.close();
- }
-
- @Test
- public void testRemoveAttemptMoveAdded() throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- CapacityScheduler.class);
- conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
- // Create Mock RM
- MockRM rm = new MockRM(getCapacityConfiguration(conf));
- CapacityScheduler sch = (CapacityScheduler) rm.getResourceScheduler();
- // add node
- Resource newResource = Resource.newInstance(4 * GB, 1);
- RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
- SchedulerEvent addNode = new NodeAddedSchedulerEvent(node);
- sch.handle(addNode);
-
- ApplicationAttemptId appAttemptId = appHelper(rm, sch, 100, 1, "a1", "user");
-
- // get Queues
- CSQueue queueA1 = sch.getQueue("a1");
- CSQueue queueB = sch.getQueue("b");
- CSQueue queueB1 = sch.getQueue("b1");
-
- // add Running rm container and simulate live containers to a1
- ContainerId newContainerId = ContainerId.newContainerId(appAttemptId, 2);
- RMContainerImpl rmContainer = mock(RMContainerImpl.class);
- when(rmContainer.getState()).thenReturn(RMContainerState.RUNNING);
- Container container2 = mock(Container.class);
- when(rmContainer.getContainer()).thenReturn(container2);
- Resource resource = Resource.newInstance(1024, 1);
- when(container2.getResource()).thenReturn(resource);
- when(rmContainer.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
- when(container2.getNodeId()).thenReturn(node.getNodeID());
- when(container2.getId()).thenReturn(newContainerId);
- when(rmContainer.getNodeLabelExpression())
- .thenReturn(RMNodeLabelsManager.NO_LABEL);
- when(rmContainer.getContainerId()).thenReturn(newContainerId);
- sch.getApplicationAttempt(appAttemptId).getLiveContainersMap()
- .put(newContainerId, rmContainer);
- QueueMetrics queueA1M = queueA1.getMetrics();
- queueA1M.incrPendingResources(rmContainer.getNodeLabelExpression(),
- "user1", 1, resource);
- queueA1M.allocateResources(rmContainer.getNodeLabelExpression(),
- "user1", resource);
- // remove attempt
- sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId,
- RMAppAttemptState.KILLED, true));
- // Move application to queue b1
- sch.moveApplication(appAttemptId.getApplicationId(), "b1");
- // Check queue metrics after move
- Assert.assertEquals(0, queueA1.getNumApplications());
- Assert.assertEquals(1, queueB.getNumApplications());
- Assert.assertEquals(0, queueB1.getNumApplications());
-
- // Release attempt add event
- ApplicationAttemptId appAttemptId2 =
- BuilderUtils.newApplicationAttemptId(appAttemptId.getApplicationId(), 2);
- SchedulerEvent addAttemptEvent2 =
- new AppAttemptAddedSchedulerEvent(appAttemptId2, true);
- sch.handle(addAttemptEvent2);
-
- // Check metrics after attempt added
- Assert.assertEquals(0, queueA1.getNumApplications());
- Assert.assertEquals(1, queueB.getNumApplications());
- Assert.assertEquals(1, queueB1.getNumApplications());
-
-
- QueueMetrics queueB1M = queueB1.getMetrics();
- QueueMetrics queueBM = queueB.getMetrics();
- // Verify allocation MB of current state
- Assert.assertEquals(0, queueA1M.getAllocatedMB());
- Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
- Assert.assertEquals(1024, queueB1M.getAllocatedMB());
- Assert.assertEquals(1, queueB1M.getAllocatedVirtualCores());
-
- // remove attempt
- sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId2,
- RMAppAttemptState.FINISHED, false));
-
- Assert.assertEquals(0, queueA1M.getAllocatedMB());
- Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
- Assert.assertEquals(0, queueB1M.getAllocatedMB());
- Assert.assertEquals(0, queueB1M.getAllocatedVirtualCores());
-
- verifyQueueMetrics(queueB1M);
- verifyQueueMetrics(queueBM);
- // Verify queue A1 metrics
- verifyQueueMetrics(queueA1M);
- rm.close();
- }
-
- private void verifyQueueMetrics(QueueMetrics queue) {
- Assert.assertEquals(0, queue.getPendingMB());
- Assert.assertEquals(0, queue.getActiveUsers());
- Assert.assertEquals(0, queue.getActiveApps());
- Assert.assertEquals(0, queue.getAppsPending());
- Assert.assertEquals(0, queue.getAppsRunning());
- Assert.assertEquals(0, queue.getAllocatedMB());
- Assert.assertEquals(0, queue.getAllocatedVirtualCores());
- }
-
- private Configuration getCapacityConfiguration(Configuration config) {
- CapacitySchedulerConfiguration conf =
- new CapacitySchedulerConfiguration(config);
-
- // Define top-level queues
- conf.setQueues(CapacitySchedulerConfiguration.ROOT,
- new String[] {"a", "b"});
- conf.setCapacity(A, 50);
- conf.setCapacity(B, 50);
- conf.setQueues(A, new String[] {"a1", "a2"});
- conf.setCapacity(A1, 50);
- conf.setCapacity(A2, 50);
- conf.setQueues(B, new String[] {"b1"});
- conf.setCapacity(B1, 100);
- return conf;
- }
-
- @Test
- public void testKillAllAppsInQueue() throws Exception {
- MockRM rm = setUpMove();
- AbstractYarnScheduler scheduler =
- (AbstractYarnScheduler) rm.getResourceScheduler();
-
- // submit an app
- MockRMAppSubmissionData data =
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-1")
- .withUser("user_0")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app = MockRMAppSubmitter.submit(rm, data);
- ApplicationAttemptId appAttemptId =
- rm.getApplicationReport(app.getApplicationId())
- .getCurrentApplicationAttemptId();
-
- // check preconditions
- List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(1, appsInA1.size());
-
- List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(appAttemptId));
- assertEquals(1, appsInA.size());
- String queue =
- scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
- .getQueueName();
- Assert.assertEquals("a1", queue);
-
- List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- // now kill the app
- scheduler.killAllAppsInQueue("a1");
-
- // check postconditions
- rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
- rm.waitForAppRemovedFromScheduler(app.getApplicationId());
- appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.isEmpty());
-
- appsInA1 = scheduler.getAppsInQueue("a1");
- assertTrue(appsInA1.isEmpty());
-
- appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.isEmpty());
-
- rm.stop();
- }
-
- @Test
- public void testKillAllAppsInvalidSource() throws Exception {
- MockRM rm = setUpMove();
- YarnScheduler scheduler = rm.getResourceScheduler();
-
- // submit an app
- MockRMAppSubmissionData data =
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-1")
- .withUser("user_0")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app = MockRMAppSubmitter.submit(rm, data);
- ApplicationAttemptId appAttemptId =
- rm.getApplicationReport(app.getApplicationId())
- .getCurrentApplicationAttemptId();
-
- // check preconditions
- List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(1, appsInA1.size());
-
- List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(appAttemptId));
- assertEquals(1, appsInA.size());
-
- List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- // now kill the app
- try {
- scheduler.killAllAppsInQueue("DOES_NOT_EXIST");
- Assert.fail();
- } catch (YarnException e) {
- // expected
- }
-
- // check postconditions, app should still be in a1
- appsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(1, appsInA1.size());
-
- appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(appAttemptId));
- assertEquals(1, appsInA.size());
-
- appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(1, appsInRoot.size());
-
- rm.stop();
- }
-
- // Test to ensure that we don't carry out reservation on nodes
- // that have no CPU available when using the DominantResourceCalculator
- @Test(timeout = 30000)
- public void testAppReservationWithDominantResourceCalculator() throws Exception {
- CapacitySchedulerConfiguration csconf =
- new CapacitySchedulerConfiguration();
- csconf.setResourceComparator(DominantResourceCalculator.class);
-
- YarnConfiguration conf = new YarnConfiguration(csconf);
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
-
- MockRM rm = new MockRM(conf);
- rm.start();
-
- MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 1);
-
- // register extra nodes to bump up cluster resource
- MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10 * GB, 4);
- rm.registerNode("127.0.0.1:1236", 10 * GB, 4);
-
- RMApp app1 = MockRMAppSubmitter.submitWithMemory(1024, rm);
- // kick the scheduling
- nm1.nodeHeartbeat(true);
- RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
- MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
- am1.registerAppAttempt();
- SchedulerNodeReport report_nm1 =
- rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-
- // check node report
- Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemorySize());
- Assert.assertEquals(9 * GB, report_nm1.getAvailableResource().getMemorySize());
-
- // add request for containers
- am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 1 * GB, 1, 1);
- am1.schedule(); // send the request
-
- // kick the scheduler, container reservation should not happen
- nm1.nodeHeartbeat(true);
- Thread.sleep(1000);
- AllocateResponse allocResponse = am1.schedule();
- ApplicationResourceUsageReport report =
- rm.getResourceScheduler().getAppResourceUsageReport(
- attempt1.getAppAttemptId());
- Assert.assertEquals(0, allocResponse.getAllocatedContainers().size());
- Assert.assertEquals(0, report.getNumReservedContainers());
-
- // container should get allocated on this node
- nm2.nodeHeartbeat(true);
-
- while (allocResponse.getAllocatedContainers().size() == 0) {
- Thread.sleep(100);
- allocResponse = am1.schedule();
- }
- report =
- rm.getResourceScheduler().getAppResourceUsageReport(
- attempt1.getAppAttemptId());
- Assert.assertEquals(1, allocResponse.getAllocatedContainers().size());
- Assert.assertEquals(0, report.getNumReservedContainers());
- rm.stop();
- }
-
- @Test
- public void testPreemptionDisabled() throws Exception {
- CapacityScheduler cs = new CapacityScheduler();
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
- RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
- null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null);
- setupQueueConfiguration(conf);
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- cs.init(conf);
- cs.start();
- cs.reinitialize(conf, rmContext);
-
- CSQueue rootQueue = cs.getRootQueue();
- CSQueue queueB = findQueue(rootQueue, B);
- CSQueue queueB2 = findQueue(queueB, B2);
-
- // When preemption turned on for the whole system
- // (yarn.resourcemanager.scheduler.monitor.enable=true), and with no other
- // preemption properties set, queue root.b.b2 should be preemptable.
- assertFalse("queue " + B2 + " should default to preemptable",
- queueB2.getPreemptionDisabled());
+ // When preemption turned on for the whole system
+ // (yarn.resourcemanager.scheduler.monitor.enable=true), and with no other
+ // preemption properties set, queue root.b.b2 should be preemptable.
+ assertFalse("queue " + B2 + " should default to preemptable",
+ queueB2.getPreemptionDisabled());
// Disable preemption at the root queue level.
// The preemption property should be inherited from root all the
- // way down so that root.b.b2 should NOT be preemptable.
- conf.setPreemptionDisabled(rootQueue.getQueuePath(), true);
- cs.reinitialize(conf, rmContext);
- assertTrue(
- "queue " + B2 + " should have inherited non-preemptability from root",
- queueB2.getPreemptionDisabled());
-
- // Enable preemption for root (grandparent) but disable for root.b (parent).
- // root.b.b2 should inherit property from parent and NOT be preemptable
- conf.setPreemptionDisabled(rootQueue.getQueuePath(), false);
- conf.setPreemptionDisabled(queueB.getQueuePath(), true);
- cs.reinitialize(conf, rmContext);
- assertTrue(
- "queue " + B2 + " should have inherited non-preemptability from parent",
- queueB2.getPreemptionDisabled());
-
- // When preemption is turned on for root.b.b2, it should be preemptable
- // even though preemption is disabled on root.b (parent).
- conf.setPreemptionDisabled(queueB2.getQueuePath(), false);
- cs.reinitialize(conf, rmContext);
- assertFalse("queue " + B2 + " should have been preemptable",
- queueB2.getPreemptionDisabled());
- }
-
- @Test
- public void testRefreshQueuesMaxAllocationRefresh() throws Exception {
- // queue refresh should not allow changing the maximum allocation setting
- // per queue to be smaller than previous setting
- CapacityScheduler cs = new CapacityScheduler();
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- cs.init(conf);
- cs.start();
- cs.reinitialize(conf, mockContext);
- checkQueueStructureCapacities(cs);
-
- assertEquals("max allocation in CS",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- cs.getMaximumResourceCapability().getMemorySize());
- assertEquals("max allocation for A1",
- Resources.none(),
- conf.getQueueMaximumAllocation(A1));
- assertEquals("max allocation",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
-
- CSQueue rootQueue = cs.getRootQueue();
- CSQueue queueA = findQueue(rootQueue, A);
- CSQueue queueA1 = findQueue(queueA, A1);
- assertEquals("queue max allocation", ((LeafQueue) queueA1)
- .getMaximumAllocation().getMemorySize(), 8192);
-
- setMaxAllocMb(conf, A1, 4096);
-
- try {
- cs.reinitialize(conf, mockContext);
- fail("should have thrown exception");
- } catch (IOException e) {
- assertTrue("max allocation exception",
- e.getCause().toString().contains("not be decreased"));
- }
-
- setMaxAllocMb(conf, A1, 8192);
- cs.reinitialize(conf, mockContext);
-
- setMaxAllocVcores(conf, A1,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - 1);
- try {
- cs.reinitialize(conf, mockContext);
- fail("should have thrown exception");
- } catch (IOException e) {
- assertTrue("max allocation exception",
- e.getCause().toString().contains("not be decreased"));
- }
- }
-
- @Test
- public void testRefreshQueuesMaxAllocationPerQueueLarge() throws Exception {
- // verify we can't set the allocation per queue larger then cluster setting
- CapacityScheduler cs = new CapacityScheduler();
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- cs.init(conf);
- cs.start();
- // change max allocation for B3 queue to be larger then cluster max
- setMaxAllocMb(conf, B3,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 2048);
- try {
- cs.reinitialize(conf, mockContext);
- fail("should have thrown exception");
- } catch (IOException e) {
- assertTrue("maximum allocation exception",
- e.getCause().getMessage().contains("maximum allocation"));
- }
-
- setMaxAllocMb(conf, B3,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
- cs.reinitialize(conf, mockContext);
-
- setMaxAllocVcores(conf, B3,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1);
- try {
- cs.reinitialize(conf, mockContext);
- fail("should have thrown exception");
- } catch (IOException e) {
- assertTrue("maximum allocation exception",
- e.getCause().getMessage().contains("maximum allocation"));
- }
- }
-
- @Test
- public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
- // queue refresh should allow max allocation per queue to go larger
- CapacityScheduler cs = new CapacityScheduler();
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- setMaxAllocMb(conf,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
- setMaxAllocVcores(conf,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
- setMaxAllocMb(conf, A1, 4096);
- setMaxAllocVcores(conf, A1, 2);
- cs.init(conf);
- cs.start();
- cs.reinitialize(conf, mockContext);
- checkQueueStructureCapacities(cs);
-
- CSQueue rootQueue = cs.getRootQueue();
- CSQueue queueA = findQueue(rootQueue, A);
- CSQueue queueA1 = findQueue(queueA, A1);
-
- assertEquals("max capability MB in CS",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- cs.getMaximumResourceCapability().getMemorySize());
- assertEquals("max capability vcores in CS",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
- cs.getMaximumResourceCapability().getVirtualCores());
- assertEquals("max allocation MB A1",
- 4096,
- queueA1.getMaximumAllocation().getMemorySize());
- assertEquals("max allocation vcores A1",
- 2,
- queueA1.getMaximumAllocation().getVirtualCores());
- assertEquals("cluster max allocation MB",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
- assertEquals("cluster max allocation vcores",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
- ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
-
- assertEquals("queue max allocation", 4096,
- queueA1.getMaximumAllocation().getMemorySize());
-
- setMaxAllocMb(conf, A1, 6144);
- setMaxAllocVcores(conf, A1, 3);
- cs.reinitialize(conf, null);
- // conf will have changed but we shouldn't be able to change max allocation
- // for the actual queue
- assertEquals("max allocation MB A1", 6144,
- queueA1.getMaximumAllocation().getMemorySize());
- assertEquals("max allocation vcores A1", 3,
- queueA1.getMaximumAllocation().getVirtualCores());
- assertEquals("max allocation MB cluster",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
- assertEquals("max allocation vcores cluster",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
- ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
- assertEquals("queue max allocation MB", 6144,
- queueA1.getMaximumAllocation().getMemorySize());
- assertEquals("queue max allocation vcores", 3,
- queueA1.getMaximumAllocation().getVirtualCores());
- assertEquals("max capability MB cluster",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- cs.getMaximumResourceCapability().getMemorySize());
- assertEquals("cluster max capability vcores",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
- cs.getMaximumResourceCapability().getVirtualCores());
- }
-
- @Test
- public void testRefreshQueuesMaxAllocationCSError() throws Exception {
- // Try to refresh the cluster level max allocation size to be smaller
- // and it should error out
- CapacityScheduler cs = new CapacityScheduler();
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- setMaxAllocMb(conf, 10240);
- setMaxAllocVcores(conf, 10);
- setMaxAllocMb(conf, A1, 4096);
- setMaxAllocVcores(conf, A1, 4);
- cs.init(conf);
- cs.start();
- cs.reinitialize(conf, mockContext);
- checkQueueStructureCapacities(cs);
-
- assertEquals("max allocation MB in CS", 10240,
- cs.getMaximumResourceCapability().getMemorySize());
- assertEquals("max allocation vcores in CS", 10,
- cs.getMaximumResourceCapability().getVirtualCores());
-
- setMaxAllocMb(conf, 6144);
- try {
- cs.reinitialize(conf, mockContext);
- fail("should have thrown exception");
- } catch (IOException e) {
- assertTrue("max allocation exception",
- e.getCause().toString().contains("not be decreased"));
- }
-
- setMaxAllocMb(conf, 10240);
- cs.reinitialize(conf, mockContext);
-
- setMaxAllocVcores(conf, 8);
- try {
- cs.reinitialize(conf, mockContext);
- fail("should have thrown exception");
- } catch (IOException e) {
- assertTrue("max allocation exception",
- e.getCause().toString().contains("not be decreased"));
- }
- }
-
- @Test
- public void testRefreshQueuesMaxAllocationCSLarger() throws Exception {
- // Try to refresh the cluster level max allocation size to be larger
- // and verify that if there is no setting per queue it uses the
- // cluster level setting.
- CapacityScheduler cs = new CapacityScheduler();
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- setMaxAllocMb(conf, 10240);
- setMaxAllocVcores(conf, 10);
- setMaxAllocMb(conf, A1, 4096);
- setMaxAllocVcores(conf, A1, 4);
- cs.init(conf);
- cs.start();
- cs.reinitialize(conf, mockContext);
- checkQueueStructureCapacities(cs);
-
- assertEquals("max allocation MB in CS", 10240,
- cs.getMaximumResourceCapability().getMemorySize());
- assertEquals("max allocation vcores in CS", 10,
- cs.getMaximumResourceCapability().getVirtualCores());
-
- CSQueue rootQueue = cs.getRootQueue();
- CSQueue queueA = findQueue(rootQueue, A);
- CSQueue queueB = findQueue(rootQueue, B);
- CSQueue queueA1 = findQueue(queueA, A1);
- CSQueue queueA2 = findQueue(queueA, A2);
- CSQueue queueB2 = findQueue(queueB, B2);
-
- assertEquals("queue A1 max allocation MB", 4096,
- queueA1.getMaximumAllocation().getMemorySize());
- assertEquals("queue A1 max allocation vcores", 4,
- queueA1.getMaximumAllocation().getVirtualCores());
- assertEquals("queue A2 max allocation MB", 10240,
- queueA2.getMaximumAllocation().getMemorySize());
- assertEquals("queue A2 max allocation vcores", 10,
- queueA2.getMaximumAllocation().getVirtualCores());
- assertEquals("queue B2 max allocation MB", 10240,
- queueB2.getMaximumAllocation().getMemorySize());
- assertEquals("queue B2 max allocation vcores", 10,
- queueB2.getMaximumAllocation().getVirtualCores());
-
- setMaxAllocMb(conf, 12288);
- setMaxAllocVcores(conf, 12);
- cs.reinitialize(conf, null);
- // cluster level setting should change and any queues without
- // per queue setting
- assertEquals("max allocation MB in CS", 12288,
- cs.getMaximumResourceCapability().getMemorySize());
- assertEquals("max allocation vcores in CS", 12,
- cs.getMaximumResourceCapability().getVirtualCores());
- assertEquals("queue A1 max MB allocation", 4096,
- queueA1.getMaximumAllocation().getMemorySize());
- assertEquals("queue A1 max vcores allocation", 4,
- queueA1.getMaximumAllocation().getVirtualCores());
- assertEquals("queue A2 max MB allocation", 12288,
- queueA2.getMaximumAllocation().getMemorySize());
- assertEquals("queue A2 max vcores allocation", 12,
- queueA2.getMaximumAllocation().getVirtualCores());
- assertEquals("queue B2 max MB allocation", 12288,
- queueB2.getMaximumAllocation().getMemorySize());
- assertEquals("queue B2 max vcores allocation", 12,
- queueB2.getMaximumAllocation().getVirtualCores());
- }
-
- @Test
- public void testQueuesMaxAllocationInheritance() throws Exception {
- // queue level max allocation is set by the queue configuration explicitly
- // or inherits from the parent.
-
- CapacityScheduler cs = new CapacityScheduler();
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- setMaxAllocMb(conf,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
- setMaxAllocVcores(conf,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
-
- // Test the child queue overrides
- setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
- "memory-mb=4096,vcores=2");
- setMaxAllocation(conf, A1, "memory-mb=6144,vcores=2");
- setMaxAllocation(conf, B, "memory-mb=5120, vcores=2");
- setMaxAllocation(conf, B2, "memory-mb=1024, vcores=2");
-
- cs.init(conf);
- cs.start();
- cs.reinitialize(conf, mockContext);
- checkQueueStructureCapacities(cs);
-
- CSQueue rootQueue = cs.getRootQueue();
- CSQueue queueA = findQueue(rootQueue, A);
- CSQueue queueB = findQueue(rootQueue, B);
- CSQueue queueA1 = findQueue(queueA, A1);
- CSQueue queueA2 = findQueue(queueA, A2);
- CSQueue queueB1 = findQueue(queueB, B1);
- CSQueue queueB2 = findQueue(queueB, B2);
-
- assertEquals("max capability MB in CS",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- cs.getMaximumResourceCapability().getMemorySize());
- assertEquals("max capability vcores in CS",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
- cs.getMaximumResourceCapability().getVirtualCores());
- assertEquals("max allocation MB A1",
- 6144,
- queueA1.getMaximumAllocation().getMemorySize());
- assertEquals("max allocation vcores A1",
- 2,
- queueA1.getMaximumAllocation().getVirtualCores());
- assertEquals("max allocation MB A2", 4096,
- queueA2.getMaximumAllocation().getMemorySize());
- assertEquals("max allocation vcores A2",
- 2,
- queueA2.getMaximumAllocation().getVirtualCores());
- assertEquals("max allocation MB B", 5120,
- queueB.getMaximumAllocation().getMemorySize());
- assertEquals("max allocation MB B1", 5120,
- queueB1.getMaximumAllocation().getMemorySize());
- assertEquals("max allocation MB B2", 1024,
- queueB2.getMaximumAllocation().getMemorySize());
-
- // Test get the max-allocation from different parent
- unsetMaxAllocation(conf, A1);
- unsetMaxAllocation(conf, B);
- unsetMaxAllocation(conf, B1);
- setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
- "memory-mb=6144,vcores=2");
- setMaxAllocation(conf, A, "memory-mb=8192,vcores=2");
-
- cs.reinitialize(conf, mockContext);
-
- assertEquals("max capability MB in CS",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- cs.getMaximumResourceCapability().getMemorySize());
- assertEquals("max capability vcores in CS",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
- cs.getMaximumResourceCapability().getVirtualCores());
- assertEquals("max allocation MB A1",
- 8192,
- queueA1.getMaximumAllocation().getMemorySize());
- assertEquals("max allocation vcores A1",
- 2,
- queueA1.getMaximumAllocation().getVirtualCores());
- assertEquals("max allocation MB B1",
- 6144,
- queueB1.getMaximumAllocation().getMemorySize());
- assertEquals("max allocation vcores B1",
- 2,
- queueB1.getMaximumAllocation().getVirtualCores());
-
- // Test the default
- unsetMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT);
- unsetMaxAllocation(conf, A);
- unsetMaxAllocation(conf, A1);
- cs.reinitialize(conf, mockContext);
-
- assertEquals("max capability MB in CS",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- cs.getMaximumResourceCapability().getMemorySize());
- assertEquals("max capability vcores in CS",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
- cs.getMaximumResourceCapability().getVirtualCores());
- assertEquals("max allocation MB A1",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- queueA1.getMaximumAllocation().getMemorySize());
- assertEquals("max allocation vcores A1",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
- queueA1.getMaximumAllocation().getVirtualCores());
- assertEquals("max allocation MB A2",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- queueA2.getMaximumAllocation().getMemorySize());
- assertEquals("max allocation vcores A2",
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
- queueA2.getMaximumAllocation().getVirtualCores());
- }
-
- @Test
- public void testVerifyQueuesMaxAllocationConf() throws Exception {
- // queue level max allocation can't exceed the cluster setting
-
- CapacityScheduler cs = new CapacityScheduler();
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- setMaxAllocMb(conf,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
- setMaxAllocVcores(conf,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
-
- long largerMem =
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1024;
- long largerVcores =
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES+10;
-
- cs.init(conf);
- cs.start();
- cs.reinitialize(conf, mockContext);
- checkQueueStructureCapacities(cs);
-
- setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
- "memory-mb=" + largerMem + ",vcores=2");
- try {
- cs.reinitialize(conf, mockContext);
- fail("Queue Root maximum allocation can't exceed the cluster setting");
- } catch(Exception e) {
- assertTrue("maximum allocation exception",
- e.getCause().getMessage().contains("maximum allocation"));
- }
+ // way down so that root.b.b2 should NOT be preemptable.
+ conf.setPreemptionDisabled(rootQueue.getQueuePath(), true);
+ cs.reinitialize(conf, rmContext);
+ assertTrue(
+ "queue " + B2 + " should have inherited non-preemptability from root",
+ queueB2.getPreemptionDisabled());
- setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
- "memory-mb=4096,vcores=2");
- setMaxAllocation(conf, A, "memory-mb=6144,vcores=2");
- setMaxAllocation(conf, A1, "memory-mb=" + largerMem + ",vcores=2");
- try {
- cs.reinitialize(conf, mockContext);
- fail("Queue A1 maximum allocation can't exceed the cluster setting");
- } catch(Exception e) {
- assertTrue("maximum allocation exception",
- e.getCause().getMessage().contains("maximum allocation"));
- }
- setMaxAllocation(conf, A1, "memory-mb=8192" + ",vcores=" + largerVcores);
- try {
- cs.reinitialize(conf, mockContext);
- fail("Queue A1 maximum allocation can't exceed the cluster setting");
- } catch(Exception e) {
- assertTrue("maximum allocation exception",
- e.getCause().getMessage().contains("maximum allocation"));
- }
+ // Enable preemption for root (grandparent) but disable for root.b (parent).
+ // root.b.b2 should inherit property from parent and NOT be preemptable
+ conf.setPreemptionDisabled(rootQueue.getQueuePath(), false);
+ conf.setPreemptionDisabled(queueB.getQueuePath(), true);
+ cs.reinitialize(conf, rmContext);
+ assertTrue(
+ "queue " + B2 + " should have inherited non-preemptability from parent",
+ queueB2.getPreemptionDisabled());
+ // When preemption is turned on for root.b.b2, it should be preemptable
+ // even though preemption is disabled on root.b (parent).
+ conf.setPreemptionDisabled(queueB2.getQueuePath(), false);
+ cs.reinitialize(conf, rmContext);
+ assertFalse("queue " + B2 + " should have been preemptable",
+ queueB2.getPreemptionDisabled());
}
private void waitContainerAllocated(MockAM am, int mem, int nContainer,
@@ -3995,35 +1921,6 @@ public class TestCapacityScheduler {
assertEquals(15, fiCaApp2.getHeadroom().getVirtualCores());
}
- @Test
- public void testDefaultNodeLabelExpressionQueueConfig() throws Exception {
- CapacityScheduler cs = new CapacityScheduler();
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- conf.setDefaultNodeLabelExpression("root.a", " x");
- conf.setDefaultNodeLabelExpression("root.b", " y ");
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- cs.init(conf);
- cs.start();
-
- QueueInfo queueInfoA = cs.getQueueInfo("a", true, false);
- Assert.assertEquals("Queue Name should be a", "a",
- queueInfoA.getQueueName());
- Assert.assertEquals("Queue Path should be root.a", "root.a",
- queueInfoA.getQueuePath());
- Assert.assertEquals("Default Node Label Expression should be x", "x",
- queueInfoA.getDefaultNodeLabelExpression());
-
- QueueInfo queueInfoB = cs.getQueueInfo("b", true, false);
- Assert.assertEquals("Queue Name should be b", "b",
- queueInfoB.getQueueName());
- Assert.assertEquals("Queue Path should be root.b", "root.b",
- queueInfoB.getQueuePath());
- Assert.assertEquals("Default Node Label Expression should be y", "y",
- queueInfoB.getDefaultNodeLabelExpression());
- }
-
@Test(timeout = 60000)
public void testAMLimitUsage() throws Exception {
@@ -4216,44 +2113,6 @@ public class TestCapacityScheduler {
rm.stop();
}
- private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- maxAllocMb);
- }
-
- private void setMaxAllocMb(CapacitySchedulerConfiguration conf,
- String queueName, int maxAllocMb) {
- String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
- + MAXIMUM_ALLOCATION_MB;
- conf.setInt(propName, maxAllocMb);
- }
-
- private void setMaxAllocVcores(Configuration conf, int maxAllocVcores) {
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
- maxAllocVcores);
- }
-
- private void setMaxAllocVcores(CapacitySchedulerConfiguration conf,
- String queueName, int maxAllocVcores) {
- String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
- + CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
- conf.setInt(propName, maxAllocVcores);
- }
-
- private void setMaxAllocation(CapacitySchedulerConfiguration conf,
- String queueName, String maxAllocation) {
- String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
- + MAXIMUM_ALLOCATION;
- conf.set(propName, maxAllocation);
- }
-
- private void unsetMaxAllocation(CapacitySchedulerConfiguration conf,
- String queueName) {
- String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
- + MAXIMUM_ALLOCATION;
- conf.unset(propName);
- }
-
private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
RMContainer rmContainer = cs.getRMContainer(containerId);
@@ -4264,171 +2123,6 @@ public class TestCapacityScheduler {
Assert.fail("Cannot find RMContainer");
}
}
- @Test
- public void testRemovedNodeDecomissioningNode() throws Exception {
- NodeStatus mockNodeStatus = createMockNodeStatus();
-
- // Register nodemanager
- NodeManager nm = registerNode("host_decom", 1234, 2345,
- NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
- mockNodeStatus);
-
- RMNode node =
- resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
- // Send a heartbeat to kick the tires on the Scheduler
- NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
- resourceManager.getResourceScheduler().handle(nodeUpdate);
-
- // force remove the node to simulate race condition
- ((CapacityScheduler) resourceManager.getResourceScheduler()).getNodeTracker().
- removeNode(nm.getNodeId());
- // Kick off another heartbeat with the node state mocked to decommissioning
- RMNode spyNode =
- Mockito.spy(resourceManager.getRMContext().getRMNodes()
- .get(nm.getNodeId()));
- when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
- resourceManager.getResourceScheduler().handle(
- new NodeUpdateSchedulerEvent(spyNode));
- }
-
- @Test
- public void testResourceUpdateDecommissioningNode() throws Exception {
- // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
- // to have 0 available resource
- RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
- Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
- when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler<Event>() {
- @Override
- public void handle(Event event) {
- if (event instanceof RMNodeResourceUpdateEvent) {
- RMNodeResourceUpdateEvent resourceEvent =
- (RMNodeResourceUpdateEvent) event;
- resourceManager
- .getResourceScheduler()
- .getSchedulerNode(resourceEvent.getNodeId())
- .updateTotalResource(resourceEvent.getResourceOption().getResource());
- }
- }
- });
- Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher();
- ((CapacityScheduler) resourceManager.getResourceScheduler())
- .setRMContext(spyContext);
- ((AsyncDispatcher) mockDispatcher).start();
-
- NodeStatus mockNodeStatus = createMockNodeStatus();
-
- // Register node
- String host_0 = "host_0";
- NodeManager nm_0 = registerNode(host_0, 1234, 2345,
- NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
- mockNodeStatus);
- // ResourceRequest priorities
- Priority priority_0 = Priority.newInstance(0);
-
- // Submit an application
- Application application_0 =
- new Application("user_0", "a1", resourceManager);
- application_0.submit();
-
- application_0.addNodeManager(host_0, 1234, nm_0);
-
- Resource capability_0_0 = Resources.createResource(1 * GB, 1);
- application_0.addResourceRequestSpec(priority_0, capability_0_0);
-
- Task task_0_0 =
- new Task(application_0, priority_0, new String[] { host_0 });
- application_0.addTask(task_0_0);
-
- // Send resource requests to the scheduler
- application_0.schedule();
-
- nodeUpdate(nm_0);
- // Kick off another heartbeat with the node state mocked to decommissioning
- // This should update the schedulernodes to have 0 available resource
- RMNode spyNode =
- Mockito.spy(resourceManager.getRMContext().getRMNodes()
- .get(nm_0.getNodeId()));
- when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
- resourceManager.getResourceScheduler().handle(
- new NodeUpdateSchedulerEvent(spyNode));
-
- // Get allocations from the scheduler
- application_0.schedule();
-
- // Check the used resource is 1 GB 1 core
- Assert.assertEquals(1 * GB, nm_0.getUsed().getMemorySize());
- Resource usedResource =
- resourceManager.getResourceScheduler()
- .getSchedulerNode(nm_0.getNodeId()).getAllocatedResource();
- Assert.assertEquals("Used Resource Memory Size should be 1GB", 1 * GB,
- usedResource.getMemorySize());
- Assert.assertEquals("Used Resource Virtual Cores should be 1", 1,
- usedResource.getVirtualCores());
- // Check total resource of scheduler node is also changed to 1 GB 1 core
- Resource totalResource =
- resourceManager.getResourceScheduler()
- .getSchedulerNode(nm_0.getNodeId()).getTotalResource();
- Assert.assertEquals("Total Resource Memory Size should be 1GB", 1 * GB,
- totalResource.getMemorySize());
- Assert.assertEquals("Total Resource Virtual Cores should be 1", 1,
- totalResource.getVirtualCores());
- // Check the available resource is 0/0
- Resource availableResource =
- resourceManager.getResourceScheduler()
- .getSchedulerNode(nm_0.getNodeId()).getUnallocatedResource();
- Assert.assertEquals("Available Resource Memory Size should be 0", 0,
- availableResource.getMemorySize());
- Assert.assertEquals("Available Resource Memory Size should be 0", 0,
- availableResource.getVirtualCores());
- // Kick off another heartbeat where the RMNodeResourceUpdateEvent would
- // be skipped for DECOMMISSIONING state since the total resource is
- // already equal to used resource from the previous heartbeat.
- when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
- resourceManager.getResourceScheduler().handle(
- new NodeUpdateSchedulerEvent(spyNode));
- verify(mockDispatcher, times(4)).getEventHandler();
- }
-
- @Test
- public void testSchedulingOnRemovedNode() throws Exception {
- Configuration conf = new YarnConfiguration();
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- conf.setBoolean(
- CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
- false);
-
- MockRM rm = new MockRM(conf);
- rm.start();
- RMApp app = MockRMAppSubmitter.submitWithMemory(100, rm);
- rm.drainEvents();
-
- MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10240, 10);
- MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
-
- //remove nm2 to keep am alive
- MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10240, 10);
-
- am.allocate(ResourceRequest.ANY, 2048, 1, null);
-
- CapacityScheduler scheduler =
- (CapacityScheduler) rm.getRMContext().getScheduler();
- FiCaSchedulerNode node =
- (FiCaSchedulerNode)
- scheduler.getNodeTracker().getNode(nm2.getNodeId());
- scheduler.handle(new NodeRemovedSchedulerEvent(
- rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
- // schedulerNode is removed, try allocate a container
- scheduler.allocateContainersToNode(new SimpleCandidateNodeSet<>(node),
- true);
-
- AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
- new AppAttemptRemovedSchedulerEvent(
- am.getApplicationAttemptId(),
- RMAppAttemptState.FINISHED, false);
- scheduler.handle(appRemovedEvent1);
- rm.stop();
- }
@Test
public void testCSReservationWithRootUnblocked() throws Exception {
@@ -4630,37 +2324,6 @@ public class TestCapacityScheduler {
rm.stop();
}
- private ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs,
- int clusterTs, int appId, String queue,
- String user) {
- ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId);
- ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
- appId1, appId);
-
- RMAppAttemptMetrics attemptMetric1 =
- new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext());
- RMAppImpl app1 = mock(RMAppImpl.class);
- when(app1.getApplicationId()).thenReturn(appId1);
- RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
- Container container = mock(Container.class);
- when(attempt1.getMasterContainer()).thenReturn(container);
- ApplicationSubmissionContext submissionContext = mock(
- ApplicationSubmissionContext.class);
- when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
- when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
- when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
- when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
- rm.getRMContext().getRMApps().put(appId1, app1);
-
- SchedulerEvent addAppEvent1 =
- new AppAddedSchedulerEvent(appId1, queue, user);
- cs.handle(addAppEvent1);
- SchedulerEvent addAttemptEvent1 =
- new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
- cs.handle(addAttemptEvent1);
- return appAttemptId1;
- }
-
@Test
public void testAppAttemptLocalityStatistics() throws Exception {
Configuration conf =
@@ -4722,256 +2385,6 @@ public class TestCapacityScheduler {
attemptMetrics.getLocalityStatistics());
}
- /**
- * Test for queue deletion.
- * @throws Exception
- */
- @Test
- public void testRefreshQueuesWithQueueDelete() throws Exception {
- CapacityScheduler cs = new CapacityScheduler();
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
- null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null);
- setupQueueConfiguration(conf);
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- cs.init(conf);
- cs.start();
- cs.reinitialize(conf, rmContext);
- checkQueueStructureCapacities(cs);
-
- // test delete leaf queue when there is application running.
- Map<String, CSQueue> queues =
- cs.getCapacitySchedulerQueueManager().getShortNameQueues();
- String b1QTobeDeleted = "b1";
- LeafQueue csB1Queue = Mockito.spy((LeafQueue) queues.get(b1QTobeDeleted));
- when(csB1Queue.getState()).thenReturn(QueueState.DRAINING)
- .thenReturn(QueueState.STOPPED);
- cs.getCapacitySchedulerQueueManager().addQueue(b1QTobeDeleted, csB1Queue);
- conf = new CapacitySchedulerConfiguration();
- setupQueueConfigurationWithoutB1(conf);
- try {
- cs.reinitialize(conf, mockContext);
- fail("Expected to throw exception when refresh queue tries to delete a"
- + " queue with running apps");
- } catch (IOException e) {
- // ignore
- }
-
- // test delete leaf queue(root.b.b1) when there is no application running.
- conf = new CapacitySchedulerConfiguration();
- setupQueueConfigurationWithoutB1(conf);
- try {
- cs.reinitialize(conf, mockContext);
- } catch (IOException e) {
- LOG.error(
- "Expected to NOT throw exception when refresh queue tries to delete"
- + " a queue WITHOUT running apps",
- e);
- fail("Expected to NOT throw exception when refresh queue tries to delete"
- + " a queue WITHOUT running apps");
- }
- CSQueue rootQueue = cs.getRootQueue();
- CSQueue queueB = findQueue(rootQueue, B);
- CSQueue queueB3 = findQueue(queueB, B1);
- assertNull("Refresh needs to support delete of leaf queue ", queueB3);
-
- // reset back to default configuration for testing parent queue delete
- conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- cs.reinitialize(conf, rmContext);
- checkQueueStructureCapacities(cs);
-
- // set the configurations such that it fails once but should be successfull
- // next time
- queues = cs.getCapacitySchedulerQueueManager().getShortNameQueues();
- CSQueue bQueue = Mockito.spy((ParentQueue) queues.get("b"));
- when(bQueue.getState()).thenReturn(QueueState.DRAINING)
- .thenReturn(QueueState.STOPPED);
- cs.getCapacitySchedulerQueueManager().addQueue("b", bQueue);
-
- bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
- when(bQueue.getState()).thenReturn(QueueState.STOPPED);
- cs.getCapacitySchedulerQueueManager().addQueue("b1", bQueue);
-
- bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
- when(bQueue.getState()).thenReturn(QueueState.STOPPED);
- cs.getCapacitySchedulerQueueManager().addQueue("b2", bQueue);
-
- bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
- when(bQueue.getState()).thenReturn(QueueState.STOPPED);
- cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue);
-
- // test delete Parent queue when there is application running.
- conf = new CapacitySchedulerConfiguration();
- setupQueueConfigurationWithoutB(conf);
- try {
- cs.reinitialize(conf, mockContext);
- fail("Expected to throw exception when refresh queue tries to delete a"
- + " parent queue with running apps in children queue");
- } catch (IOException e) {
- // ignore
- }
-
- // test delete Parent queue when there is no application running.
- conf = new CapacitySchedulerConfiguration();
- setupQueueConfigurationWithoutB(conf);
- try {
- cs.reinitialize(conf, mockContext);
- } catch (IOException e) {
- fail("Expected to not throw exception when refresh queue tries to delete"
- + " a queue without running apps");
- }
- rootQueue = cs.getRootQueue();
- queueB = findQueue(rootQueue, B);
- String message =
- "Refresh needs to support delete of Parent queue and its children.";
- assertNull(message, queueB);
- assertNull(message,
- cs.getCapacitySchedulerQueueManager().getQueues().get("b"));
- assertNull(message,
- cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
- assertNull(message,
- cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
-
- cs.stop();
- }
-
- /**
- * Test for all child queue deletion and thus making parent queue a child.
- * @throws Exception
- */
- @Test
- public void testRefreshQueuesWithAllChildQueuesDeleted() throws Exception {
- CapacityScheduler cs = new CapacityScheduler();
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
- null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null);
- setupQueueConfiguration(conf);
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- cs.init(conf);
- cs.start();
- cs.reinitialize(conf, rmContext);
- checkQueueStructureCapacities(cs);
-
- // test delete all leaf queues when there is no application running.
- Map<String, CSQueue> queues =
- cs.getCapacitySchedulerQueueManager().getShortNameQueues();
-
- CSQueue bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
- when(bQueue.getState()).thenReturn(QueueState.RUNNING)
- .thenReturn(QueueState.STOPPED);
- cs.getCapacitySchedulerQueueManager().addQueue("b1", bQueue);
-
- bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
- when(bQueue.getState()).thenReturn(QueueState.STOPPED);
- cs.getCapacitySchedulerQueueManager().addQueue("b2", bQueue);
-
- bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
- when(bQueue.getState()).thenReturn(QueueState.STOPPED);
- cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue);
-
- conf = new CapacitySchedulerConfiguration();
- setupQueueConfWithoutChildrenOfB(conf);
-
- // test convert parent queue to leaf queue(root.b) when there is no
- // application running.
- try {
- cs.reinitialize(conf, mockContext);
- fail("Expected to throw exception when refresh queue tries to make parent"
- + " queue a child queue when one of its children is still running.");
- } catch (IOException e) {
- //do not do anything, expected exception
- }
-
- // test delete leaf queues(root.b.b1,b2,b3) when there is no application
- // running.
- try {
- cs.reinitialize(conf, mockContext);
- } catch (IOException e) {
- e.printStackTrace();
- fail("Expected to NOT throw exception when refresh queue tries to delete"
- + " all children of a parent queue(without running apps).");
- }
- CSQueue rootQueue = cs.getRootQueue();
- CSQueue queueB = findQueue(rootQueue, B);
- assertNotNull("Parent Queue B should not be deleted", queueB);
- Assert.assertTrue("As Queue'B children are not deleted",
- queueB instanceof LeafQueue);
-
- String message =
- "Refresh needs to support delete of all children of Parent queue.";
- assertNull(message,
- cs.getCapacitySchedulerQueueManager().getQueues().get("b3"));
- assertNull(message,
- cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
- assertNull(message,
- cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
-
- cs.stop();
- }
-
- /**
- * Test if we can convert a leaf queue to a parent queue
- * @throws Exception
- */
- @Test (timeout = 10000)
- public void testConvertLeafQueueToParentQueue() throws Exception {
- CapacityScheduler cs = new CapacityScheduler();
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
- null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null);
- setupQueueConfiguration(conf);
- cs.setConf(new YarnConfiguration());
- cs.setRMContext(resourceManager.getRMContext());
- cs.init(conf);
- cs.start();
- cs.reinitialize(conf, rmContext);
- checkQueueStructureCapacities(cs);
-
- String targetQueue = "b1";
- CSQueue b1 = cs.getQueue(targetQueue);
- Assert.assertEquals(QueueState.RUNNING, b1.getState());
-
- // test if we can convert a leaf queue which is in RUNNING state
- conf = new CapacitySchedulerConfiguration();
- setupQueueConfigurationWithB1AsParentQueue(conf);
- try {
- cs.reinitialize(conf, mockContext);
- fail("Expected to throw exception when refresh queue tries to convert"
- + " a child queue to a parent queue.");
- } catch (IOException e) {
- // ignore
- }
-
- // now set queue state for b1 to STOPPED
- conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- conf.set("yarn.scheduler.capacity.root.b.b1.state", "STOPPED");
- cs.reinitialize(conf, mockContext);
- Assert.assertEquals(QueueState.STOPPED, b1.getState());
-
- // test if we can convert a leaf queue which is in STOPPED state
- conf = new CapacitySchedulerConfiguration();
- setupQueueConfigurationWithB1AsParentQueue(conf);
- try {
- cs.reinitialize(conf, mockContext);
- } catch (IOException e) {
- fail("Expected to NOT throw exception when refresh queue tries"
- + " to convert a leaf queue WITHOUT running apps");
- }
- b1 = cs.getQueue(targetQueue);
- Assert.assertTrue(b1 instanceof ParentQueue);
- Assert.assertEquals(QueueState.RUNNING, b1.getState());
- Assert.assertTrue(!b1.getChildQueues().isEmpty());
- }
@Test(timeout = 30000)
public void testAMLimitDouble() throws Exception {
@@ -5251,166 +2664,6 @@ public class TestCapacityScheduler {
}
@Test
- public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception {
-
- YarnConfiguration conf = new YarnConfiguration();
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
-
- CapacitySchedulerConfiguration newConf =
- new CapacitySchedulerConfiguration(conf);
-
- // Define top-level queues
- newConf.setQueues(CapacitySchedulerConfiguration.ROOT,
- new String[] { "a", "b" });
-
- newConf.setCapacity(A, 50);
- newConf.setCapacity(B, 50);
-
- // Define 2nd-level queues
- newConf.setQueues(A, new String[] { "a1" });
- newConf.setCapacity(A1, 100);
- newConf.setUserLimitFactor(A1, 2.0f);
- newConf.setMaximumAMResourcePercentPerPartition(A1, "", 0.1f);
-
- newConf.setQueues(B, new String[] { "b1" });
- newConf.setCapacity(B1, 100);
- newConf.setUserLimitFactor(B1, 2.0f);
-
- LOG.info("Setup top-level queues a and b");
-
- MockRM rm = new MockRM(newConf);
- rm.start();
-
- CapacityScheduler scheduler =
- (CapacityScheduler) rm.getResourceScheduler();
-
- MockNM nm1 = rm.registerNode("h1:1234", 16 * GB);
-
- // submit an app
- MockRMAppSubmissionData data3 =
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
- .withAppName("test-move-1")
- .withUser("u1")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app = MockRMAppSubmitter.submit(rm, data3);
- MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);
-
- ApplicationAttemptId appAttemptId =
- rm.getApplicationReport(app.getApplicationId())
- .getCurrentApplicationAttemptId();
-
- MockRMAppSubmissionData data2 =
- MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
- .withAppName("app")
- .withUser("u2")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app2 = MockRMAppSubmitter.submit(rm, data2);
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
-
- MockRMAppSubmissionData data1 =
- MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
- .withAppName("app")
- .withUser("u3")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app3 = MockRMAppSubmitter.submit(rm, data1);
-
- MockRMAppSubmissionData data =
- MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
- .withAppName("app")
- .withUser("u4")
- .withAcls(null)
- .withQueue("a1")
- .withUnmanagedAM(false)
- .build();
- RMApp app4 = MockRMAppSubmitter.submit(rm, data);
-
- // Each application asks 50 * 1GB containers
- am1.allocate("*", 1 * GB, 50, null);
- am2.allocate("*", 1 * GB, 50, null);
-
- CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
- RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
-
- // check preconditions
- List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(4, appsInA1.size());
- String queue =
- scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
- .getQueueName();
- Assert.assertEquals("a1", queue);
-
- List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
- assertTrue(appsInA.contains(appAttemptId));
- assertEquals(4, appsInA.size());
-
- List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(4, appsInRoot.size());
-
- List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
- assertTrue(appsInB1.isEmpty());
-
- List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
- assertTrue(appsInB.isEmpty());
-
- UsersManager um =
- (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager();
-
- assertEquals(4, um.getNumActiveUsers());
- assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
-
- // now move the app
- scheduler.moveAllApps("a1", "b1");
-
- //Triggering this event so that user limit computation can
- //happen again
- for (int i = 0; i < 10; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- Thread.sleep(500);
- }
-
- // check postconditions
- appsInB1 = scheduler.getAppsInQueue("b1");
-
- assertEquals(4, appsInB1.size());
- queue =
- scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
- .getQueueName();
- Assert.assertEquals("b1", queue);
-
- appsInB = scheduler.getAppsInQueue("b");
- assertTrue(appsInB.contains(appAttemptId));
- assertEquals(4, appsInB.size());
-
- appsInRoot = scheduler.getAppsInQueue("root");
- assertTrue(appsInRoot.contains(appAttemptId));
- assertEquals(4, appsInRoot.size());
-
- List<ApplicationAttemptId> oldAppsInA1 = scheduler.getAppsInQueue("a1");
- assertEquals(0, oldAppsInA1.size());
-
- UsersManager um_b1 =
- (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager();
-
- assertEquals(2, um_b1.getNumActiveUsers());
- assertEquals(2, um_b1.getNumActiveUsersWithOnlyPendingApps());
-
- appsInB1 = scheduler.getAppsInQueue("b1");
- assertEquals(4, appsInB1.size());
- rm.close();
- }
-
- @Test
public void testCSQueueMetrics() throws Exception {
// Initialize resource map
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java
new file mode 100644
index 0000000..9943e03
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java
@@ -0,0 +1,1499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.Task;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.appHelper;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkApplicationResourceUsage;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkNodeResourceUsage;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createMockRMContext;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.nodeUpdate;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.registerNode;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.setUpMove;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerApps {
+
+ private ResourceManager resourceManager = null;
+ private RMContext mockContext;
+
+ /**
+ * Builds the per-test fixtures: the ResourceManager used by the tests and an
+ * RMContext, both obtained from the CapacitySchedulerTestUtilities helpers.
+ */
+ @Before
+ public void setUp() throws Exception {
+ this.resourceManager = createResourceManager();
+ this.mockContext = createMockRMContext();
+ }
+
+ /**
+ * Hands the ResourceManager created in {@link #setUp()} to the shared
+ * stopResourceManager helper once each test has finished.
+ */
+ @After
+ public void tearDown() throws Exception {
+ stopResourceManager(this.resourceManager);
+ }
+
+ /**
+ * Verifies getAppsInQueue for leaf queues (a1, b2), for parent queue a (which
+ * must aggregate a1 and a2), for root (which must see all three attempts),
+ * and that an unknown queue name yields null.
+ */
+ @Test
+ public void testGetAppsInQueue() throws Exception {
+ Application application0 = new Application("user_0", "a1", resourceManager);
+ application0.submit();
+
+ Application application1 = new Application("user_0", "a2", resourceManager);
+ application1.submit();
+
+ Application application2 = new Application("user_0", "b2", resourceManager);
+ application2.submit();
+
+ ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+ // Leaf queue a1 must contain exactly the attempt submitted to it, not just
+ // some single attempt.
+ List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+ assertTrue(appsInA1.contains(application0.getApplicationAttemptId()));
+ assertEquals(1, appsInA1.size());
+
+ // Leaf queue b2 must contain exactly the attempt submitted to it.
+ List<ApplicationAttemptId> appsInB2 = scheduler.getAppsInQueue("b2");
+ assertTrue(appsInB2.contains(application2.getApplicationAttemptId()));
+ assertEquals(1, appsInB2.size());
+
+ List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.contains(application0.getApplicationAttemptId()));
+ assertTrue(appsInA.contains(application1.getApplicationAttemptId()));
+ assertEquals(2, appsInA.size());
+
+ List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.contains(application0.getApplicationAttemptId()));
+ assertTrue(appsInRoot.contains(application1.getApplicationAttemptId()));
+ assertTrue(appsInRoot.contains(application2.getApplicationAttemptId()));
+ assertEquals(3, appsInRoot.size());
+
+ Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
+ }
+
+ /**
+ * Verifies, via TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler,
+ * that an app targeted at leaf queue a1 is added to and removed from the
+ * CapacityScheduler, and that the returned application is bound to a1.
+ */
+ @Test
+ public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(conf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ MockRM rm = new MockRM(conf);
+ try {
+ @SuppressWarnings("unchecked")
+ AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> cs =
+ (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
+ .getResourceScheduler();
+ SchedulerApplication<SchedulerApplicationAttempt> app =
+ TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
+ cs.getSchedulerApplications(), cs, "a1");
+ Assert.assertEquals("a1", app.getQueue().getQueueName());
+ } finally {
+ // The MockRM is never started in this test; stop it anyway so whatever
+ // its constructor set up is released instead of leaking into later tests.
+ rm.stop();
+ }
+ }
+
+ /**
+ * Verifies that killAllAppsInQueue("a1") kills the app submitted to a1 and
+ * that, once it is removed from the scheduler, a1, its parent a and root no
+ * longer report any attempts.
+ */
+ @Test
+ public void testKillAllAppsInQueue() throws Exception {
+ MockRM rm = setUpMove();
+ try {
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler) rm.getResourceScheduler();
+
+ // submit an app
+ MockRMAppSubmissionData data =
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ .withAppName("test-move-1")
+ .withUser("user_0")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data);
+ ApplicationAttemptId appAttemptId =
+ rm.getApplicationReport(app.getApplicationId())
+ .getCurrentApplicationAttemptId();
+
+ // check preconditions
+ List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+ assertEquals(1, appsInA1.size());
+
+ List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.contains(appAttemptId));
+ assertEquals(1, appsInA.size());
+ String queue =
+ scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+ .getQueueName();
+ Assert.assertEquals("a1", queue);
+
+ List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.contains(appAttemptId));
+ assertEquals(1, appsInRoot.size());
+
+ // now kill the app
+ scheduler.killAllAppsInQueue("a1");
+
+ // check postconditions: wait until the app is KILLED and gone from the
+ // scheduler before inspecting the queues
+ rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
+ rm.waitForAppRemovedFromScheduler(app.getApplicationId());
+ appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.isEmpty());
+
+ appsInA1 = scheduler.getAppsInQueue("a1");
+ assertTrue(appsInA1.isEmpty());
+
+ appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.isEmpty());
+ } finally {
+ // Stop the RM even when an assertion fails so it cannot leak into
+ // subsequent tests.
+ rm.stop();
+ }
+ }
+
+ /**
+ * Verifies that killAllAppsInQueue on a nonexistent queue throws
+ * YarnException and leaves the app submitted to a1 untouched in a1, a and root.
+ */
+ @Test
+ public void testKillAllAppsInvalidSource() throws Exception {
+ MockRM rm = setUpMove();
+ try {
+ YarnScheduler scheduler = rm.getResourceScheduler();
+
+ // submit an app
+ MockRMAppSubmissionData data =
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ .withAppName("test-move-1")
+ .withUser("user_0")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data);
+ ApplicationAttemptId appAttemptId =
+ rm.getApplicationReport(app.getApplicationId())
+ .getCurrentApplicationAttemptId();
+
+ // check preconditions
+ List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+ assertEquals(1, appsInA1.size());
+
+ List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.contains(appAttemptId));
+ assertEquals(1, appsInA.size());
+
+ List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.contains(appAttemptId));
+ assertEquals(1, appsInRoot.size());
+
+ // now try to kill the apps of a queue that does not exist
+ try {
+ scheduler.killAllAppsInQueue("DOES_NOT_EXIST");
+ Assert.fail("Expected YarnException when killing all apps in a"
+ + " nonexistent queue");
+ } catch (YarnException e) {
+ // expected
+ }
+
+ // check postconditions, app should still be in a1
+ appsInA1 = scheduler.getAppsInQueue("a1");
+ assertEquals(1, appsInA1.size());
+
+ appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.contains(appAttemptId));
+ assertEquals(1, appsInA.size());
+
+ appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.contains(appAttemptId));
+ assertEquals(1, appsInRoot.size());
+ } finally {
+ // Stop the RM even when an assertion fails so it cannot leak into
+ // subsequent tests.
+ rm.stop();
+ }
+ }
+
+ /**
+ * Ensures that, with the DominantResourceCalculator, no container is
+ * allocated or reserved on a node whose only vcore is already used by the AM,
+ * and that the request is instead satisfied by a node with spare vcores.
+ */
+ @Test(timeout = 30000)
+ public void testAppReservationWithDominantResourceCalculator() throws Exception {
+ CapacitySchedulerConfiguration csconf =
+ new CapacitySchedulerConfiguration();
+ csconf.setResourceComparator(DominantResourceCalculator.class);
+
+ YarnConfiguration conf = new YarnConfiguration(csconf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+
+ MockRM rm = new MockRM(conf);
+ rm.start();
+ try {
+ // nm1 has a single vcore, which the AM container will occupy
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 1);
+
+ // register extra nodes to bump up cluster resource
+ MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10 * GB, 4);
+ rm.registerNode("127.0.0.1:1236", 10 * GB, 4);
+
+ RMApp app1 = MockRMAppSubmitter.submitWithMemory(1024, rm);
+ // kick the scheduling
+ nm1.nodeHeartbeat(true);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+ SchedulerNodeReport reportNm1 =
+ rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+
+ // check node report
+ Assert.assertEquals(1 * GB, reportNm1.getUsedResource().getMemorySize());
+ Assert.assertEquals(9 * GB, reportNm1.getAvailableResource().getMemorySize());
+
+ // add request for containers
+ am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 1 * GB, 1, 1);
+ am1.schedule(); // send the request
+
+ // kick the scheduler, container reservation should not happen
+ nm1.nodeHeartbeat(true);
+ Thread.sleep(1000);
+ AllocateResponse allocResponse = am1.schedule();
+ ApplicationResourceUsageReport report =
+ rm.getResourceScheduler().getAppResourceUsageReport(
+ attempt1.getAppAttemptId());
+ Assert.assertEquals(0, allocResponse.getAllocatedContainers().size());
+ Assert.assertEquals(0, report.getNumReservedContainers());
+
+ // container should get allocated on this node
+ nm2.nodeHeartbeat(true);
+
+ // poll until the allocation shows up; the @Test timeout bounds this loop
+ while (allocResponse.getAllocatedContainers().size() == 0) {
+ Thread.sleep(100);
+ allocResponse = am1.schedule();
+ }
+ report =
+ rm.getResourceScheduler().getAppResourceUsageReport(
+ attempt1.getAppAttemptId());
+ Assert.assertEquals(1, allocResponse.getAllocatedContainers().size());
+ Assert.assertEquals(0, report.getNumReservedContainers());
+ } finally {
+ // Stop the RM even on assertion failure or timeout so it cannot leak
+ // into subsequent tests.
+ rm.stop();
+ }
+ }
+
+ /**
+ * Verifies that moveApplication relocates an app from a1 to b1: the attempt
+ * disappears from a1/a, appears in b1/b, stays visible under root, and the
+ * root queue's pending-app count remains 1 across the move.
+ */
+ @Test
+ public void testMoveAppBasic() throws Exception {
+ MockRM rm = setUpMove();
+ try {
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler) rm.getResourceScheduler();
+ QueueMetrics metrics = scheduler.getRootQueueMetrics();
+ Assert.assertEquals(0, metrics.getAppsPending());
+ // submit an app
+ MockRMAppSubmissionData data =
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ .withAppName("test-move-1")
+ .withUser("user_0")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data);
+ ApplicationAttemptId appAttemptId =
+ rm.getApplicationReport(app.getApplicationId())
+ .getCurrentApplicationAttemptId();
+ // check preconditions
+ List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+ assertEquals(1, appsInA1.size());
+ String queue =
+ scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+ .getQueueName();
+ Assert.assertEquals("a1", queue);
+
+ List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.contains(appAttemptId));
+ assertEquals(1, appsInA.size());
+
+ List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.contains(appAttemptId));
+ assertEquals(1, appsInRoot.size());
+
+ assertEquals(1, metrics.getAppsPending());
+
+ List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+ assertTrue(appsInB1.isEmpty());
+
+ List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+ assertTrue(appsInB.isEmpty());
+
+ // now move the app
+ scheduler.moveApplication(app.getApplicationId(), "b1");
+
+ // check postconditions
+ appsInB1 = scheduler.getAppsInQueue("b1");
+ assertEquals(1, appsInB1.size());
+ queue =
+ scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
+ .getQueueName();
+ Assert.assertEquals("b1", queue);
+
+ appsInB = scheduler.getAppsInQueue("b");
+ assertTrue(appsInB.contains(appAttemptId));
+ assertEquals(1, appsInB.size());
+
+ appsInRoot = scheduler.getAppsInQueue("root");
+ assertTrue(appsInRoot.contains(appAttemptId));
+ assertEquals(1, appsInRoot.size());
+
+ // a move must not double-count or drop the pending app in root metrics
+ assertEquals(1, metrics.getAppsPending());
+
+ appsInA1 = scheduler.getAppsInQueue("a1");
+ assertTrue(appsInA1.isEmpty());
+
+ appsInA = scheduler.getAppsInQueue("a");
+ assertTrue(appsInA.isEmpty());
+ } finally {
+ // Stop the RM even when an assertion fails so it cannot leak into
+ // subsequent tests.
+ rm.stop();
+ }
+ }
+
+ /**
+ * Moves applications back and forth between leaf queues a1 and b1 and checks
+ * that the per-queue application counts follow the moves while the root
+ * pending-apps metric stays unchanged.
+ */
+ @Test
+ public void testMoveAppPendingMetrics() throws Exception {
+ MockRM rm = setUpMove();
+ // stop the RM even when an assertion fails so it cannot leak into later tests
+ try {
+ ResourceScheduler scheduler = rm.getResourceScheduler();
+ assertApps(scheduler, 0, 0, 0);
+
+ // submit two apps in a1
+ RMApp app1 = MockRMAppSubmitter.submit(rm,
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ .withAppName("test-move-1")
+ .withUser("user_0")
+ .withAcls(null)
+ .withQueue("a1")
+ .build());
+ RMApp app2 = MockRMAppSubmitter.submit(rm,
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ .withAppName("test-move-2")
+ .withUser("user_0")
+ .withAcls(null)
+ .withQueue("a1")
+ .build());
+ assertApps(scheduler, 2, 0, 2);
+
+ // submit one app in b1
+ RMApp app3 = MockRMAppSubmitter.submit(rm,
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ .withAppName("test-move-3")
+ .withUser("user_0")
+ .withAcls(null)
+ .withQueue("b1")
+ .build());
+ assertApps(scheduler, 2, 1, 3);
+
+ // now move the app1 from a1 to b1
+ scheduler.moveApplication(app1.getApplicationId(), "b1");
+ assertApps(scheduler, 1, 2, 3);
+
+ // now move the app2 from a1 to b1
+ scheduler.moveApplication(app2.getApplicationId(), "b1");
+ assertApps(scheduler, 0, 3, 3);
+
+ // now move the app3 from b1 to a1
+ scheduler.moveApplication(app3.getApplicationId(), "a1");
+ assertApps(scheduler, 1, 2, 3);
+ } finally {
+ rm.stop();
+ }
+ }
+
+ /**
+ * Checks how many applications leaf queues a1 and b1 hold and how many
+ * applications the root queue metrics report as pending.
+ *
+ * @param scheduler scheduler under test
+ * @param a1Size expected number of apps in a1
+ * @param b1Size expected number of apps in b1
+ * @param appsPending expected root pending-apps metric
+ */
+ private void assertApps(ResourceScheduler scheduler,
+ int a1Size, int b1Size, int appsPending) {
+ assertAppsSize(scheduler, "a1", a1Size);
+ assertAppsSize(scheduler, "b1", b1Size);
+ QueueMetrics rootMetrics = scheduler.getRootQueueMetrics();
+ assertEquals(appsPending, rootMetrics.getAppsPending());
+ }
+
+ /** Checks that queue {@code queueName} currently holds exactly {@code size} apps. */
+ private void assertAppsSize(ResourceScheduler scheduler, String queueName, int size) {
+ List<ApplicationAttemptId> appsInQueue = scheduler.getAppsInQueue(queueName);
+ assertEquals(size, appsInQueue.size());
+ }
+
+ /**
+ * Moves an application between two leaf queues (a1 to a2) that share the
+ * parent queue a; the parent and root queues must keep listing the app.
+ */
+ @Test
+ public void testMoveAppSameParent() throws Exception {
+ MockRM rm = setUpMove();
+ // stop the RM even when an assertion fails so it cannot leak into later tests
+ try {
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler) rm.getResourceScheduler();
+
+ // submit an app
+ MockRMAppSubmissionData data =
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ .withAppName("test-move-1")
+ .withUser("user_0")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data);
+ ApplicationAttemptId appAttemptId =
+ rm.getApplicationReport(app.getApplicationId())
+ .getCurrentApplicationAttemptId();
+
+ // check preconditions
+ assertOneAppInQueue(scheduler, "a1");
+ assertApps(scheduler, "root", appAttemptId);
+ assertApps(scheduler, "a", appAttemptId);
+ assertApps(scheduler, "a2");
+
+ // now move the app
+ scheduler.moveApplication(app.getApplicationId(), "a2");
+
+ // check postconditions
+ assertApps(scheduler, "root", appAttemptId);
+ assertApps(scheduler, "a", appAttemptId);
+ assertApps(scheduler, "a1");
+ assertOneAppInQueue(scheduler, "a2");
+ } finally {
+ rm.stop();
+ }
+ }
+
+ /**
+ * Checks that queue {@code queueName} lists exactly the given application
+ * attempts, in the given order (no attempts means the queue must be empty).
+ */
+ private void assertApps(ResourceScheduler scheduler,
+ String queueName, ApplicationAttemptId... apps) {
+ List<ApplicationAttemptId> expected = Lists.newArrayList(apps);
+ List<ApplicationAttemptId> actual = scheduler.getAppsInQueue(queueName);
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Checks that queue {@code queueName} holds exactly one application attempt
+ * and that the scheduler reports that attempt's queue under the same name.
+ */
+ private void assertOneAppInQueue(AbstractYarnScheduler scheduler, String queueName) {
+ List<ApplicationAttemptId> appsInQueue = scheduler.getAppsInQueue(queueName);
+ assertEquals(1, appsInQueue.size());
+ ApplicationAttemptId onlyApp = appsInQueue.get(0);
+ String reportedQueue =
+ scheduler.getApplicationAttempt(onlyApp).getQueue().getQueueName();
+ Assert.assertEquals(queueName, reportedQueue);
+ }
+
+ /**
+ * Moves app_0 from a1 to b1 after its first 1GB container was allocated and
+ * checks that both app_0 (now in b1) and app_1 (in b2) still receive their
+ * next 2GB container, ending with both nodes fully used.
+ */
+ @Test
+ public void testMoveAppForMoveToQueueWithFreeCap() throws Exception {
+
+ ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+ NodeStatus mockNodeStatus = createMockNodeStatus();
+
+ // Register node1: host_0 with 4GB and 1 vcore
+ String host0 = "host_0";
+ NodeManager nm0 =
+ registerNode(resourceManager, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+ Resources.createResource(4 * GB, 1), mockNodeStatus);
+
+ // Register node2: host_1 with 2GB and 1 vcore
+ String host1 = "host_1";
+ NodeManager nm1 =
+ registerNode(resourceManager, host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+ Resources.createResource(2 * GB, 1), mockNodeStatus);
+
+ // ResourceRequest priorities
+ Priority priority0 = Priority.newInstance(0);
+ Priority priority1 = Priority.newInstance(1);
+
+ // Submit application_0 as user_0 to leaf queue a1
+ Application application0 =
+ new Application("user_0", "a1", resourceManager);
+ application0.submit(); // app + app attempt event sent to scheduler
+
+ application0.addNodeManager(host0, 1234, nm0);
+ application0.addNodeManager(host1, 1234, nm1);
+
+ // 1GB containers at priority1, 2GB containers at priority0
+ Resource capability00 = Resources.createResource(1 * GB, 1);
+ application0.addResourceRequestSpec(priority1, capability00);
+
+ Resource capability01 = Resources.createResource(2 * GB, 1);
+ application0.addResourceRequestSpec(priority0, capability01);
+
+ // first task of app_0: 1GB, may run on either host
+ Task task00 =
+ new Task(application0, priority1, new String[]{host0, host1});
+ application0.addTask(task00);
+
+ // Submit application_1 as user_1 to leaf queue b2
+ Application application1 =
+ new Application("user_1", "b2", resourceManager);
+ application1.submit(); // app + app attempt event sent to scheduler
+
+ application1.addNodeManager(host0, 1234, nm0);
+ application1.addNodeManager(host1, 1234, nm1);
+
+ // same request shapes as app_0: 1GB at priority1, 2GB at priority0
+ Resource capability10 = Resources.createResource(1 * GB, 1);
+ application1.addResourceRequestSpec(priority1, capability10);
+
+ Resource capability11 = Resources.createResource(2 * GB, 1);
+ application1.addResourceRequestSpec(priority0, capability11);
+
+ // first task of app_1: 1GB, may run on either host
+ Task task10 =
+ new Task(application1, priority1, new String[]{host0, host1});
+ application1.addTask(task10);
+
+ // Send resource requests to the scheduler
+ application0.schedule(); // allocate
+ application1.schedule(); // allocate
+
+ // task_0_0 task_1_0 allocated, used=2G
+ nodeUpdate(resourceManager, nm0);
+
+ // nothing allocated
+ nodeUpdate(resourceManager, nm1);
+
+ // Get allocations from the scheduler
+ application0.schedule(); // task_0_0
+ checkApplicationResourceUsage(1 * GB, application0);
+
+ application1.schedule(); // task_1_0
+ checkApplicationResourceUsage(1 * GB, application1);
+
+ checkNodeResourceUsage(2 * GB, nm0); // task_0_0 (1G) and task_1_0 (1G) 2G
+ // available
+ checkNodeResourceUsage(0 * GB, nm1); // no tasks, 2G available
+
+ // move app_0 from a1 to b1. The original note gives the capacities as
+ // a1: 30% (10.5% absolute) and b1: 79.2% (89.5% absolute).
+ // NOTE(review): these figures come from the queue setup of this test
+ // class, which is not visible here; confirm before relying on them.
+ scheduler.moveApplication(application0.getApplicationId(), "b1");
+
+ // 2GB 1C
+ Task task11 =
+ new Task(application1, priority0,
+ new String[]{ResourceRequest.ANY});
+ application1.addTask(task11);
+
+ application1.schedule();
+
+ // 2GB 1C
+ Task task01 =
+ new Task(application0, priority0, new String[]{host0, host1});
+ application0.addTask(task01);
+
+ application0.schedule();
+
+ // prev 2G used free 2G
+ nodeUpdate(resourceManager, nm0);
+
+ // prev 0G used free 2G
+ nodeUpdate(resourceManager, nm1);
+
+ // Get allocations from the scheduler: app_1 now holds 1GB + 2GB
+ application1.schedule();
+ checkApplicationResourceUsage(3 * GB, application1);
+
+ // Get allocations from the scheduler: app_0 (now in b1) holds 1GB + 2GB
+ application0.schedule();
+ checkApplicationResourceUsage(3 * GB, application0);
+
+ // both nodes are now fully used (4GB of 4GB, 2GB of 2GB)
+ checkNodeResourceUsage(4 * GB, nm0);
+ checkNodeResourceUsage(2 * GB, nm1);
+ }
+
+ /**
+ * Moves app_0 into b2 before any allocation and checks that it gets nothing
+ * there. It then moves app_0 to a2 and checks that its pending 3GB request
+ * is allocated on the next heartbeat of host_1.
+ */
+ @Test
+ public void testMoveAppSuccess() throws Exception {
+
+ ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+ NodeStatus mockNodeStatus = createMockNodeStatus();
+
+ // Register node1: host_0 with 5GB and 1 vcore
+ String host0 = "host_0";
+ NodeManager nm0 =
+ registerNode(resourceManager, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+ Resources.createResource(5 * GB, 1), mockNodeStatus);
+
+ // Register node2: host_1 with 5GB and 1 vcore
+ String host1 = "host_1";
+ NodeManager nm1 =
+ registerNode(resourceManager, host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+ Resources.createResource(5 * GB, 1), mockNodeStatus);
+
+ // ResourceRequest priorities
+ Priority priority0 = Priority.newInstance(0);
+ Priority priority1 = Priority.newInstance(1);
+
+ // Submit application_0 as user_0 to leaf queue a1
+ Application application0 =
+ new Application("user_0", "a1", resourceManager);
+ application0.submit(); // app + app attempt event sent to scheduler
+
+ application0.addNodeManager(host0, 1234, nm0);
+ application0.addNodeManager(host1, 1234, nm1);
+
+ // app_0: 3GB containers at priority1, 2GB containers at priority0
+ Resource capability00 = Resources.createResource(3 * GB, 1);
+ application0.addResourceRequestSpec(priority1, capability00);
+
+ Resource capability01 = Resources.createResource(2 * GB, 1);
+ application0.addResourceRequestSpec(priority0, capability01);
+
+ Task task00 =
+ new Task(application0, priority1, new String[]{host0, host1});
+ application0.addTask(task00);
+
+ // Submit application_1 as user_1 to leaf queue b2
+ Application application1 =
+ new Application("user_1", "b2", resourceManager);
+ application1.submit(); // app + app attempt event sent to scheduler
+
+ application1.addNodeManager(host0, 1234, nm0);
+ application1.addNodeManager(host1, 1234, nm1);
+
+ // app_1: 1GB containers at priority1, 2GB containers at priority0
+ Resource capability10 = Resources.createResource(1 * GB, 1);
+ application1.addResourceRequestSpec(priority1, capability10);
+
+ Resource capability11 = Resources.createResource(2 * GB, 1);
+ application1.addResourceRequestSpec(priority0, capability11);
+
+ Task task10 =
+ new Task(application1, priority1, new String[]{host0, host1});
+ application1.addTask(task10);
+
+ // Send resource requests to the scheduler
+ application0.schedule(); // allocate
+ application1.schedule(); // allocate
+
+ // b2 presumably allows only one running app at a time (per the queue
+ // setup of this test class, not visible here), and app_1 is already there
+ scheduler.moveApplication(application0.getApplicationId(), "b2");
+
+ nodeUpdate(resourceManager, nm0);
+
+ nodeUpdate(resourceManager, nm1);
+
+ // Get allocations from the scheduler
+ application0.schedule(); // task_0_0
+ checkApplicationResourceUsage(0 * GB, application0);
+
+ application1.schedule(); // task_1_0
+ checkApplicationResourceUsage(1 * GB, application1);
+
+ // only task_1_0 (1G) was allocated; application_0, parked in b2, received
+ // nothing
+ checkNodeResourceUsage(1 * GB, nm0);
+ checkNodeResourceUsage(0 * GB, nm1);
+
+ // lets move application_0 to a queue where it can run
+ scheduler.moveApplication(application0.getApplicationId(), "a2");
+ application0.schedule();
+
+ // only host_1 heartbeats, so app_0's 3GB container must land there
+ nodeUpdate(resourceManager, nm1);
+
+ // Get allocations from the scheduler
+ application0.schedule(); // task_0_0
+ checkApplicationResourceUsage(3 * GB, application0);
+
+ checkNodeResourceUsage(1 * GB, nm0);
+ checkNodeResourceUsage(3 * GB, nm1);
+
+ }
+
+ /**
+ * Moving an application into b1 must be rejected with a
+ * {@link YarnException} when queue B has been configured as STOPPED.
+ */
+ @Test
+ public void testMoveAppViolateQueueState() throws Exception {
+ resourceManager = new ResourceManager() {
+ @Override
+ protected RMNodeLabelsManager createNodeLabelManager() {
+ RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(getConfig());
+ return mgr;
+ }
+ };
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(csConf);
+ // mark queue B as STOPPED
+ StringBuilder qState = new StringBuilder();
+ qState.append(CapacitySchedulerConfiguration.PREFIX).append(B)
+ .append(CapacitySchedulerConfiguration.DOT)
+ .append(CapacitySchedulerConfiguration.STATE);
+ csConf.set(qState.toString(), QueueState.STOPPED.name());
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ resourceManager.init(conf);
+ resourceManager.getRMContext().getContainerTokenSecretManager()
+ .rollMasterKey();
+ resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
+ ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
+ mockContext = mock(RMContext.class);
+ when(mockContext.getConfigurationProvider()).thenReturn(
+ new LocalConfigurationProvider());
+
+ ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+ NodeStatus mockNodeStatus = createMockNodeStatus();
+
+ // Register node1: host_0 with 6GB and 1 vcore
+ String host0 = "host_0";
+ NodeManager nm0 =
+ registerNode(resourceManager, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+ Resources.createResource(6 * GB, 1), mockNodeStatus);
+
+ // ResourceRequest priorities
+ Priority priority0 = Priority.newInstance(0);
+ Priority priority1 = Priority.newInstance(1);
+
+ // Submit application_0 as user_0 to leaf queue a1
+ Application application0 =
+ new Application("user_0", "a1", resourceManager);
+ application0.submit(); // app + app attempt event sent to scheduler
+
+ application0.addNodeManager(host0, 1234, nm0);
+
+ Resource capability00 = Resources.createResource(3 * GB, 1);
+ application0.addResourceRequestSpec(priority1, capability00);
+
+ Resource capability01 = Resources.createResource(2 * GB, 1);
+ application0.addResourceRequestSpec(priority0, capability01);
+
+ Task task00 =
+ new Task(application0, priority1, new String[]{host0});
+ application0.addTask(task00);
+
+ // Send resource requests to the scheduler
+ application0.schedule(); // allocate
+
+ // task_0_0 allocated
+ nodeUpdate(resourceManager, nm0);
+
+ // Get allocations from the scheduler
+ application0.schedule(); // task_0_0
+ checkApplicationResourceUsage(3 * GB, application0);
+
+ checkNodeResourceUsage(3 * GB, nm0);
+
+ // Queue B was configured as STOPPED above, so moving the app into b1 must
+ // be rejected. The exception is expected only around this call, so that a
+ // YarnException thrown during setup cannot make the test pass by accident.
+ try {
+ scheduler.moveApplication(application0.getApplicationId(), "b1");
+ Assert.fail("Moving an app into b1 should fail while queue B is STOPPED");
+ } catch (YarnException e) {
+ // expected
+ }
+ }
+
+ /**
+ * Moves app_0 from a1 to its sibling a2 after it holds a live 3GB container.
+ * It compares CapacitySchedulerInfo snapshots taken before and after the
+ * move. Application counts, used resources, container counts and per-user
+ * usage must move from a1 to a2, while the totals of parent a and of root
+ * stay the same.
+ */
+ @Test
+ public void testMoveAppQueueMetricsCheck() throws Exception {
+ ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+ NodeStatus mockNodeStatus = createMockNodeStatus();
+
+ // Register node1: host_0 with 5GB and 1 vcore
+ String host0 = "host_0";
+ NodeManager nm0 =
+ registerNode(resourceManager, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+ Resources.createResource(5 * GB, 1), mockNodeStatus);
+
+ // Register node2: host_1 with 5GB and 1 vcore
+ String host1 = "host_1";
+ NodeManager nm1 =
+ registerNode(resourceManager, host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+ Resources.createResource(5 * GB, 1), mockNodeStatus);
+
+ // ResourceRequest priorities
+ Priority priority0 = Priority.newInstance(0);
+ Priority priority1 = Priority.newInstance(1);
+
+ // Submit application_0 as user_0 to leaf queue a1
+ Application application0 =
+ new Application("user_0", "a1", resourceManager);
+ application0.submit(); // app + app attempt event sent to scheduler
+
+ application0.addNodeManager(host0, 1234, nm0);
+ application0.addNodeManager(host1, 1234, nm1);
+
+ Resource capability00 = Resources.createResource(3 * GB, 1);
+ application0.addResourceRequestSpec(priority1, capability00);
+
+ Resource capability01 = Resources.createResource(2 * GB, 1);
+ application0.addResourceRequestSpec(priority0, capability01);
+
+ Task task00 =
+ new Task(application0, priority1, new String[]{host0, host1});
+ application0.addTask(task00);
+
+ // Submit application_1 as user_1 to leaf queue b2
+ Application application1 =
+ new Application("user_1", "b2", resourceManager);
+ application1.submit(); // app + app attempt event sent to scheduler
+
+ application1.addNodeManager(host0, 1234, nm0);
+ application1.addNodeManager(host1, 1234, nm1);
+
+ Resource capability10 = Resources.createResource(1 * GB, 1);
+ application1.addResourceRequestSpec(priority1, capability10);
+
+ Resource capability11 = Resources.createResource(2 * GB, 1);
+ application1.addResourceRequestSpec(priority0, capability11);
+
+ Task task10 =
+ new Task(application1, priority1, new String[]{host0, host1});
+ application1.addTask(task10);
+
+ // Send resource requests to the scheduler
+ application0.schedule(); // allocate
+ application1.schedule(); // allocate
+
+ nodeUpdate(resourceManager, nm0);
+
+ nodeUpdate(resourceManager, nm1);
+
+ // snapshot the scheduler view before the move
+ CapacityScheduler cs =
+ (CapacityScheduler) resourceManager.getResourceScheduler();
+ CSQueue origRootQ = cs.getRootQueue();
+ CapacitySchedulerInfo oldInfo =
+ new CapacitySchedulerInfo(origRootQ, cs);
+ int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues());
+ int origNumAppsRoot = origRootQ.getNumApplications();
+
+ scheduler.moveApplication(application0.getApplicationId(), "a2");
+
+ // snapshot the scheduler view after the move
+ CSQueue newRootQ = cs.getRootQueue();
+ int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues());
+ int newNumAppsRoot = newRootQ.getNumApplications();
+ CapacitySchedulerInfo newInfo =
+ new CapacitySchedulerInfo(newRootQ, cs);
+ CapacitySchedulerLeafQueueInfo origOldA1 =
+ (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues());
+ CapacitySchedulerLeafQueueInfo origNewA1 =
+ (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues());
+ CapacitySchedulerLeafQueueInfo targetOldA2 =
+ (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues());
+ CapacitySchedulerLeafQueueInfo targetNewA2 =
+ (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues());
+ // originally submitted here
+ assertEquals(1, origOldA1.getNumApplications());
+ assertEquals(1, origNumAppsA);
+ assertEquals(2, origNumAppsRoot);
+ // after the move
+ assertEquals(0, origNewA1.getNumApplications());
+ assertEquals(1, newNumAppsA);
+ assertEquals(2, newNumAppsRoot);
+ // original consumption on a1
+ assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemorySize());
+ assertEquals(1, origOldA1.getResourcesUsed().getvCores());
+ assertEquals(0, origNewA1.getResourcesUsed().getMemorySize()); // after the move
+ assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move
+ // app moved here with live containers
+ assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemorySize());
+ assertEquals(1, targetNewA2.getResourcesUsed().getvCores());
+ // it was empty before the move
+ assertEquals(0, targetOldA2.getNumApplications());
+ assertEquals(0, targetOldA2.getResourcesUsed().getMemorySize());
+ assertEquals(0, targetOldA2.getResourcesUsed().getvCores());
+ // after the app moved here
+ assertEquals(1, targetNewA2.getNumApplications());
+ // 1 container on original queue before move
+ assertEquals(1, origOldA1.getNumContainers());
+ // after the move the resource released
+ assertEquals(0, origNewA1.getNumContainers());
+ // and moved to the new queue
+ assertEquals(1, targetNewA2.getNumContainers());
+ // which originally didn't have any
+ assertEquals(0, targetOldA2.getNumContainers());
+ // 1 user with 3GB
+ assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0)
+ .getResourcesUsed().getMemorySize());
+ // 1 user with 1 core
+ assertEquals(1, origOldA1.getUsers().getUsersList().get(0)
+ .getResourcesUsed().getvCores());
+ // user has no more running apps in the original queue
+ assertEquals(0, origNewA1.getUsers().getUsersList().size());
+ // 1 user with 3GB
+ assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0)
+ .getResourcesUsed().getMemorySize());
+ // 1 user with 1 core
+ assertEquals(1, targetNewA2.getUsers().getUsersList().get(0)
+ .getResourcesUsed().getvCores());
+
+ // Get allocations from the scheduler
+ application0.schedule(); // task_0_0
+ checkApplicationResourceUsage(3 * GB, application0);
+
+ application1.schedule(); // task_1_0
+ checkApplicationResourceUsage(1 * GB, application1);
+
+ // task_0_0 (3G, application_0, now in a2) and task_1_0 (1G, application_1)
+ // are both on host_0; the move to a2 did not change container placement
+ checkNodeResourceUsage(4 * GB, nm0);
+ checkNodeResourceUsage(0 * GB, nm1);
+
+ }
+
+ @Test
+ public void testMoveAllApps() throws Exception {
+ MockRM rm = setUpMove();
+ AbstractYarnScheduler scheduler =
+ (AbstractYarnScheduler) rm.getResourceScheduler();
+
+ // submit an app
+ MockRMAppSubmissionData data =
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ .withAppName("test-move-1")
+ .withUser("user_0")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data);
+ ApplicationAttemptId appAttemptId =
+ rm.getApplicationReport(app.getApplicationId())
+ .getCurrentApplicationAttemptId();
+
+ // check preconditions
+ assertOneAppInQueue(scheduler, "a1");
+ assertApps(scheduler, "root", appAttemptId);
+ assertApps(scheduler, "a", appAttemptId);
+ assertApps(scheduler, "a1", appAttemptId);
+ assertApps(scheduler, "b1");
+ assertApps(scheduler, "b");
+
+ // now move the app
+ scheduler.moveAllApps("a1", "b1");
+
+ // check post conditions
+ Thread.sleep(1000);
+ assertOneAppInQueue(scheduler, "b1");
+ assertApps(scheduler, "root", appAttemptId);
+ assertApps(scheduler, "b", appAttemptId);
+ assertApps(scheduler, "b1", appAttemptId);
+ assertApps(scheduler, "a1");
+ assertApps(scheduler, "a");
+
+ rm.stop();
+ }
+
+ /**
+ * moveAllApps with a non-existent destination queue must throw a
+ * YarnException and leave the application in its original queue a1.
+ */
+ @Test
+ public void testMoveAllAppsInvalidDestination() throws Exception {
+ MockRM rm = setUpMove();
+ // stop the RM even when an assertion fails so it cannot leak into later tests
+ try {
+ ResourceScheduler scheduler = rm.getResourceScheduler();
+
+ // submit an app
+ MockRMAppSubmissionData data =
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ .withAppName("test-move-1")
+ .withUser("user_0")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data);
+ ApplicationAttemptId appAttemptId =
+ rm.getApplicationReport(app.getApplicationId())
+ .getCurrentApplicationAttemptId();
+
+ // check preconditions
+ assertApps(scheduler, "root", appAttemptId);
+ assertApps(scheduler, "a", appAttemptId);
+ assertApps(scheduler, "a1", appAttemptId);
+ assertApps(scheduler, "b");
+ assertApps(scheduler, "b1");
+
+ // now try to move the app to a queue that does not exist
+ try {
+ scheduler.moveAllApps("a1", "DOES_NOT_EXIST");
+ Assert.fail("moveAllApps to a non-existent destination queue should fail");
+ } catch (YarnException e) {
+ // expected
+ }
+
+ // check post conditions, app should still be in a1
+ assertApps(scheduler, "root", appAttemptId);
+ assertApps(scheduler, "a", appAttemptId);
+ assertApps(scheduler, "a1", appAttemptId);
+ assertApps(scheduler, "b");
+ assertApps(scheduler, "b1");
+ } finally {
+ rm.stop();
+ }
+ }
+
+ /**
+ * moveAllApps with a non-existent source queue must throw a YarnException
+ * and leave the existing application in queue a1 untouched.
+ */
+ @Test
+ public void testMoveAllAppsInvalidSource() throws Exception {
+ MockRM rm = setUpMove();
+ // stop the RM even when an assertion fails so it cannot leak into later tests
+ try {
+ ResourceScheduler scheduler = rm.getResourceScheduler();
+
+ // submit an app
+ MockRMAppSubmissionData data =
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ .withAppName("test-move-1")
+ .withUser("user_0")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data);
+ ApplicationAttemptId appAttemptId =
+ rm.getApplicationReport(app.getApplicationId())
+ .getCurrentApplicationAttemptId();
+
+ // check preconditions
+ assertApps(scheduler, "root", appAttemptId);
+ assertApps(scheduler, "a", appAttemptId);
+ assertApps(scheduler, "a1", appAttemptId);
+ assertApps(scheduler, "b");
+ assertApps(scheduler, "b1");
+
+ // now try to move all apps out of a queue that does not exist
+ try {
+ scheduler.moveAllApps("DOES_NOT_EXIST", "b1");
+ Assert.fail("moveAllApps from a non-existent source queue should fail");
+ } catch (YarnException e) {
+ // expected
+ }
+
+ // check post conditions, app should still be in a1
+ assertApps(scheduler, "root", appAttemptId);
+ assertApps(scheduler, "a", appAttemptId);
+ assertApps(scheduler, "a1", appAttemptId);
+ assertApps(scheduler, "b");
+ assertApps(scheduler, "b1");
+ } finally {
+ rm.stop();
+ }
+ }
+
+ /**
+ * Submits four apps from four different users to a1. Users u1 and u2 get
+ * their AMs launched and ask for containers; u3 and u4 are only submitted.
+ * The test then moves all apps to b1 and checks the active-user counters of
+ * the destination queue's UsersManager.
+ */
+ @Test
+ public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+
+ CapacitySchedulerConfiguration newConf =
+ new CapacitySchedulerConfiguration(conf);
+
+ // Define top-level queues
+ newConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[]{"a", "b"});
+
+ newConf.setCapacity(A, 50);
+ newConf.setCapacity(B, 50);
+
+ // Define 2nd-level queues
+ newConf.setQueues(A, new String[]{"a1"});
+ newConf.setCapacity(A1, 100);
+ newConf.setUserLimitFactor(A1, 2.0f);
+ newConf.setMaximumAMResourcePercentPerPartition(A1, "", 0.1f);
+
+ newConf.setQueues(B, new String[]{"b1"});
+ newConf.setCapacity(B1, 100);
+ newConf.setUserLimitFactor(B1, 2.0f);
+
+ MockRM rm = new MockRM(newConf);
+ rm.start();
+ // close the RM even when an assertion fails so it cannot leak into later tests
+ try {
+ CapacityScheduler scheduler =
+ (CapacityScheduler) rm.getResourceScheduler();
+
+ MockNM nm1 = rm.registerNode("h1:1234", 16 * GB);
+
+ // submit an app
+ MockRMAppSubmissionData data3 =
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ .withAppName("test-move-1")
+ .withUser("u1")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .build();
+ RMApp app = MockRMAppSubmitter.submit(rm, data3);
+ MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+ ApplicationAttemptId appAttemptId =
+ rm.getApplicationReport(app.getApplicationId())
+ .getCurrentApplicationAttemptId();
+
+ MockRMAppSubmissionData data2 =
+ MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
+ .withAppName("app")
+ .withUser("u2")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .build();
+ RMApp app2 = MockRMAppSubmitter.submit(rm, data2);
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+
+ MockRMAppSubmissionData data1 =
+ MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
+ .withAppName("app")
+ .withUser("u3")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .build();
+ RMApp app3 = MockRMAppSubmitter.submit(rm, data1);
+
+ MockRMAppSubmissionData data =
+ MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
+ .withAppName("app")
+ .withUser("u4")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .build();
+ RMApp app4 = MockRMAppSubmitter.submit(rm, data);
+
+ // Each application asks 50 * 1GB containers
+ am1.allocate("*", 1 * GB, 50, null);
+ am2.allocate("*", 1 * GB, 50, null);
+
+ RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // check preconditions
+ assertApps(scheduler, "root",
+ app3.getCurrentAppAttempt().getAppAttemptId(),
+ app4.getCurrentAppAttempt().getAppAttemptId(),
+ appAttemptId,
+ app2.getCurrentAppAttempt().getAppAttemptId());
+ assertApps(scheduler, "a",
+ app3.getCurrentAppAttempt().getAppAttemptId(),
+ app4.getCurrentAppAttempt().getAppAttemptId(),
+ appAttemptId,
+ app2.getCurrentAppAttempt().getAppAttemptId());
+ assertApps(scheduler, "a1",
+ app3.getCurrentAppAttempt().getAppAttemptId(),
+ app4.getCurrentAppAttempt().getAppAttemptId(),
+ appAttemptId,
+ app2.getCurrentAppAttempt().getAppAttemptId());
+ assertApps(scheduler, "b");
+ assertApps(scheduler, "b1");
+
+ UsersManager um =
+ (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager();
+
+ assertEquals(4, um.getNumActiveUsers());
+ assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
+
+ // now move the app
+ scheduler.moveAllApps("a1", "b1");
+
+ //Triggering this event so that user limit computation can
+ //happen again
+ for (int i = 0; i < 10; i++) {
+ scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ Thread.sleep(500);
+ }
+
+ // check post conditions
+ assertApps(scheduler, "root",
+ appAttemptId,
+ app2.getCurrentAppAttempt().getAppAttemptId(),
+ app3.getCurrentAppAttempt().getAppAttemptId(),
+ app4.getCurrentAppAttempt().getAppAttemptId());
+ assertApps(scheduler, "a");
+ assertApps(scheduler, "a1");
+ assertApps(scheduler, "b",
+ appAttemptId,
+ app2.getCurrentAppAttempt().getAppAttemptId(),
+ app3.getCurrentAppAttempt().getAppAttemptId(),
+ app4.getCurrentAppAttempt().getAppAttemptId());
+ assertApps(scheduler, "b1",
+ appAttemptId,
+ app2.getCurrentAppAttempt().getAppAttemptId(),
+ app3.getCurrentAppAttempt().getAppAttemptId(),
+ app4.getCurrentAppAttempt().getAppAttemptId());
+
+ UsersManager umB1 =
+ (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager();
+
+ assertEquals(2, umB1.getNumActiveUsers());
+ assertEquals(2, umB1.getNumActiveUsersWithOnlyPendingApps());
+ } finally {
+ rm.close();
+ }
+ }
+
+ /**
+ * Moving an application that the scheduler does not know yet must fail with
+ * "not found". Once the app has been added (but before its attempt is), it
+ * can be moved. The attempt added afterwards must then be counted in b/b1
+ * and not in a/a1.
+ */
+ @Test(timeout = 60000)
+ public void testMoveAttemptNotAdded() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ MockRM rm = new MockRM(getCapacityConfiguration(conf));
+ rm.start();
+ // close the RM even when an assertion fails so it cannot leak into later tests
+ try {
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+ ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 1);
+
+ // mocked RMApp / RMAppAttempt registered directly in the RM context
+ RMAppAttemptMetrics attemptMetric =
+ new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
+ RMAppImpl app = mock(RMAppImpl.class);
+ when(app.getApplicationId()).thenReturn(appId);
+ RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+ Container container = mock(Container.class);
+ when(attempt.getMasterContainer()).thenReturn(container);
+ ApplicationSubmissionContext submissionContext =
+ mock(ApplicationSubmissionContext.class);
+ when(attempt.getSubmissionContext()).thenReturn(submissionContext);
+ when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
+ when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
+ when(app.getCurrentAppAttempt()).thenReturn(attempt);
+
+ rm.getRMContext().getRMApps().put(appId, app);
+
+ SchedulerEvent addAppEvent =
+ new AppAddedSchedulerEvent(appId, "a1", "user");
+ // the app has not been added to the scheduler yet, so the move must fail
+ try {
+ cs.moveApplication(appId, "b1");
+ fail("Move should throw exception app not available");
+ } catch (YarnException e) {
+ assertEquals("App to be moved application_100_0001 not found.",
+ e.getMessage());
+ }
+ cs.handle(addAppEvent);
+ cs.moveApplication(appId, "b1");
+ SchedulerEvent addAttemptEvent =
+ new AppAttemptAddedSchedulerEvent(appAttemptId, false);
+ cs.handle(addAttemptEvent);
+ CSQueue rootQ = cs.getRootQueue();
+ CSQueue queueB = cs.getQueue("b");
+ CSQueue queueA = cs.getQueue("a");
+ CSQueue queueA1 = cs.getQueue("a1");
+ CSQueue queueB1 = cs.getQueue("b1");
+ Assert.assertEquals(1, rootQ.getNumApplications());
+ Assert.assertEquals(0, queueA.getNumApplications());
+ Assert.assertEquals(1, queueB.getNumApplications());
+ Assert.assertEquals(0, queueA1.getNumApplications());
+ Assert.assertEquals(1, queueB1.getNumApplications());
+ } finally {
+ rm.close();
+ }
+ }
+
+ @Test
+ public void testRemoveAttemptMoveAdded() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ CapacityScheduler.class);
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+ // Create Mock RM
+ MockRM rm = new MockRM(getCapacityConfiguration(conf));
+ CapacityScheduler sch = (CapacityScheduler) rm.getResourceScheduler();
+ // add node
+ Resource newResource = Resource.newInstance(4 * GB, 1);
+ RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
+ SchedulerEvent addNode = new NodeAddedSchedulerEvent(node);
+ sch.handle(addNode);
+
+ ApplicationAttemptId appAttemptId = appHelper(rm, sch, 100, 1, "a1", "user");
+
+ // get Queues
+ CSQueue queueA1 = sch.getQueue("a1");
+ CSQueue queueB = sch.getQueue("b");
+ CSQueue queueB1 = sch.getQueue("b1");
+
+ // add Running rm container and simulate live containers to a1
+ ContainerId newContainerId = ContainerId.newContainerId(appAttemptId, 2);
+ RMContainerImpl rmContainer = mock(RMContainerImpl.class);
+ when(rmContainer.getState()).thenReturn(RMContainerState.RUNNING);
+ Container container2 = mock(Container.class);
+ when(rmContainer.getContainer()).thenReturn(container2);
+ Resource resource = Resource.newInstance(1024, 1);
+ when(container2.getResource()).thenReturn(resource);
+ when(rmContainer.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
+ when(container2.getNodeId()).thenReturn(node.getNodeID());
+ when(container2.getId()).thenReturn(newContainerId);
+ when(rmContainer.getNodeLabelExpression())
+ .thenReturn(RMNodeLabelsManager.NO_LABEL);
+ when(rmContainer.getContainerId()).thenReturn(newContainerId);
+ sch.getApplicationAttempt(appAttemptId).getLiveContainersMap()
+ .put(newContainerId, rmContainer);
+ QueueMetrics queueA1M = queueA1.getMetrics();
+ queueA1M.incrPendingResources(rmContainer.getNodeLabelExpression(),
+ "user1", 1, resource);
+ queueA1M.allocateResources(rmContainer.getNodeLabelExpression(),
+ "user1", resource);
+ // remove attempt
+ sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId,
+ RMAppAttemptState.KILLED, true));
+ // Move application to queue b1
+ sch.moveApplication(appAttemptId.getApplicationId(), "b1");
+ // Check queue metrics after move
+ Assert.assertEquals(0, queueA1.getNumApplications());
+ Assert.assertEquals(1, queueB.getNumApplications());
+ Assert.assertEquals(0, queueB1.getNumApplications());
+
+ // Release attempt add event
+ ApplicationAttemptId appAttemptId2 =
+ BuilderUtils.newApplicationAttemptId(appAttemptId.getApplicationId(), 2);
+ SchedulerEvent addAttemptEvent2 =
+ new AppAttemptAddedSchedulerEvent(appAttemptId2, true);
+ sch.handle(addAttemptEvent2);
+
+ // Check metrics after attempt added
+ Assert.assertEquals(0, queueA1.getNumApplications());
+ Assert.assertEquals(1, queueB.getNumApplications());
+ Assert.assertEquals(1, queueB1.getNumApplications());
+
+
+ QueueMetrics queueB1M = queueB1.getMetrics();
+ QueueMetrics queueBM = queueB.getMetrics();
+ // Verify allocation MB of current state
+ Assert.assertEquals(0, queueA1M.getAllocatedMB());
+ Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
+ Assert.assertEquals(1024, queueB1M.getAllocatedMB());
+ Assert.assertEquals(1, queueB1M.getAllocatedVirtualCores());
+
+ // remove attempt
+ sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId2,
+ RMAppAttemptState.FINISHED, false));
+
+ Assert.assertEquals(0, queueA1M.getAllocatedMB());
+ Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
+ Assert.assertEquals(0, queueB1M.getAllocatedMB());
+ Assert.assertEquals(0, queueB1M.getAllocatedVirtualCores());
+
+ verifyQueueMetrics(queueB1M);
+ verifyQueueMetrics(queueBM);
+ // Verify queue A1 metrics
+ verifyQueueMetrics(queueA1M);
+ rm.close();
+ }
+
+  /**
+   * Verifies that applications submitted to invalid queues end up FAILED.
+   * The cases are:
+   * <ul>
+   *   <li>"q", a queue this configuration does not appear to define;</li>
+   *   <li>the short name "b", which the test treats as ambiguous
+   *       (root.a.b is added below);</li>
+   *   <li>a path with an empty element ("root..a1");</li>
+   *   <li>a deep path under root.a.b that is expected to fail
+   *       auto-creation.</li>
+   * </ul>
+   */
+  @Test
+  public void testAppSubmission() throws Exception {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setQueues(A, new String[]{"a1", "a2", "b"});
+    conf.setCapacity(A1, 20);
+    conf.setCapacity("root.a.b", 10);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    try {
+      RMApp noParentQueueApp =
+          submitAppAndWaitForState(rm, "q", RMAppState.FAILED);
+      Assert.assertEquals(RMAppState.FAILED, noParentQueueApp.getState());
+
+      RMApp ambiguousQueueApp =
+          submitAppAndWaitForState(rm, "b", RMAppState.FAILED);
+      Assert.assertEquals(RMAppState.FAILED, ambiguousQueueApp.getState());
+
+      RMApp emptyPartQueueApp =
+          submitAppAndWaitForState(rm, "root..a1", RMAppState.FAILED);
+      Assert.assertEquals(RMAppState.FAILED, emptyPartQueueApp.getState());
+
+      RMApp failedAutoQueue =
+          submitAppAndWaitForState(rm, "root.a.b.c.d", RMAppState.FAILED);
+      Assert.assertEquals(RMAppState.FAILED, failedAutoQueue.getState());
+    } finally {
+      // Stop the RM even when an assertion fails.
+      rm.stop();
+    }
+  }
+
+  /**
+   * Submits an application named "app" for user "user" to the given queue.
+   * The submission requests {@code GB} of memory, does not wait for the
+   * ACCEPTED state and uses a managed AM. The method then waits until the
+   * application reaches {@code state}.
+   *
+   * @param rm    the mock RM to submit to
+   * @param queue the queue name or path to submit the application to
+   * @param state the application state to wait for
+   * @return the submitted application
+   * @throws Exception if submission or waiting fails
+   */
+  private RMApp submitAppAndWaitForState(MockRM rm, String queue,
+      RMAppState state) throws Exception {
+    MockRMAppSubmissionData submissionData =
+        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+            .withWaitForAppAcceptedState(false)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue(queue)
+            .withUnmanagedAM(false)
+            .build();
+    RMApp app = MockRMAppSubmitter.submit(rm, submissionData);
+    rm.waitForState(app.getApplicationId(), state);
+    return app;
+  }
+
+  /**
+   * Finds the first queue in {@code queues} whose short name equals
+   * {@code name} and reports its application count.
+   *
+   * @param name   short queue name to look for
+   * @param queues queues to search, in order
+   * @return that queue's number of applications, or -1 if none matches
+   */
+  private int getNumAppsInQueue(String name, List<CSQueue> queues) {
+    return queues.stream()
+        .filter(candidate -> candidate.getQueueShortName().equals(name))
+        .findFirst()
+        .map(CSQueue::getNumApplications)
+        .orElse(-1);
+  }
+
+  /**
+   * Depth-first (pre-order) search of a queue info tree for the queue with
+   * the given name.
+   *
+   * @param name queue name to look for
+   * @param info list of queue infos to search; may be null
+   * @return the first matching queue info, or null if there is none
+   */
+  private CapacitySchedulerQueueInfo getQueueInfo(String name,
+      CapacitySchedulerQueueInfoList info) {
+    if (info == null) {
+      return null;
+    }
+    for (CapacitySchedulerQueueInfo candidate : info.getQueueInfoList()) {
+      if (candidate.getQueueName().equals(name)) {
+        return candidate;
+      }
+      // Not this queue: descend into its children before trying siblings.
+      CapacitySchedulerQueueInfo nested =
+          getQueueInfo(name, candidate.getQueues());
+      if (nested != null) {
+        return nested;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Asserts that the given queue metrics are all zero. The checked values
+   * are: pending MB, active users, active apps, pending apps, running apps,
+   * allocated MB and allocated vcores.
+   *
+   * @param metrics the queue metrics to check
+   */
+  private void verifyQueueMetrics(QueueMetrics metrics) {
+    // Messages identify which counter is non-zero when an assertion fails.
+    Assert.assertEquals("pending MB", 0, metrics.getPendingMB());
+    Assert.assertEquals("active users", 0, metrics.getActiveUsers());
+    Assert.assertEquals("active apps", 0, metrics.getActiveApps());
+    Assert.assertEquals("pending apps", 0, metrics.getAppsPending());
+    Assert.assertEquals("running apps", 0, metrics.getAppsRunning());
+    Assert.assertEquals("allocated MB", 0, metrics.getAllocatedMB());
+    Assert.assertEquals("allocated vcores", 0,
+        metrics.getAllocatedVirtualCores());
+  }
+
+  /**
+   * Wraps {@code config} in a capacity scheduler configuration with a
+   * two-level queue hierarchy. Under root are queues a and b at 50% each.
+   * Under A are a1 and a2 at 50% each, and under B is b1 at 100%.
+   *
+   * @param config base configuration to extend
+   * @return the resulting capacity scheduler configuration
+   */
+  private Configuration getCapacityConfiguration(Configuration config) {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(config);
+
+    // Top level: a and b split root evenly.
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[]{"a", "b"});
+    csConf.setCapacity(A, 50);
+    csConf.setCapacity(B, 50);
+
+    // Children of A: a1 and a2 split A evenly.
+    csConf.setQueues(A, new String[]{"a1", "a2"});
+    csConf.setCapacity(A1, 50);
+    csConf.setCapacity(A2, 50);
+
+    // Child of B: b1 takes all of B.
+    csConf.setQueues(B, new String[]{"b1"});
+    csConf.setCapacity(B1, 100);
+
+    return csConf;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodes.java
new file mode 100644
index 0000000..c557354
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodes.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.Task;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.appHelper;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.nodeUpdate;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.registerNode;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.NULL_UPDATE_REQUESTS;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests of {@link CapacityScheduler} behaviour around cluster nodes. Covered
+ * areas are node (re)registration and removal, blacklisting,
+ * decommissioning nodes, and scheduling on a node that has been removed.
+ */
+public class TestCapacitySchedulerNodes {
+
+  /** RM created before, and stopped after, every test. */
+  private ResourceManager resourceManager;
+
+  @Before
+  public void setUp() throws Exception {
+    resourceManager = createResourceManager();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    stopResourceManager(resourceManager);
+  }
+
+  /**
+   * A node that is removed and re-added with less memory must contribute
+   * only its new capacity to the cluster resource.
+   */
+  @Test
+  public void testReconnectedNode() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(csConf);
+    cs.start();
+    try {
+      cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
+          null, null, new RMContainerTokenSecretManager(csConf),
+          new NMTokenSecretManagerInRM(csConf),
+          new ClientToAMTokenSecretManagerInRM(), null));
+
+      RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+      RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+
+      cs.handle(new NodeAddedSchedulerEvent(n1));
+      cs.handle(new NodeAddedSchedulerEvent(n2));
+
+      Assert.assertEquals(6 * GB, cs.getClusterResource().getMemorySize());
+
+      // reconnect n1 with downgraded memory
+      n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
+      cs.handle(new NodeRemovedSchedulerEvent(n1));
+      cs.handle(new NodeAddedSchedulerEvent(n1));
+
+      Assert.assertEquals(4 * GB, cs.getClusterResource().getMemorySize());
+    } finally {
+      cs.stop();
+    }
+  }
+
+  /**
+   * The blacklist of an attempt can be updated through allocate() without
+   * asking for any containers.
+   */
+  @Test
+  public void testBlackListNodes() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    try {
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+      String host = "127.0.0.1";
+      RMNode node =
+          MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
+      cs.handle(new NodeAddedSchedulerEvent(node));
+
+      ApplicationAttemptId appAttemptId =
+          appHelper(rm, cs, 100, 1, "default", "user");
+
+      // Blacklist the host with an otherwise empty allocate call...
+      cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
+          Collections.<ContainerId>emptyList(),
+          Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
+      Assert.assertTrue(cs.getApplicationAttempt(appAttemptId)
+          .isPlaceBlacklisted(host));
+      // ...and un-blacklist it again the same way.
+      cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
+          Collections.<ContainerId>emptyList(), null,
+          Collections.singletonList(host), NULL_UPDATE_REQUESTS);
+      Assert.assertFalse(cs.getApplicationAttempt(appAttemptId)
+          .isPlaceBlacklisted(host));
+    } finally {
+      rm.stop();
+    }
+  }
+
+  /** The scheduler's node count follows node add and remove events. */
+  @Test
+  public void testNumClusterNodes() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(conf);
+    RMContext rmContext = TestUtils.getMockRMContext();
+    cs.setRMContext(rmContext);
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    cs.init(csConf);
+    cs.start();
+    try {
+      assertEquals(0, cs.getNumClusterNodes());
+
+      RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+      RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+      cs.handle(new NodeAddedSchedulerEvent(n1));
+      cs.handle(new NodeAddedSchedulerEvent(n2));
+      assertEquals(2, cs.getNumClusterNodes());
+
+      cs.handle(new NodeRemovedSchedulerEvent(n1));
+      assertEquals(1, cs.getNumClusterNodes());
+      cs.handle(new NodeAddedSchedulerEvent(n1));
+      assertEquals(2, cs.getNumClusterNodes());
+      cs.handle(new NodeRemovedSchedulerEvent(n2));
+      cs.handle(new NodeRemovedSchedulerEvent(n1));
+      assertEquals(0, cs.getNumClusterNodes());
+    } finally {
+      cs.stop();
+    }
+  }
+
+  /**
+   * Per-queue default node label expressions are reported in QueueInfo with
+   * the surrounding whitespace of the configured value removed.
+   */
+  @Test
+  public void testDefaultNodeLabelExpressionQueueConfig() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    conf.setDefaultNodeLabelExpression("root.a", " x");
+    conf.setDefaultNodeLabelExpression("root.b", " y ");
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    try {
+      QueueInfo queueInfoA = cs.getQueueInfo("a", true, false);
+      Assert.assertEquals("Queue Name should be a", "a",
+          queueInfoA.getQueueName());
+      Assert.assertEquals("Queue Path should be root.a", "root.a",
+          queueInfoA.getQueuePath());
+      Assert.assertEquals("Default Node Label Expression should be x", "x",
+          queueInfoA.getDefaultNodeLabelExpression());
+
+      QueueInfo queueInfoB = cs.getQueueInfo("b", true, false);
+      Assert.assertEquals("Queue Name should be b", "b",
+          queueInfoB.getQueueName());
+      Assert.assertEquals("Queue Path should be root.b", "root.b",
+          queueInfoB.getQueuePath());
+      Assert.assertEquals("Default Node Label Expression should be y", "y",
+          queueInfoB.getDefaultNodeLabelExpression());
+    } finally {
+      // The scheduler was started here, so it must be stopped here too.
+      cs.stop();
+    }
+  }
+
+  /**
+   * A DECOMMISSIONING heartbeat from a node that has already been removed
+   * from the scheduler's node tracker must be handled without throwing.
+   * The test has no assertions; it fails only on an exception.
+   */
+  @Test
+  public void testRemovedNodeDecommissioningNode() throws Exception {
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
+    // Register nodemanager
+    NodeManager nm = registerNode(resourceManager, "host_decom", 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
+        mockNodeStatus);
+
+    RMNode node =
+        resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    resourceManager.getResourceScheduler().handle(nodeUpdate);
+
+    // force remove the node to simulate race condition
+    ((CapacityScheduler) resourceManager.getResourceScheduler())
+        .getNodeTracker().removeNode(nm.getNodeId());
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+  }
+
+  /**
+   * A DECOMMISSIONING heartbeat shrinks the scheduler node's total resource
+   * to what is already allocated (1 GB / 1 vcore), leaving nothing
+   * unallocated. A later DECOMMISSIONING heartbeat then skips the resource
+   * update. The dispatcher is mocked so that an RMNodeResourceUpdateEvent
+   * directly updates the scheduler node's total resource.
+   */
+  @Test
+  public void testResourceUpdateDecommissioningNode() throws Exception {
+    // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
+    // to have 0 available resource
+    RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
+    Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
+    EventHandler<Event> resourceUpdateHandler = event -> {
+      if (event instanceof RMNodeResourceUpdateEvent) {
+        RMNodeResourceUpdateEvent resourceEvent =
+            (RMNodeResourceUpdateEvent) event;
+        resourceManager
+            .getResourceScheduler()
+            .getSchedulerNode(resourceEvent.getNodeId())
+            .updateTotalResource(resourceEvent.getResourceOption().getResource());
+      }
+    };
+    when(mockDispatcher.getEventHandler()).thenReturn(resourceUpdateHandler);
+    Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher();
+    ((CapacityScheduler) resourceManager.getResourceScheduler())
+        .setRMContext(spyContext);
+    ((AsyncDispatcher) mockDispatcher).start();
+
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
+    // Register node
+    String host0 = "host_0";
+    NodeManager nm0 = registerNode(resourceManager, host0, 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
+        mockNodeStatus);
+    // ResourceRequest priorities
+    Priority priority0 = Priority.newInstance(0);
+
+    // Submit an application
+    Application application0 =
+        new Application("user_0", "a1", resourceManager);
+    application0.submit();
+
+    application0.addNodeManager(host0, 1234, nm0);
+
+    Resource capability00 = Resources.createResource(1 * GB, 1);
+    application0.addResourceRequestSpec(priority0, capability00);
+
+    Task task00 =
+        new Task(application0, priority0, new String[]{host0});
+    application0.addTask(task00);
+
+    // Send resource requests to the scheduler
+    application0.schedule();
+
+    nodeUpdate(resourceManager, nm0);
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    // This should update the schedulernodes to have 0 available resource
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm0.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+
+    // Get allocations from the scheduler
+    application0.schedule();
+
+    // Check the used resource is 1 GB 1 core
+    Assert.assertEquals(1 * GB, nm0.getUsed().getMemorySize());
+    Resource usedResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm0.getNodeId()).getAllocatedResource();
+    Assert.assertEquals("Used Resource Memory Size should be 1GB", 1 * GB,
+        usedResource.getMemorySize());
+    Assert.assertEquals("Used Resource Virtual Cores should be 1", 1,
+        usedResource.getVirtualCores());
+    // Check total resource of scheduler node is also changed to 1 GB 1 core
+    Resource totalResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm0.getNodeId()).getTotalResource();
+    Assert.assertEquals("Total Resource Memory Size should be 1GB", 1 * GB,
+        totalResource.getMemorySize());
+    Assert.assertEquals("Total Resource Virtual Cores should be 1", 1,
+        totalResource.getVirtualCores());
+    // Check the available resource is 0/0
+    Resource availableResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm0.getNodeId()).getUnallocatedResource();
+    Assert.assertEquals("Available Resource Memory Size should be 0", 0,
+        availableResource.getMemorySize());
+    Assert.assertEquals("Available Resource Virtual Cores should be 0", 0,
+        availableResource.getVirtualCores());
+    // Kick off another heartbeat where the RMNodeResourceUpdateEvent would
+    // be skipped for DECOMMISSIONING state since the total resource is
+    // already equal to used resource from the previous heartbeat.
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+    verify(mockDispatcher, times(4)).getEventHandler();
+  }
+
+  /**
+   * Trying to allocate containers on a scheduler node object whose node has
+   * already been removed must not break the scheduler. The application
+   * attempt can still be removed cleanly afterwards.
+   */
+  @Test
+  public void testSchedulingOnRemovedNode() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
+        false);
+
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    try {
+      RMApp app = MockRMAppSubmitter.submitWithMemory(100, rm);
+      rm.drainEvents();
+
+      MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10240, 10);
+      MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+      // Register a second node; it is removed below while nm1 keeps the AM
+      // alive.
+      MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10240, 10);
+
+      // Leave an outstanding ANY request so there is something to allocate.
+      am.allocate(ResourceRequest.ANY, 2048, 1, null);
+
+      CapacityScheduler scheduler =
+          (CapacityScheduler) rm.getRMContext().getScheduler();
+      FiCaSchedulerNode node =
+          (FiCaSchedulerNode)
+              scheduler.getNodeTracker().getNode(nm2.getNodeId());
+      scheduler.handle(new NodeRemovedSchedulerEvent(
+          rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
+      // The scheduler node was removed above; try to allocate on the stale
+      // node object anyway.
+      scheduler.allocateContainersToNode(new SimpleCandidateNodeSet<>(node),
+          true);
+
+      AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
+          new AppAttemptRemovedSchedulerEvent(
+              am.getApplicationAttemptId(),
+              RMAppAttemptState.FINISHED, false);
+      scheduler.handle(appRemovedEvent1);
+    } finally {
+      rm.stop();
+    }
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java
new file mode 100644
index 0000000..fc18700
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java
@@ -0,0 +1,888 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocMb;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocVcores;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocation;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.unsetMaxAllocation;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1_CAPACITY;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2_CAPACITY;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3_CAPACITY;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.checkQueueStructureCapacities;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ExpectedCapacities;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.getDefaultCapacities;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfWithoutChildrenOfB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithB1AsParentQueue;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createMockRMContext;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerQueues {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestCapacitySchedulerQueues.class);
+
+  /** Resource manager created by {@code setUp()} for each test. */
+  private ResourceManager resourceManager;
+  /** RM context from {@code createMockRMContext()}, used for refreshes. */
+  private RMContext mockContext;
+
+  @Before
+  public void setUp() throws Exception {
+    // Every test starts with its own resource manager and RM context.
+    this.resourceManager = createResourceManager();
+    this.mockContext = createMockRMContext();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // Release the resource manager created in setUp().
+    stopResourceManager(this.resourceManager);
+  }
+
+  /**
+   * Test that parseQueue throws an exception when two leaf queues have the
+   * same name.
+   *
+   * @throws IOException expected: the refresh adds a leaf queue "b1" under
+   *     root.a.a1, whose name clashes with the existing b1 leaf queue
+   */
+  @Test(expected = IOException.class)
+  public void testParseQueue() throws IOException {
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    cs.init(conf);
+    cs.start();
+    try {
+      conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[]{"b1"});
+      conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
+      conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
+
+      cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
+          null, new RMContainerTokenSecretManager(conf),
+          new NMTokenSecretManagerInRM(conf),
+          new ClientToAMTokenSecretManagerInRM(), null));
+    } finally {
+      // Stop the scheduler; the expected IOException still propagates.
+      cs.stop();
+    }
+  }
+
+  /**
+   * Verifies that a refresh with a new capacity split between the top-level
+   * queues A and B is reflected in the queue hierarchy.
+   */
+  @Test
+  public void testRefreshQueues() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    try {
+      cs.reinitialize(conf, rmContext);
+      checkQueueStructureCapacities(cs);
+
+      conf.setCapacity(A, 80f);
+      conf.setCapacity(B, 20f);
+      cs.reinitialize(conf, mockContext);
+      checkQueueStructureCapacities(cs,
+          getDefaultCapacities(80f / 100.0f, 20f / 100.0f));
+    } finally {
+      // Stop even when an assertion fails so the scheduler does not leak.
+      cs.stop();
+    }
+  }
+
+  /**
+   * Verifies that a refresh can add a new child queue (root.b.b4) whose
+   * capacity is taken from b3, and that the new queue is attached under B.
+   */
+  @Test
+  public void testRefreshQueuesWithNewQueue() throws Exception {
+    CapacityScheduler scheduler = new CapacityScheduler();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    scheduler.setConf(new YarnConfiguration());
+    scheduler.setRMContext(resourceManager.getRMContext());
+    scheduler.init(csConf);
+    scheduler.start();
+    RMContextImpl initialContext = new RMContextImpl(null, null, null, null,
+        null, null, new RMContainerTokenSecretManager(csConf),
+        new NMTokenSecretManagerInRM(csConf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    scheduler.reinitialize(csConf, initialContext);
+    checkQueueStructureCapacities(scheduler);
+
+    // New queue root.b.b4; its capacity is carved out of b3.
+    final String queueB4 = B + ".b4";
+    final float queueB4Capacity = 10;
+    final float shrunkB3Capacity = B3_CAPACITY - queueB4Capacity;
+    final float capacityA = 80f / 100.0f;
+    final float capacityB = 20f / 100.0f;
+
+    try {
+      csConf.setCapacity(A, 80f);
+      csConf.setCapacity(B, 20f);
+      csConf.setQueues(B, new String[]{"b1", "b2", "b3", "b4"});
+      csConf.setCapacity(B1, B1_CAPACITY);
+      csConf.setCapacity(B2, B2_CAPACITY);
+      csConf.setCapacity(B3, shrunkB3Capacity);
+      csConf.setCapacity(queueB4, queueB4Capacity);
+      scheduler.reinitialize(csConf, mockContext);
+
+      Map<String, ExpectedCapacities> expected =
+          getDefaultCapacities(capacityA, capacityB);
+      expected.put(B3,
+          new ExpectedCapacities(shrunkB3Capacity / 100.0f, capacityB));
+      expected.put(queueB4,
+          new ExpectedCapacities(queueB4Capacity / 100.0f, capacityB));
+      checkQueueStructureCapacities(scheduler, expected);
+
+      // The freshly added queue must have B as its parent.
+      CSQueue parentB = findQueue(scheduler.getRootQueue(), B);
+      CSQueue childB4 = findQueue(parentB, queueB4);
+      assertEquals(parentB, childB4.getParent());
+    } finally {
+      scheduler.stop();
+    }
+  }
+
+  @Test
+  public void testRefreshQueuesMaxAllocationRefresh() throws Exception {
+    // A queue refresh must not allow the per-queue maximum allocation to be
+    // set smaller than its previous value.
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    try {
+      cs.reinitialize(conf, mockContext);
+      checkQueueStructureCapacities(cs);
+
+      assertEquals("max allocation in CS",
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+          cs.getMaximumResourceCapability().getMemorySize());
+      assertEquals("max allocation for A1",
+          Resources.none(),
+          conf.getQueueMaximumAllocation(A1));
+      assertEquals("max allocation",
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+          ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
+
+      CSQueue rootQueue = cs.getRootQueue();
+      CSQueue queueA = findQueue(rootQueue, A);
+      CSQueue queueA1 = findQueue(queueA, A1);
+      // JUnit order: expected value first, actual value second.
+      assertEquals("queue max allocation", 8192,
+          queueA1.getMaximumAllocation().getMemorySize());
+
+      // Decreasing A1's memory limit must be rejected.
+      setMaxAllocMb(conf, A1, 4096);
+      try {
+        cs.reinitialize(conf, mockContext);
+        fail("should have thrown exception");
+      } catch (IOException e) {
+        assertTrue("max allocation exception",
+            e.getCause().toString().contains("not be decreased"));
+      }
+
+      // Restoring the previous memory limit is accepted.
+      setMaxAllocMb(conf, A1, 8192);
+      cs.reinitialize(conf, mockContext);
+
+      // Decreasing A1's vcores limit must be rejected as well.
+      setMaxAllocVcores(conf, A1,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - 1);
+      try {
+        cs.reinitialize(conf, mockContext);
+        fail("should have thrown exception");
+      } catch (IOException e) {
+        assertTrue("max allocation exception",
+            e.getCause().toString().contains("not be decreased"));
+      }
+    } finally {
+      cs.stop();
+    }
+  }
+
+  @Test
+  public void testRefreshQueuesMaxAllocationPerQueueLarge() throws Exception {
+    // Verify we can't set the allocation per queue larger than the cluster
+    // setting.
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    cs.init(conf);
+    cs.start();
+    try {
+      // Change max allocation for B3 queue to be larger than cluster max.
+      setMaxAllocMb(conf, B3,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 2048);
+      try {
+        cs.reinitialize(conf, mockContext);
+        fail("should have thrown exception");
+      } catch (IOException e) {
+        assertTrue("maximum allocation exception",
+            e.getCause().getMessage().contains("maximum allocation"));
+      }
+
+      // Equal to the cluster max is accepted.
+      setMaxAllocMb(conf, B3,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+      cs.reinitialize(conf, mockContext);
+
+      // Same check for vcores.
+      setMaxAllocVcores(conf, B3,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1);
+      try {
+        cs.reinitialize(conf, mockContext);
+        fail("should have thrown exception");
+      } catch (IOException e) {
+        assertTrue("maximum allocation exception",
+            e.getCause().getMessage().contains("maximum allocation"));
+      }
+    } finally {
+      cs.stop();
+    }
+  }
+
+  @Test
+  public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
+    // A queue refresh should allow the max allocation per queue to go larger.
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    setMaxAllocMb(conf,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    setMaxAllocVcores(conf,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    setMaxAllocMb(conf, A1, 4096);
+    setMaxAllocVcores(conf, A1, 2);
+    cs.init(conf);
+    cs.start();
+    try {
+      cs.reinitialize(conf, mockContext);
+      checkQueueStructureCapacities(cs);
+
+      CSQueue rootQueue = cs.getRootQueue();
+      CSQueue queueA = findQueue(rootQueue, A);
+      CSQueue queueA1 = findQueue(queueA, A1);
+
+      assertEquals("max capability MB in CS",
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+          cs.getMaximumResourceCapability().getMemorySize());
+      assertEquals("max capability vcores in CS",
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+          cs.getMaximumResourceCapability().getVirtualCores());
+      assertEquals("max allocation MB A1",
+          4096,
+          queueA1.getMaximumAllocation().getMemorySize());
+      assertEquals("max allocation vcores A1",
+          2,
+          queueA1.getMaximumAllocation().getVirtualCores());
+      assertEquals("cluster max allocation MB",
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+          ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
+      assertEquals("cluster max allocation vcores",
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+          ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
+
+      setMaxAllocMb(conf, A1, 6144);
+      setMaxAllocVcores(conf, A1, 3);
+      cs.reinitialize(conf, null);
+      // A1's queue-level max allocation grows to the new values, while the
+      // cluster-level maximum stays at the defaults.
+      assertEquals("max allocation MB A1", 6144,
+          queueA1.getMaximumAllocation().getMemorySize());
+      assertEquals("max allocation vcores A1", 3,
+          queueA1.getMaximumAllocation().getVirtualCores());
+      assertEquals("max allocation MB cluster",
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+          ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
+      assertEquals("max allocation vcores cluster",
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+          ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
+      assertEquals("max capability MB cluster",
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+          cs.getMaximumResourceCapability().getMemorySize());
+      assertEquals("cluster max capability vcores",
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+          cs.getMaximumResourceCapability().getVirtualCores());
+    } finally {
+      cs.stop();
+    }
+  }
+
+  @Test
+  public void testRefreshQueuesMaxAllocationCSError() throws Exception {
+    // Refreshing the cluster level max allocation to a smaller value must
+    // fail.
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    setMaxAllocMb(conf, 10240);
+    setMaxAllocVcores(conf, 10);
+    setMaxAllocMb(conf, A1, 4096);
+    setMaxAllocVcores(conf, A1, 4);
+    cs.init(conf);
+    cs.start();
+    try {
+      cs.reinitialize(conf, mockContext);
+      checkQueueStructureCapacities(cs);
+
+      assertEquals("max allocation MB in CS", 10240,
+          cs.getMaximumResourceCapability().getMemorySize());
+      assertEquals("max allocation vcores in CS", 10,
+          cs.getMaximumResourceCapability().getVirtualCores());
+
+      // Shrinking cluster-level memory is rejected.
+      setMaxAllocMb(conf, 6144);
+      try {
+        cs.reinitialize(conf, mockContext);
+        fail("should have thrown exception");
+      } catch (IOException e) {
+        assertTrue("max allocation exception",
+            e.getCause().toString().contains("not be decreased"));
+      }
+
+      setMaxAllocMb(conf, 10240);
+      cs.reinitialize(conf, mockContext);
+
+      // Shrinking cluster-level vcores is rejected.
+      setMaxAllocVcores(conf, 8);
+      try {
+        cs.reinitialize(conf, mockContext);
+        fail("should have thrown exception");
+      } catch (IOException e) {
+        assertTrue("max allocation exception",
+            e.getCause().toString().contains("not be decreased"));
+      }
+    } finally {
+      cs.stop();
+    }
+  }
+
+  @Test
+  public void testRefreshQueuesMaxAllocationCSLarger() throws Exception {
+    // Refresh the cluster level max allocation to a larger value and verify
+    // that queues without a per-queue setting follow the cluster setting.
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    setMaxAllocMb(conf, 10240);
+    setMaxAllocVcores(conf, 10);
+    setMaxAllocMb(conf, A1, 4096);
+    setMaxAllocVcores(conf, A1, 4);
+    cs.init(conf);
+    cs.start();
+    try {
+      cs.reinitialize(conf, mockContext);
+      checkQueueStructureCapacities(cs);
+
+      assertEquals("max allocation MB in CS", 10240,
+          cs.getMaximumResourceCapability().getMemorySize());
+      assertEquals("max allocation vcores in CS", 10,
+          cs.getMaximumResourceCapability().getVirtualCores());
+
+      CSQueue rootQueue = cs.getRootQueue();
+      CSQueue queueA = findQueue(rootQueue, A);
+      CSQueue queueB = findQueue(rootQueue, B);
+      CSQueue queueA1 = findQueue(queueA, A1);
+      CSQueue queueA2 = findQueue(queueA, A2);
+      CSQueue queueB2 = findQueue(queueB, B2);
+
+      assertEquals("queue A1 max allocation MB", 4096,
+          queueA1.getMaximumAllocation().getMemorySize());
+      assertEquals("queue A1 max allocation vcores", 4,
+          queueA1.getMaximumAllocation().getVirtualCores());
+      assertEquals("queue A2 max allocation MB", 10240,
+          queueA2.getMaximumAllocation().getMemorySize());
+      assertEquals("queue A2 max allocation vcores", 10,
+          queueA2.getMaximumAllocation().getVirtualCores());
+      assertEquals("queue B2 max allocation MB", 10240,
+          queueB2.getMaximumAllocation().getMemorySize());
+      assertEquals("queue B2 max allocation vcores", 10,
+          queueB2.getMaximumAllocation().getVirtualCores());
+
+      setMaxAllocMb(conf, 12288);
+      setMaxAllocVcores(conf, 12);
+      cs.reinitialize(conf, null);
+      // The cluster level setting changes, and so does the max allocation
+      // of every queue without a per-queue setting (A2, B2); A1 keeps its
+      // own limit.
+      assertEquals("max allocation MB in CS", 12288,
+          cs.getMaximumResourceCapability().getMemorySize());
+      assertEquals("max allocation vcores in CS", 12,
+          cs.getMaximumResourceCapability().getVirtualCores());
+      assertEquals("queue A1 max MB allocation", 4096,
+          queueA1.getMaximumAllocation().getMemorySize());
+      assertEquals("queue A1 max vcores allocation", 4,
+          queueA1.getMaximumAllocation().getVirtualCores());
+      assertEquals("queue A2 max MB allocation", 12288,
+          queueA2.getMaximumAllocation().getMemorySize());
+      assertEquals("queue A2 max vcores allocation", 12,
+          queueA2.getMaximumAllocation().getVirtualCores());
+      assertEquals("queue B2 max MB allocation", 12288,
+          queueB2.getMaximumAllocation().getMemorySize());
+      assertEquals("queue B2 max vcores allocation", 12,
+          queueB2.getMaximumAllocation().getVirtualCores());
+    } finally {
+      cs.stop();
+    }
+  }
+
+  /**
+   * Test for queue deletion. Deleting a leaf queue, and later a parent queue
+   * together with its children, is expected to fail while the (spied) queue
+   * state is not STOPPED yet, and to succeed on the next refresh.
+   *
+   * @throws Exception if the scheduler cannot be set up or refreshed
+   */
+  @Test
+  public void testRefreshQueuesWithQueueDelete() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    try {
+      cs.reinitialize(conf, rmContext);
+      checkQueueStructureCapacities(cs);
+
+      // Test delete leaf queue when there is application running: the spy
+      // answers DRAINING first and STOPPED afterwards.
+      Map<String, CSQueue> queues =
+          cs.getCapacitySchedulerQueueManager().getShortNameQueues();
+      String b1QTobeDeleted = "b1";
+      LeafQueue csB1Queue = Mockito.spy((LeafQueue) queues.get(b1QTobeDeleted));
+      when(csB1Queue.getState()).thenReturn(QueueState.DRAINING)
+          .thenReturn(QueueState.STOPPED);
+      cs.getCapacitySchedulerQueueManager().addQueue(b1QTobeDeleted, csB1Queue);
+      conf = new CapacitySchedulerConfiguration();
+      setupQueueConfigurationWithoutB1(conf);
+      try {
+        cs.reinitialize(conf, mockContext);
+        fail("Expected to throw exception when refresh queue tries to delete a"
+            + " queue with running apps");
+      } catch (IOException expected) {
+        // expected: b1 has not reported STOPPED yet
+      }
+
+      // Test delete leaf queue (root.b.b1) when there is no application
+      // running.
+      conf = new CapacitySchedulerConfiguration();
+      setupQueueConfigurationWithoutB1(conf);
+      try {
+        cs.reinitialize(conf, mockContext);
+      } catch (IOException e) {
+        LOG.error(
+            "Expected to NOT throw exception when refresh queue tries to delete"
+                + " a queue WITHOUT running apps",
+            e);
+        fail("Expected to NOT throw exception when refresh queue tries to delete"
+            + " a queue WITHOUT running apps");
+      }
+      CSQueue rootQueue = cs.getRootQueue();
+      CSQueue queueB = findQueue(rootQueue, B);
+      CSQueue queueB1 = findQueue(queueB, B1);
+      assertNull("Refresh needs to support delete of leaf queue ", queueB1);
+
+      // Reset back to default configuration for testing parent queue delete.
+      conf = new CapacitySchedulerConfiguration();
+      setupQueueConfiguration(conf);
+      cs.reinitialize(conf, rmContext);
+      checkQueueStructureCapacities(cs);
+
+      // Set the configurations such that it fails once but should be
+      // successful next time.
+      queues = cs.getCapacitySchedulerQueueManager().getShortNameQueues();
+      CSQueue bQueue = Mockito.spy((ParentQueue) queues.get("b"));
+      when(bQueue.getState()).thenReturn(QueueState.DRAINING)
+          .thenReturn(QueueState.STOPPED);
+      cs.getCapacitySchedulerQueueManager().addQueue("b", bQueue);
+
+      bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
+      when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+      cs.getCapacitySchedulerQueueManager().addQueue("b1", bQueue);
+
+      bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
+      when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+      cs.getCapacitySchedulerQueueManager().addQueue("b2", bQueue);
+
+      bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
+      when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+      cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue);
+
+      // Test delete Parent queue when there is application running.
+      conf = new CapacitySchedulerConfiguration();
+      setupQueueConfigurationWithoutB(conf);
+      try {
+        cs.reinitialize(conf, mockContext);
+        fail("Expected to throw exception when refresh queue tries to delete a"
+            + " parent queue with running apps in children queue");
+      } catch (IOException expected) {
+        // expected: b has not reported STOPPED yet
+      }
+
+      // Test delete Parent queue when there is no application running.
+      conf = new CapacitySchedulerConfiguration();
+      setupQueueConfigurationWithoutB(conf);
+      try {
+        cs.reinitialize(conf, mockContext);
+      } catch (IOException e) {
+        fail("Expected to not throw exception when refresh queue tries to delete"
+            + " a queue without running apps");
+      }
+      rootQueue = cs.getRootQueue();
+      queueB = findQueue(rootQueue, B);
+      String message =
+          "Refresh needs to support delete of Parent queue and its children.";
+      assertNull(message, queueB);
+      // cs.getQueue() resolves short queue names such as "b1" (see
+      // testConvertLeafQueueToParentQueue), so these lookups are meaningful.
+      assertNull(message, cs.getQueue("b"));
+      assertNull(message, cs.getQueue("b1"));
+      assertNull(message, cs.getQueue("b2"));
+      assertNull(message, cs.getQueue("b3"));
+    } finally {
+      cs.stop();
+    }
+  }
+
+ /**
+ * Test for all child queue deletion and thus making parent queue a child.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRefreshQueuesWithAllChildQueuesDeleted() throws Exception {
+ CapacityScheduler cs = new CapacityScheduler();
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+ null, new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM(), null);
+ setupQueueConfiguration(conf);
+ cs.setConf(new YarnConfiguration());
+ cs.setRMContext(resourceManager.getRMContext());
+ cs.init(conf);
+ cs.start();
+ cs.reinitialize(conf, rmContext);
+ checkQueueStructureCapacities(cs);
+
+ // test delete all leaf queues when there is no application running.
+ Map<String, CSQueue> queues =
+ cs.getCapacitySchedulerQueueManager().getShortNameQueues();
+
+ CSQueue bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
+ when(bQueue.getState()).thenReturn(QueueState.RUNNING)
+ .thenReturn(QueueState.STOPPED);
+ cs.getCapacitySchedulerQueueManager().addQueue("b1", bQueue);
+
+ bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
+ when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+ cs.getCapacitySchedulerQueueManager().addQueue("b2", bQueue);
+
+ bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
+ when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+ cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue);
+
+ conf = new CapacitySchedulerConfiguration();
+ setupQueueConfWithoutChildrenOfB(conf);
+
+ // test convert parent queue to leaf queue(root.b) when there is no
+ // application running.
+ try {
+ cs.reinitialize(conf, mockContext);
+ fail("Expected to throw exception when refresh queue tries to make parent"
+ + " queue a child queue when one of its children is still running.");
+ } catch (IOException e) {
+ //do not do anything, expected exception
+ }
+
+ // test delete leaf queues(root.b.b1,b2,b3) when there is no application
+ // running.
+ try {
+ cs.reinitialize(conf, mockContext);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Expected to NOT throw exception when refresh queue tries to delete"
+ + " all children of a parent queue(without running apps).");
+ }
+ CSQueue rootQueue = cs.getRootQueue();
+ CSQueue queueB = findQueue(rootQueue, B);
+ assertNotNull("Parent Queue B should not be deleted", queueB);
+ Assert.assertTrue("As Queue'B children are not deleted",
+ queueB instanceof LeafQueue);
+
+ String message =
+ "Refresh needs to support delete of all children of Parent queue.";
+ assertNull(message,
+ cs.getCapacitySchedulerQueueManager().getQueues().get("b3"));
+ assertNull(message,
+ cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
+ assertNull(message,
+ cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
+
+ cs.stop();
+ }
+
+  /**
+   * Test if we can convert a leaf queue to a parent queue. The conversion is
+   * expected to fail while leaf queue b1 is RUNNING and to succeed once b1
+   * has been STOPPED.
+   *
+   * @throws Exception if the scheduler cannot be set up or refreshed
+   */
+  @Test(timeout = 10000)
+  public void testConvertLeafQueueToParentQueue() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    try {
+      cs.reinitialize(conf, rmContext);
+      checkQueueStructureCapacities(cs);
+
+      String targetQueue = "b1";
+      CSQueue b1 = cs.getQueue(targetQueue);
+      Assert.assertEquals(QueueState.RUNNING, b1.getState());
+
+      // Converting a leaf queue that is in RUNNING state must fail.
+      conf = new CapacitySchedulerConfiguration();
+      setupQueueConfigurationWithB1AsParentQueue(conf);
+      try {
+        cs.reinitialize(conf, mockContext);
+        fail("Expected to throw exception when refresh queue tries to convert"
+            + " a child queue to a parent queue.");
+      } catch (IOException expected) {
+        // expected: b1 is still RUNNING
+      }
+
+      // Now set queue state for b1 to STOPPED.
+      conf = new CapacitySchedulerConfiguration();
+      setupQueueConfiguration(conf);
+      conf.set("yarn.scheduler.capacity.root.b.b1.state", "STOPPED");
+      cs.reinitialize(conf, mockContext);
+      Assert.assertEquals(QueueState.STOPPED, b1.getState());
+
+      // Converting a leaf queue that is in STOPPED state must succeed.
+      conf = new CapacitySchedulerConfiguration();
+      setupQueueConfigurationWithB1AsParentQueue(conf);
+      try {
+        cs.reinitialize(conf, mockContext);
+      } catch (IOException e) {
+        fail("Expected to NOT throw exception when refresh queue tries"
+            + " to convert a leaf queue WITHOUT running apps");
+      }
+      b1 = cs.getQueue(targetQueue);
+      Assert.assertTrue(b1 instanceof ParentQueue);
+      Assert.assertEquals(QueueState.RUNNING, b1.getState());
+      Assert.assertFalse(b1.getChildQueues().isEmpty());
+    } finally {
+      cs.stop();
+    }
+  }
+
+ @Test
+ public void testQueuesMaxAllocationInheritance() throws Exception {
+ // queue level max allocation is set by the queue configuration explicitly
+ // or inherits from the parent.
+
+ CapacityScheduler cs = new CapacityScheduler();
+ cs.setConf(new YarnConfiguration());
+ cs.setRMContext(resourceManager.getRMContext());
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(conf);
+ setMaxAllocMb(conf,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+ setMaxAllocVcores(conf,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+
+ // Test the child queue overrides
+ setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
+ "memory-mb=4096,vcores=2");
+ setMaxAllocation(conf, A1, "memory-mb=6144,vcores=2");
+ setMaxAllocation(conf, B, "memory-mb=5120, vcores=2");
+ setMaxAllocation(conf, B2, "memory-mb=1024, vcores=2");
+
+ cs.init(conf);
+ cs.start();
+ cs.reinitialize(conf, mockContext);
+ checkQueueStructureCapacities(cs);
+
+ CSQueue rootQueue = cs.getRootQueue();
+ CSQueue queueA = findQueue(rootQueue, A);
+ CSQueue queueB = findQueue(rootQueue, B);
+ CSQueue queueA1 = findQueue(queueA, A1);
+ CSQueue queueA2 = findQueue(queueA, A2);
+ CSQueue queueB1 = findQueue(queueB, B1);
+ CSQueue queueB2 = findQueue(queueB, B2);
+
+ assertEquals("max capability MB in CS",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ cs.getMaximumResourceCapability().getMemorySize());
+ assertEquals("max capability vcores in CS",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ cs.getMaximumResourceCapability().getVirtualCores());
+ assertEquals("max allocation MB A1",
+ 6144,
+ queueA1.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation vcores A1",
+ 2,
+ queueA1.getMaximumAllocation().getVirtualCores());
+ assertEquals("max allocation MB A2", 4096,
+ queueA2.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation vcores A2",
+ 2,
+ queueA2.getMaximumAllocation().getVirtualCores());
+ assertEquals("max allocation MB B", 5120,
+ queueB.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation MB B1", 5120,
+ queueB1.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation MB B2", 1024,
+ queueB2.getMaximumAllocation().getMemorySize());
+
+ // Test get the max-allocation from different parent
+ unsetMaxAllocation(conf, A1);
+ unsetMaxAllocation(conf, B);
+ unsetMaxAllocation(conf, B1);
+ setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
+ "memory-mb=6144,vcores=2");
+ setMaxAllocation(conf, A, "memory-mb=8192,vcores=2");
+
+ cs.reinitialize(conf, mockContext);
+
+ assertEquals("max capability MB in CS",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ cs.getMaximumResourceCapability().getMemorySize());
+ assertEquals("max capability vcores in CS",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ cs.getMaximumResourceCapability().getVirtualCores());
+ assertEquals("max allocation MB A1",
+ 8192,
+ queueA1.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation vcores A1",
+ 2,
+ queueA1.getMaximumAllocation().getVirtualCores());
+ assertEquals("max allocation MB B1",
+ 6144,
+ queueB1.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation vcores B1",
+ 2,
+ queueB1.getMaximumAllocation().getVirtualCores());
+
+ // Test the default
+ unsetMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT);
+ unsetMaxAllocation(conf, A);
+ unsetMaxAllocation(conf, A1);
+ cs.reinitialize(conf, mockContext);
+
+ assertEquals("max capability MB in CS",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ cs.getMaximumResourceCapability().getMemorySize());
+ assertEquals("max capability vcores in CS",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ cs.getMaximumResourceCapability().getVirtualCores());
+ assertEquals("max allocation MB A1",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ queueA1.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation vcores A1",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ queueA1.getMaximumAllocation().getVirtualCores());
+ assertEquals("max allocation MB A2",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ queueA2.getMaximumAllocation().getMemorySize());
+ assertEquals("max allocation vcores A2",
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+ queueA2.getMaximumAllocation().getVirtualCores());
+ }
+
+ @Test
+ public void testVerifyQueuesMaxAllocationConf() throws Exception {
+   // A queue-level maximum allocation (memory or vcores) must not exceed the
+   // cluster-wide maximum allocation; reinitialize() is expected to reject
+   // such a configuration.
+   CapacityScheduler cs = new CapacityScheduler();
+   cs.setConf(new YarnConfiguration());
+   cs.setRMContext(resourceManager.getRMContext());
+   CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+   setupQueueConfiguration(conf);
+   setMaxAllocMb(conf,
+       YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+   setMaxAllocVcores(conf,
+       YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+
+   // Values strictly above the cluster-wide limits configured just above.
+   long largerMem =
+       YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1024;
+   long largerVcores =
+       YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 10;
+
+   cs.init(conf);
+   cs.start();
+   try {
+     cs.reinitialize(conf, mockContext);
+     checkQueueStructureCapacities(cs);
+
+     // Root memory above the cluster limit must be rejected.
+     setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
+         "memory-mb=" + largerMem + ",vcores=2");
+     assertReinitializeRejectsMaxAllocation(cs, conf,
+         "Queue Root maximum allocation can't exceed the cluster setting");
+
+     // Valid root/A limits, but leaf A1 memory above the cluster limit.
+     setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
+         "memory-mb=4096,vcores=2");
+     setMaxAllocation(conf, A, "memory-mb=6144,vcores=2");
+     setMaxAllocation(conf, A1, "memory-mb=" + largerMem + ",vcores=2");
+     assertReinitializeRejectsMaxAllocation(cs, conf,
+         "Queue A1 maximum allocation can't exceed the cluster setting");
+
+     // Leaf A1 vcores above the cluster limit.
+     setMaxAllocation(conf, A1, "memory-mb=8192" + ",vcores=" + largerVcores);
+     assertReinitializeRejectsMaxAllocation(cs, conf,
+         "Queue A1 maximum allocation can't exceed the cluster setting");
+   } finally {
+     // The scheduler was started above; stop it so it does not outlive the test.
+     cs.stop();
+   }
+ }
+
+ /**
+  * Asserts that reinitializing {@code cs} with {@code conf} fails and that
+  * the cause of the failure mentions "maximum allocation".
+  *
+  * @param cs the scheduler to reinitialize
+  * @param conf the configuration expected to be rejected
+  * @param failMessage message reported if reinitialization unexpectedly
+  *     succeeds
+  */
+ private void assertReinitializeRejectsMaxAllocation(CapacityScheduler cs,
+     CapacitySchedulerConfiguration conf, String failMessage) {
+   try {
+     cs.reinitialize(conf, mockContext);
+     fail(failMessage);
+   } catch (Exception e) {
+     // Guard against a missing cause/message so a failure reports the actual
+     // exception instead of surfacing as a NullPointerException.
+     Throwable cause = e.getCause();
+     String causeMessage = cause == null ? null : cause.getMessage();
+     assertTrue("maximum allocation exception, got: " + e,
+         causeMessage != null && causeMessage.contains("maximum allocation"));
+   }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org