You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2021/12/16 22:39:44 UTC

[hadoop] branch trunk updated: YARN-10963. Split TestCapacityScheduler by test categories. Contributed by Tamas Domok

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aec9cdb  YARN-10963. Split TestCapacityScheduler by test categories. Contributed by Tamas Domok
aec9cdb is described below

commit aec9cdb467d81f99ca74a4cf3e076d9e1abec728
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Thu Dec 16 23:39:18 2021 +0100

    YARN-10963. Split TestCapacityScheduler by test categories. Contributed by Tamas Domok
---
 .../CapacitySchedulerConfigGeneratorForTest.java   |   52 +
 .../capacity/CapacitySchedulerTestUtilities.java   |  149 +
 .../scheduler/capacity/TestCapacityScheduler.java  | 2889 +-------------------
 .../capacity/TestCapacitySchedulerApps.java        | 1499 ++++++++++
 .../capacity/TestCapacitySchedulerNodes.java       |  387 +++
 .../capacity/TestCapacitySchedulerQueues.java      |  888 ++++++
 6 files changed, 3046 insertions(+), 2818 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java
index 873e3b9..087b797 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java
@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -50,4 +54,52 @@ public final class CapacitySchedulerConfigGeneratorForTest {
     return createConfiguration(conf);
   }
 
+  public static void setMinAllocMb(Configuration conf, int minAllocMb) {
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        minAllocMb);
+  }
+
+  public static void setMaxAllocMb(Configuration conf, int maxAllocMb) {
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        maxAllocMb);
+  }
+
+  public static void setMaxAllocMb(CapacitySchedulerConfiguration conf,
+                                   String queueName, int maxAllocMb) {
+    String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+        + MAXIMUM_ALLOCATION_MB;
+    conf.setInt(propName, maxAllocMb);
+  }
+
+  public static void setMinAllocVcores(Configuration conf, int minAllocVcores) {
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        minAllocVcores);
+  }
+
+  public static void setMaxAllocVcores(Configuration conf, int maxAllocVcores) {
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        maxAllocVcores);
+  }
+
+  public static void setMaxAllocVcores(CapacitySchedulerConfiguration conf,
+                                       String queueName, int maxAllocVcores) {
+    String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+        + CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
+    conf.setInt(propName, maxAllocVcores);
+  }
+
+  public static void setMaxAllocation(CapacitySchedulerConfiguration conf,
+                                      String queueName, String maxAllocation) {
+    String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+        + MAXIMUM_ALLOCATION;
+    conf.set(propName, maxAllocation);
+  }
+
+  public static void unsetMaxAllocation(CapacitySchedulerConfiguration conf,
+                                        String queueName) {
+    String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+        + MAXIMUM_ALLOCATION;
+    conf.unset(propName);
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestUtilities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestUtilities.java
index 3d098f8..b2c6548 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestUtilities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestUtilities.java
@@ -18,14 +18,48 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.util.Sets;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.junit.Assert;
 
+import java.io.IOException;
 import java.util.Set;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public final class CapacitySchedulerTestUtilities {
   public static final int GB = 1024;
 
@@ -69,4 +103,119 @@ public final class CapacitySchedulerTestUtilities {
       }
     }
   }
+
+  public static ResourceManager createResourceManager() throws Exception {
+    ResourceUtils.resetResourceTypes(new Configuration());
+    DefaultMetricsSystem.setMiniClusterMode(true);
+    ResourceManager resourceManager = new ResourceManager() {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+        mgr.init(getConfig());
+        return mgr;
+      }
+    };
+    CapacitySchedulerConfiguration csConf
+        = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER,
+        CapacityScheduler.class, ResourceScheduler.class);
+    resourceManager.init(conf);
+    resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+    resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
+    ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
+    return resourceManager;
+  }
+
+  public static RMContext createMockRMContext() {
+    RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getConfigurationProvider()).thenReturn(
+        new LocalConfigurationProvider());
+    return mockContext;
+  }
+
+  public static void stopResourceManager(ResourceManager resourceManager) throws Exception {
+    if (resourceManager != null) {
+      QueueMetrics.clearQueueMetrics();
+      DefaultMetricsSystem.shutdown();
+      resourceManager.stop();
+    }
+  }
+
+  public static ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs,
+                                         int clusterTs, int appId, String queue,
+                                         String user) {
+    ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId);
+    ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
+        appId1, appId);
+
+    RMAppAttemptMetrics attemptMetric1 =
+        new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext());
+    RMAppImpl app1 = mock(RMAppImpl.class);
+    when(app1.getApplicationId()).thenReturn(appId1);
+    RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
+    Container container = mock(Container.class);
+    when(attempt1.getMasterContainer()).thenReturn(container);
+    ApplicationSubmissionContext submissionContext = mock(
+        ApplicationSubmissionContext.class);
+    when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
+    when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
+    when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
+    when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
+    rm.getRMContext().getRMApps().put(appId1, app1);
+
+    SchedulerEvent addAppEvent1 =
+        new AppAddedSchedulerEvent(appId1, queue, user);
+    cs.handle(addAppEvent1);
+    SchedulerEvent addAttemptEvent1 =
+        new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
+    cs.handle(addAttemptEvent1);
+    return appAttemptId1;
+  }
+
+  public static MockRM setUpMove() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    return setUpMove(conf);
+  }
+
+  public static MockRM setUpMove(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+    setupQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    return rm;
+  }
+
+  public static void nodeUpdate(ResourceManager rm, NodeManager nm) {
+    RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    rm.getResourceScheduler().handle(nodeUpdate);
+  }
+
+  public static NodeManager registerNode(ResourceManager rm, String hostName,
+                                   int containerManagerPort, int httpPort, String rackName,
+                                   Resource capability, NodeStatus nodeStatus)
+      throws IOException, YarnException {
+    NodeManager nm = new NodeManager(hostName,
+        containerManagerPort, httpPort, rackName, capability, rm, nodeStatus);
+    NodeAddedSchedulerEvent nodeAddEvent1 =
+        new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    rm.getResourceScheduler().handle(nodeAddEvent1);
+    return nm;
+  }
+
+  public static void checkApplicationResourceUsage(int expected, Application application) {
+    Assert.assertEquals(expected, application.getUsedResources().getMemorySize());
+  }
+
+  public static void checkNodeResourceUsage(int expected, NodeManager node) {
+    Assert.assertEquals(expected, node.getUsed().getMemorySize());
+    node.checkResourceUsage();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index c3548cc..4a9e45e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -19,13 +19,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.checkQueueStructureCapacities;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocMb;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocVcores;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocMb;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocVcores;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.getDefaultCapacities;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ExpectedCapacities;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupBlockedQueueConfiguration;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupOtherBlockedQueueConfiguration;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfWithoutChildrenOfB;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
@@ -35,22 +35,22 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.C
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1_CAPACITY;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2_CAPACITY;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3_CAPACITY;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B_CAPACITY;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithB1AsParentQueue;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB1;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.appHelper;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkApplicationResourceUsage;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkNodeResourceUsage;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkPendingResource;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkPendingResourceGreaterThanZero;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createMockRMContext;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.nodeUpdate;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.registerNode;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.setUpMove;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.toSet;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.waitforNMRegistered;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertContainerKilled;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertMemory;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertNoPreemption;
@@ -61,16 +61,11 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.T
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
@@ -90,7 +85,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Groups;
@@ -109,7 +103,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -119,11 +112,9 @@ import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
@@ -131,10 +122,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -159,7 +146,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfil
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
@@ -167,24 +153,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.CSQueueMetricsForCustomResources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
@@ -199,16 +176,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -239,51 +211,13 @@ public class TestCapacityScheduler {
 
   @Before
   public void setUp() throws Exception {
-    ResourceUtils.resetResourceTypes(new Configuration());
-    DefaultMetricsSystem.setMiniClusterMode(true);
-    resourceManager = new ResourceManager() {
-      @Override
-      protected RMNodeLabelsManager createNodeLabelManager() {
-        RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
-        mgr.init(getConfig());
-        return mgr;
-      }
-    };
-    CapacitySchedulerConfiguration csConf
-       = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(csConf);
-    YarnConfiguration conf = new YarnConfiguration(csConf);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER,
-        CapacityScheduler.class, ResourceScheduler.class);
-    resourceManager.init(conf);
-    resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
-    resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
-    ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
-    mockContext = mock(RMContext.class);
-    when(mockContext.getConfigurationProvider()).thenReturn(
-        new LocalConfigurationProvider());
+    resourceManager = createResourceManager();
+    mockContext = createMockRMContext();
   }
 
   @After
   public void tearDown() throws Exception {
-    if (resourceManager != null) {
-      QueueMetrics.clearQueueMetrics();
-      DefaultMetricsSystem.shutdown();
-      resourceManager.stop();
-    }
-  }
-
-  private NodeManager registerNode(ResourceManager rm, String hostName,
-      int containerManagerPort, int httpPort, String rackName,
-      Resource capability, NodeStatus nodeStatus)
-      throws IOException, YarnException {
-    NodeManager nm = new NodeManager(hostName,
-        containerManagerPort, httpPort, rackName, capability, rm, nodeStatus);
-    NodeAddedSchedulerEvent nodeAddEvent1 =
-        new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
-            .get(nm.getNodeId()));
-    rm.getResourceScheduler().handle(nodeAddEvent1);
-    return nm;
+    stopResourceManager(resourceManager);
   }
 
   @Test (timeout = 30000)
@@ -291,8 +225,9 @@ public class TestCapacityScheduler {
     CapacityScheduler scheduler = new CapacityScheduler();
     scheduler.setRMContext(resourceManager.getRMContext());
     Configuration conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
+
+    setMinAllocMb(conf, 2048);
+    setMaxAllocMb(conf, 1024);
     try {
       scheduler.init(conf);
       fail("Exception is expected because the min memory allocation is" +
@@ -305,8 +240,8 @@ public class TestCapacityScheduler {
     }
 
     conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1);
+    setMinAllocVcores(conf, 2);
+    setMaxAllocVcores(conf, 1);
     try {
       scheduler.reinitialize(conf, mockContext);
       fail("Exception is expected because the min vcores allocation is" +
@@ -319,19 +254,6 @@ public class TestCapacityScheduler {
     }
   }
 
-  private NodeManager registerNode(String hostName, int containerManagerPort,
-      int httpPort, String rackName,
-      Resource capability, NodeStatus nodeStatus)
-      throws IOException, YarnException {
-    NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
-        rackName, capability, resourceManager, nodeStatus);
-    NodeAddedSchedulerEvent nodeAddEvent1 =
-        new NodeAddedSchedulerEvent(resourceManager.getRMContext()
-            .getRMNodes().get(nm.getNodeId()));
-    resourceManager.getResourceScheduler().handle(nodeAddEvent1);
-    return nm;
-  }
-
   @Test
   public void testCapacityScheduler() throws Exception {
 
@@ -342,13 +264,13 @@ public class TestCapacityScheduler {
     // Register node1
     String host_0 = "host_0";
     NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+        registerNode(resourceManager, host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
             Resources.createResource(4 * GB, 1), mockNodeStatus);
 
     // Register node2
     String host_1 = "host_1";
     NodeManager nm_1 =
-        registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+        registerNode(resourceManager, host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
             Resources.createResource(2 * GB, 1), mockNodeStatus);
 
     // ResourceRequest priorities
@@ -397,10 +319,10 @@ public class TestCapacityScheduler {
     LOG.info("Kick!");
 
     // task_0_0 and task_1_0 allocated, used=4G
-    nodeUpdate(nm_0);
+    nodeUpdate(resourceManager, nm_0);
 
     // nothing allocated
-    nodeUpdate(nm_1);
+    nodeUpdate(resourceManager, nm_1);
 
     // Get allocations from the scheduler
     application_0.schedule();     // task_0_0 
@@ -429,11 +351,11 @@ public class TestCapacityScheduler {
     // Send a heartbeat to kick the tires on the Scheduler
     LOG.info("Sending hb from " + nm_0.getHostName());
     // nothing new, used=4G
-    nodeUpdate(nm_0);
+    nodeUpdate(resourceManager, nm_0);
 
     LOG.info("Sending hb from " + nm_1.getHostName());
     // task_0_1 is prefer as locality, used=2G
-    nodeUpdate(nm_1);
+    nodeUpdate(resourceManager, nm_1);
 
     // Get allocations from the scheduler
     LOG.info("Trying to allocate...");
@@ -443,8 +365,8 @@ public class TestCapacityScheduler {
     application_1.schedule();
     checkApplicationResourceUsage(5 * GB, application_1);
 
-    nodeUpdate(nm_0);
-    nodeUpdate(nm_1);
+    nodeUpdate(resourceManager, nm_0);
+    nodeUpdate(resourceManager, nm_1);
 
     checkNodeResourceUsage(4*GB, nm_0);
     checkNodeResourceUsage(2*GB, nm_1);
@@ -658,21 +580,6 @@ public class TestCapacityScheduler {
     LOG.info("--- END: testAssignMultiple ---");
   }
 
-  private void nodeUpdate(ResourceManager rm, NodeManager nm) {
-    RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId());
-    // Send a heartbeat to kick the tires on the Scheduler
-    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
-    rm.getResourceScheduler().handle(nodeUpdate);
-  }
-
-  private void nodeUpdate(NodeManager nm) {
-    RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
-    // Send a heartbeat to kick the tires on the Scheduler
-    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
-    resourceManager.getResourceScheduler().handle(nodeUpdate);
-  }
-
-
   @Test
   public void testMaximumCapacitySetup() {
     float delta = 0.0000001f;
@@ -695,10 +602,8 @@ public class TestCapacityScheduler {
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
 
     setupQueueConfiguration(conf);
-    conf.set(CapacitySchedulerConfiguration.getQueuePrefix(A1)
-        + MAXIMUM_ALLOCATION_MB, "1024");
-    conf.set(CapacitySchedulerConfiguration.getQueuePrefix(A1)
-        + MAXIMUM_ALLOCATION_VCORES, "1");
+    setMaxAllocMb(conf, A1, 1024);
+    setMaxAllocVcores(conf, A1, 1);
 
     scheduler.init(conf);
     scheduler.start();
@@ -722,64 +627,6 @@ public class TestCapacityScheduler {
     Assert.assertEquals(1, maxAllocationForQueue.getVirtualCores());
   }
 
-
-  @Test
-  public void testRefreshQueues() throws Exception {
-    CapacityScheduler cs = new CapacityScheduler();
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    RMContextImpl rmContext =  new RMContextImpl(null, null, null, null, null,
-        null, new RMContainerTokenSecretManager(conf),
-        new NMTokenSecretManagerInRM(conf),
-        new ClientToAMTokenSecretManagerInRM(), null);
-    setupQueueConfiguration(conf);
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    cs.init(conf);
-    cs.start();
-    cs.reinitialize(conf, rmContext);
-    checkQueueStructureCapacities(cs);
-
-    conf.setCapacity(A, 80f);
-    conf.setCapacity(B, 20f);
-    cs.reinitialize(conf, mockContext);
-    checkQueueStructureCapacities(cs, getDefaultCapacities(80f / 100.0f, 20f / 100.0f));
-    cs.stop();
-  }
-
-  private void checkApplicationResourceUsage(int expected,
-      Application application) {
-    Assert.assertEquals(expected, application.getUsedResources().getMemorySize());
-  }
-
-  private void checkNodeResourceUsage(int expected, NodeManager node) {
-    Assert.assertEquals(expected, node.getUsed().getMemorySize());
-    node.checkResourceUsage();
-  }
-
-  /** Test that parseQueue throws an exception when two leaf queues have the
-   *  same name
- * @throws IOException
-   */
-  @Test(expected=IOException.class)
-  public void testParseQueue() throws IOException {
-    CapacityScheduler cs = new CapacityScheduler();
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    cs.init(conf);
-    cs.start();
-
-    conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[] {"b1"} );
-    conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
-    conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
-
-    cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
-      null, new RMContainerTokenSecretManager(conf),
-      new NMTokenSecretManagerInRM(conf),
-      new ClientToAMTokenSecretManagerInRM(), null));
-  }
-
   @Test
   public void testParseQueueWithAbsoluteResource() {
     String childQueue = "testQueue";
@@ -820,85 +667,6 @@ public class TestCapacityScheduler {
   }
 
   @Test
-  public void testReconnectedNode() throws Exception {
-    CapacitySchedulerConfiguration csConf =
-        new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(csConf);
-    CapacityScheduler cs = new CapacityScheduler();
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    cs.init(csConf);
-    cs.start();
-    cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
-      null, null, new RMContainerTokenSecretManager(csConf),
-      new NMTokenSecretManagerInRM(csConf),
-      new ClientToAMTokenSecretManagerInRM(), null));
-
-    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
-    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
-
-    cs.handle(new NodeAddedSchedulerEvent(n1));
-    cs.handle(new NodeAddedSchedulerEvent(n2));
-
-    Assert.assertEquals(6 * GB, cs.getClusterResource().getMemorySize());
-
-    // reconnect n1 with downgraded memory
-    n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
-    cs.handle(new NodeRemovedSchedulerEvent(n1));
-    cs.handle(new NodeAddedSchedulerEvent(n1));
-
-    Assert.assertEquals(4 * GB, cs.getClusterResource().getMemorySize());
-    cs.stop();
-  }
-
-  @Test
-  public void testRefreshQueuesWithNewQueue() throws Exception {
-    CapacityScheduler cs = new CapacityScheduler();
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    cs.init(conf);
-    cs.start();
-    cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
-        null, new RMContainerTokenSecretManager(conf),
-        new NMTokenSecretManagerInRM(conf),
-        new ClientToAMTokenSecretManagerInRM(), null));
-    checkQueueStructureCapacities(cs);
-
-    // Add a new queue b4
-    final String b4 = B + ".b4";
-    final float b4Capacity = 10;
-    final float modifiedB3Capacity = B3_CAPACITY - b4Capacity;
-
-    try {
-      conf.setCapacity(A, 80f);
-      conf.setCapacity(B, 20f);
-      conf.setQueues(B, new String[]{"b1", "b2", "b3", "b4"});
-      conf.setCapacity(B1, B1_CAPACITY);
-      conf.setCapacity(B2, B2_CAPACITY);
-      conf.setCapacity(B3, modifiedB3Capacity);
-      conf.setCapacity(b4, b4Capacity);
-      cs.reinitialize(conf, mockContext);
-
-      final float capA = 80f / 100.0f;
-      final float capB = 20f / 100.0f;
-      Map<String, ExpectedCapacities> expectedCapacities = getDefaultCapacities(capA, capB);
-      expectedCapacities.put(B3, new ExpectedCapacities(modifiedB3Capacity / 100.0f, capB));
-      expectedCapacities.put(b4, new ExpectedCapacities(b4Capacity / 100.0f, capB));
-      checkQueueStructureCapacities(cs, expectedCapacities);
-
-      // Verify parent for B4
-      CSQueue rootQueue = cs.getRootQueue();
-      CSQueue queueB = findQueue(rootQueue, B);
-      CSQueue queueB4 = findQueue(queueB, b4);
-
-      assertEquals(queueB, queueB4.getParent());
-    } finally {
-      cs.stop();
-    }
-  }
-  @Test
   public void testCapacitySchedulerInfo() throws Exception {
     QueueInfo queueInfo = resourceManager.getResourceScheduler().getQueueInfo("a", true, true);
     Assert.assertEquals("Queue Name should be a", "a",
@@ -928,36 +696,6 @@ public class TestCapacityScheduler {
   }
 
   @Test
-  public void testBlackListNodes() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    MockRM rm = new MockRM(conf);
-    rm.start();
-    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-
-    String host = "127.0.0.1";
-    RMNode node =
-        MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
-    cs.handle(new NodeAddedSchedulerEvent(node));
-
-    ApplicationAttemptId appAttemptId = appHelper(rm, cs, 100, 1, "default", "user");
-
-    // Verify the blacklist can be updated independent of requesting containers
-    cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
-        Collections.<ContainerId>emptyList(),
-        Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
-    Assert.assertTrue(cs.getApplicationAttempt(appAttemptId)
-        .isPlaceBlacklisted(host));
-    cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
-        Collections.<ContainerId>emptyList(), null,
-        Collections.singletonList(host), NULL_UPDATE_REQUESTS);
-    Assert.assertFalse(cs.getApplicationAttempt(appAttemptId)
-        .isPlaceBlacklisted(host));
-    rm.stop();
-  }
-
-  @Test
   public void testAllocateReorder() throws Exception {
 
     //Confirm that allocation (resource request) alone will trigger a change in
@@ -1163,53 +901,6 @@ public class TestCapacityScheduler {
   }
 
   @Test
-  public void testGetAppsInQueue() throws Exception {
-    Application application_0 = new Application("user_0", "a1", resourceManager);
-    application_0.submit();
-
-    Application application_1 = new Application("user_0", "a2", resourceManager);
-    application_1.submit();
-
-    Application application_2 = new Application("user_0", "b2", resourceManager);
-    application_2.submit();
-
-    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
-
-    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(1, appsInA1.size());
-
-    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(application_0.getApplicationAttemptId()));
-    assertTrue(appsInA.contains(application_1.getApplicationAttemptId()));
-    assertEquals(2, appsInA.size());
-
-    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId()));
-    assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId()));
-    assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId()));
-    assertEquals(3, appsInRoot.size());
-
-    Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
-  }
-
-  @Test
-  public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-      ResourceScheduler.class);
-    MockRM rm = new MockRM(conf);
-    @SuppressWarnings("unchecked")
-    AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> cs =
-        (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
-          .getResourceScheduler();
-    SchedulerApplication<SchedulerApplicationAttempt> app =
-        TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
-          cs.getSchedulerApplications(), cs, "a1");
-    Assert.assertEquals("a1", app.getQueue().getQueueName());
-  }
-
-  @Test
   public void testAsyncScheduling() throws Exception {
     Configuration conf = new Configuration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
@@ -1381,37 +1072,6 @@ public class TestCapacityScheduler {
     rm.stop();
   }
 
-  @Test
-  public void testNumClusterNodes() throws Exception {
-    YarnConfiguration conf = new YarnConfiguration();
-    CapacityScheduler cs = new CapacityScheduler();
-    cs.setConf(conf);
-    RMContext rmContext = TestUtils.getMockRMContext();
-    cs.setRMContext(rmContext);
-    CapacitySchedulerConfiguration csConf =
-        new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(csConf);
-    cs.init(csConf);
-    cs.start();
-    assertEquals(0, cs.getNumClusterNodes());
-
-    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
-    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
-    cs.handle(new NodeAddedSchedulerEvent(n1));
-    cs.handle(new NodeAddedSchedulerEvent(n2));
-    assertEquals(2, cs.getNumClusterNodes());
-
-    cs.handle(new NodeRemovedSchedulerEvent(n1));
-    assertEquals(1, cs.getNumClusterNodes());
-    cs.handle(new NodeAddedSchedulerEvent(n1));
-    assertEquals(2, cs.getNumClusterNodes());
-    cs.handle(new NodeRemovedSchedulerEvent(n2));
-    cs.handle(new NodeRemovedSchedulerEvent(n1));
-    assertEquals(0, cs.getNumClusterNodes());
-
-    cs.stop();
-  }
-
   @Test(timeout = 120000)
   public void testPreemptionInfo() throws Exception {
     Configuration conf = new Configuration();
@@ -1553,1790 +1213,56 @@ public class TestCapacityScheduler {
     Assert.assertTrue(containers.size() == 1);
   }
 
-  private MockRM setUpMove() {
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    return setUpMove(conf);
-  }
-
-  private MockRM setUpMove(Configuration config) {
-    CapacitySchedulerConfiguration conf =
-        new CapacitySchedulerConfiguration(config);
-    setupQueueConfiguration(conf);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    MockRM rm = new MockRM(conf);
-    rm.start();
-    return rm;
-  }
-
   @Test
-  public void testAppSubmission() throws Exception {
+  public void testPreemptionDisabled() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    RMContextImpl rmContext =  new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
     setupQueueConfiguration(conf);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    conf.setQueues(A, new String[] {"a1", "a2", "b"});
-    conf.setCapacity(A1, 20);
-    conf.setCapacity("root.a.b", 10);
-    MockRM rm = new MockRM(conf);
-    rm.start();
-
-    RMApp noParentQueueApp = submitAppAndWaitForState(rm, "q", RMAppState.FAILED);
-    Assert.assertEquals(RMAppState.FAILED, noParentQueueApp.getState());
-
-    RMApp ambiguousQueueApp = submitAppAndWaitForState(rm, "b", RMAppState.FAILED);
-    Assert.assertEquals(RMAppState.FAILED, ambiguousQueueApp.getState());
-
-    RMApp emptyPartQueueApp = submitAppAndWaitForState(rm, "root..a1", RMAppState.FAILED);
-    Assert.assertEquals(RMAppState.FAILED, emptyPartQueueApp.getState());
-
-    RMApp failedAutoQueue = submitAppAndWaitForState(rm, "root.a.b.c.d", RMAppState.FAILED);
-    Assert.assertEquals(RMAppState.FAILED, failedAutoQueue.getState());
-  }
-
-  private RMApp submitAppAndWaitForState(MockRM rm, String b, RMAppState state) throws Exception {
-    MockRMAppSubmissionData ambiguousQueueAppData =
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withWaitForAppAcceptedState(false)
-            .withAppName("app")
-            .withUser("user")
-            .withAcls(null)
-            .withQueue(b)
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app1 = MockRMAppSubmitter.submit(rm, ambiguousQueueAppData);
-    rm.waitForState(app1.getApplicationId(), state);
-    return app1;
-  }
-
-  @Test
-  public void testMoveAppBasic() throws Exception {
-    MockRM rm = setUpMove();
-    AbstractYarnScheduler scheduler =
-        (AbstractYarnScheduler) rm.getResourceScheduler();
-    QueueMetrics metrics = scheduler.getRootQueueMetrics();
-    Assert.assertEquals(0, metrics.getAppsPending());
-    // submit an app
-    MockRMAppSubmissionData data =
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-1")
-            .withUser("user_0")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app = MockRMAppSubmitter.submit(rm, data);
-    ApplicationAttemptId appAttemptId =
-        rm.getApplicationReport(app.getApplicationId())
-            .getCurrentApplicationAttemptId();
-    // check preconditions
-    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(1, appsInA1.size());
-    String queue =
-        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
-            .getQueueName();
-    Assert.assertEquals("a1", queue);
-
-    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(appAttemptId));
-    assertEquals(1, appsInA.size());
-
-    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    assertEquals(1, metrics.getAppsPending());
-
-    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
-    assertTrue(appsInB1.isEmpty());
-
-    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
-    assertTrue(appsInB.isEmpty());
-
-    // now move the app
-    scheduler.moveApplication(app.getApplicationId(), "b1");
-
-    // check postconditions
-    appsInB1 = scheduler.getAppsInQueue("b1");
-    assertEquals(1, appsInB1.size());
-    queue =
-        scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
-            .getQueueName();
-    Assert.assertEquals("b1", queue);
-
-    appsInB = scheduler.getAppsInQueue("b");
-    assertTrue(appsInB.contains(appAttemptId));
-    assertEquals(1, appsInB.size());
-
-    appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    assertEquals(1, metrics.getAppsPending());
-
-    appsInA1 = scheduler.getAppsInQueue("a1");
-    assertTrue(appsInA1.isEmpty());
-
-    appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.isEmpty());
-
-    rm.stop();
-  }
-
-  @Test
-  public void testMoveAppPendingMetrics() throws Exception {
-    MockRM rm = setUpMove();
-    AbstractYarnScheduler scheduler =
-        (AbstractYarnScheduler) rm.getResourceScheduler();
-    QueueMetrics metrics = scheduler.getRootQueueMetrics();
-    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
-    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
-
-    assertEquals(0, appsInA1.size());
-    assertEquals(0, appsInB1.size());
-    Assert.assertEquals(0, metrics.getAppsPending());
-
-    // submit two apps in a1
-    RMApp app1 = MockRMAppSubmitter.submit(rm,
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-1")
-            .withUser("user_0")
-            .withAcls(null)
-            .withQueue("a1")
-            .build());
-    RMApp app2 = MockRMAppSubmitter.submit(rm,
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-2")
-            .withUser("user_0")
-            .withAcls(null)
-            .withQueue("a1")
-            .build());
-
-    appsInA1 = scheduler.getAppsInQueue("a1");
-    appsInB1 = scheduler.getAppsInQueue("b1");
-    assertEquals(2, appsInA1.size());
-    assertEquals(0, appsInB1.size());
-    assertEquals(2, metrics.getAppsPending());
-
-    // submit one app in b1
-    RMApp app3 = MockRMAppSubmitter.submit(rm,
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-2")
-            .withUser("user_0")
-            .withAcls(null)
-            .withQueue("b1")
-            .build());
-
-    appsInA1 = scheduler.getAppsInQueue("a1");
-    appsInB1 = scheduler.getAppsInQueue("b1");
-    assertEquals(2, appsInA1.size());
-    assertEquals(1, appsInB1.size());
-    assertEquals(3, metrics.getAppsPending());
-
-    // now move the app1 from a1 to b1
-    scheduler.moveApplication(app1.getApplicationId(), "b1");
-
-    appsInA1 = scheduler.getAppsInQueue("a1");
-    appsInB1 = scheduler.getAppsInQueue("b1");
-    assertEquals(1, appsInA1.size());
-    assertEquals(2, appsInB1.size());
-    assertEquals(3, metrics.getAppsPending());
-
-    // now move the app2 from a1 to b1
-    scheduler.moveApplication(app2.getApplicationId(), "b1");
-
-    appsInA1 = scheduler.getAppsInQueue("a1");
-    appsInB1 = scheduler.getAppsInQueue("b1");
-    assertEquals(0, appsInA1.size());
-    assertEquals(3, appsInB1.size());
-    assertEquals(3, metrics.getAppsPending());
-
-    // now move the app3 from b1 to a1
-    scheduler.moveApplication(app3.getApplicationId(), "a1");
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, rmContext);
 
-    appsInA1 = scheduler.getAppsInQueue("a1");
-    appsInB1 = scheduler.getAppsInQueue("b1");
-    assertEquals(1, appsInA1.size());
-    assertEquals(2, appsInB1.size());
-    assertEquals(3, metrics.getAppsPending());
-    rm.stop();
-  }
+    CSQueue rootQueue = cs.getRootQueue();
+    CSQueue queueB = findQueue(rootQueue, B);
+    CSQueue queueB2 = findQueue(queueB, B2);
 
-  @Test
-  public void testMoveAppSameParent() throws Exception {
-    MockRM rm = setUpMove();
-    AbstractYarnScheduler scheduler =
-        (AbstractYarnScheduler) rm.getResourceScheduler();
-
-    // submit an app
-    MockRMAppSubmissionData data =
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-1")
-            .withUser("user_0")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app = MockRMAppSubmitter.submit(rm, data);
-    ApplicationAttemptId appAttemptId =
-        rm.getApplicationReport(app.getApplicationId())
-            .getCurrentApplicationAttemptId();
-
-    // check preconditions
-    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(1, appsInA1.size());
-    String queue =
-        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
-            .getQueueName();
-    Assert.assertEquals("a1", queue);
-
-    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(appAttemptId));
-    assertEquals(1, appsInA.size());
-
-    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    List<ApplicationAttemptId> appsInA2 = scheduler.getAppsInQueue("a2");
-    assertTrue(appsInA2.isEmpty());
-
-    // now move the app
-    scheduler.moveApplication(app.getApplicationId(), "a2");
-
-    // check postconditions
-    appsInA2 = scheduler.getAppsInQueue("a2");
-    assertEquals(1, appsInA2.size());
-    queue =
-        scheduler.getApplicationAttempt(appsInA2.get(0)).getQueue()
-            .getQueueName();
-    Assert.assertEquals("a2", queue);
-
-    appsInA1 = scheduler.getAppsInQueue("a1");
-    assertTrue(appsInA1.isEmpty());
-
-    appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(appAttemptId));
-    assertEquals(1, appsInA.size());
-
-    appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    rm.stop();
-  }
-
-  @Test
-  public void testMoveAppForMoveToQueueWithFreeCap() throws Exception {
-
-    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
-
-    NodeStatus mockNodeStatus = createMockNodeStatus();
-
-    // Register node1
-    String host_0 = "host_0";
-    NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-        Resources.createResource(4 * GB, 1), mockNodeStatus);
-
-    // Register node2
-    String host_1 = "host_1";
-    NodeManager nm_1 =
-        registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-        Resources.createResource(2 * GB, 1), mockNodeStatus);
-
-    // ResourceRequest priorities
-    Priority priority_0 = Priority.newInstance(0);
-    Priority priority_1 = Priority.newInstance(1);
-
-    // Submit application_0
-    Application application_0 =
-        new Application("user_0", "a1", resourceManager);
-    application_0.submit(); // app + app attempt event sent to scheduler
-
-    application_0.addNodeManager(host_0, 1234, nm_0);
-    application_0.addNodeManager(host_1, 1234, nm_1);
-
-    Resource capability_0_0 = Resources.createResource(1 * GB, 1);
-    application_0.addResourceRequestSpec(priority_1, capability_0_0);
-
-    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
-    application_0.addResourceRequestSpec(priority_0, capability_0_1);
-
-    Task task_0_0 =
-        new Task(application_0, priority_1, new String[] { host_0, host_1 });
-    application_0.addTask(task_0_0);
-
-    // Submit application_1
-    Application application_1 =
-        new Application("user_1", "b2", resourceManager);
-    application_1.submit(); // app + app attempt event sent to scheduler
-
-    application_1.addNodeManager(host_0, 1234, nm_0);
-    application_1.addNodeManager(host_1, 1234, nm_1);
-
-    Resource capability_1_0 = Resources.createResource(1 * GB, 1);
-    application_1.addResourceRequestSpec(priority_1, capability_1_0);
-
-    Resource capability_1_1 = Resources.createResource(2 * GB, 1);
-    application_1.addResourceRequestSpec(priority_0, capability_1_1);
-
-    Task task_1_0 =
-        new Task(application_1, priority_1, new String[] { host_0, host_1 });
-    application_1.addTask(task_1_0);
-
-    // Send resource requests to the scheduler
-    application_0.schedule(); // allocate
-    application_1.schedule(); // allocate
-
-    // task_0_0 task_1_0 allocated, used=2G
-    nodeUpdate(nm_0);
-
-    // nothing allocated
-    nodeUpdate(nm_1);
-
-    // Get allocations from the scheduler
-    application_0.schedule(); // task_0_0
-    checkApplicationResourceUsage(1 * GB, application_0);
-
-    application_1.schedule(); // task_1_0
-    checkApplicationResourceUsage(1 * GB, application_1);
-
-    checkNodeResourceUsage(2 * GB, nm_0); // task_0_0 (1G) and task_1_0 (1G) 2G
-                                          // available
-    checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available
-
-    // move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5%
-    // total cap)
-    scheduler.moveApplication(application_0.getApplicationId(), "b1");
-
-    // 2GB 1C
-    Task task_1_1 =
-        new Task(application_1, priority_0,
-            new String[] { ResourceRequest.ANY });
-    application_1.addTask(task_1_1);
-
-    application_1.schedule();
-
-    // 2GB 1C
-    Task task_0_1 =
-        new Task(application_0, priority_0, new String[] { host_0, host_1 });
-    application_0.addTask(task_0_1);
-
-    application_0.schedule();
-
-    // prev 2G used free 2G
-    nodeUpdate(nm_0);
-
-    // prev 0G used free 2G
-    nodeUpdate(nm_1);
-
-    // Get allocations from the scheduler
-    application_1.schedule();
-    checkApplicationResourceUsage(3 * GB, application_1);
-
-    // Get allocations from the scheduler
-    application_0.schedule();
-    checkApplicationResourceUsage(3 * GB, application_0);
-
-    checkNodeResourceUsage(4 * GB, nm_0);
-    checkNodeResourceUsage(2 * GB, nm_1);
-
-  }
-
-  @Test
-  public void testMoveAppSuccess() throws Exception {
-
-    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
-
-    NodeStatus mockNodeStatus = createMockNodeStatus();
-
-    // Register node1
-    String host_0 = "host_0";
-    NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-        Resources.createResource(5 * GB, 1), mockNodeStatus);
-
-    // Register node2
-    String host_1 = "host_1";
-    NodeManager nm_1 =
-        registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-        Resources.createResource(5 * GB, 1), mockNodeStatus);
-
-    // ResourceRequest priorities
-    Priority priority_0 = Priority.newInstance(0);
-    Priority priority_1 = Priority.newInstance(1);
-
-    // Submit application_0
-    Application application_0 =
-        new Application("user_0", "a1", resourceManager);
-    application_0.submit(); // app + app attempt event sent to scheduler
-
-    application_0.addNodeManager(host_0, 1234, nm_0);
-    application_0.addNodeManager(host_1, 1234, nm_1);
-
-    Resource capability_0_0 = Resources.createResource(3 * GB, 1);
-    application_0.addResourceRequestSpec(priority_1, capability_0_0);
-
-    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
-    application_0.addResourceRequestSpec(priority_0, capability_0_1);
-
-    Task task_0_0 =
-        new Task(application_0, priority_1, new String[] { host_0, host_1 });
-    application_0.addTask(task_0_0);
-
-    // Submit application_1
-    Application application_1 =
-        new Application("user_1", "b2", resourceManager);
-    application_1.submit(); // app + app attempt event sent to scheduler
-
-    application_1.addNodeManager(host_0, 1234, nm_0);
-    application_1.addNodeManager(host_1, 1234, nm_1);
-
-    Resource capability_1_0 = Resources.createResource(1 * GB, 1);
-    application_1.addResourceRequestSpec(priority_1, capability_1_0);
-
-    Resource capability_1_1 = Resources.createResource(2 * GB, 1);
-    application_1.addResourceRequestSpec(priority_0, capability_1_1);
-
-    Task task_1_0 =
-        new Task(application_1, priority_1, new String[] { host_0, host_1 });
-    application_1.addTask(task_1_0);
-
-    // Send resource requests to the scheduler
-    application_0.schedule(); // allocate
-    application_1.schedule(); // allocate
-
-    // b2 can only run 1 app at a time
-    scheduler.moveApplication(application_0.getApplicationId(), "b2");
-
-    nodeUpdate(nm_0);
-
-    nodeUpdate(nm_1);
-
-    // Get allocations from the scheduler
-    application_0.schedule(); // task_0_0
-    checkApplicationResourceUsage(0 * GB, application_0);
-
-    application_1.schedule(); // task_1_0
-    checkApplicationResourceUsage(1 * GB, application_1);
-
-    // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is
-    // not scheduled
-    checkNodeResourceUsage(1 * GB, nm_0);
-    checkNodeResourceUsage(0 * GB, nm_1);
-
-    // lets move application_0 to a queue where it can run
-    scheduler.moveApplication(application_0.getApplicationId(), "a2");
-    application_0.schedule();
-
-    nodeUpdate(nm_1);
-
-    // Get allocations from the scheduler
-    application_0.schedule(); // task_0_0
-    checkApplicationResourceUsage(3 * GB, application_0);
-
-    checkNodeResourceUsage(1 * GB, nm_0);
-    checkNodeResourceUsage(3 * GB, nm_1);
-
-  }
-
-  @Test(expected = YarnException.class)
-  public void testMoveAppViolateQueueState() throws Exception {
-    resourceManager = new ResourceManager() {
-       @Override
-        protected RMNodeLabelsManager createNodeLabelManager() {
-          RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
-          mgr.init(getConfig());
-          return mgr;
-        }
-    };
-    CapacitySchedulerConfiguration csConf =
-        new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(csConf);
-    StringBuilder qState = new StringBuilder();
-    qState.append(CapacitySchedulerConfiguration.PREFIX).append(B)
-        .append(CapacitySchedulerConfiguration.DOT)
-        .append(CapacitySchedulerConfiguration.STATE);
-    csConf.set(qState.toString(), QueueState.STOPPED.name());
-    YarnConfiguration conf = new YarnConfiguration(csConf);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    resourceManager.init(conf);
-    resourceManager.getRMContext().getContainerTokenSecretManager()
-        .rollMasterKey();
-    resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
-    ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
-    mockContext = mock(RMContext.class);
-    when(mockContext.getConfigurationProvider()).thenReturn(
-        new LocalConfigurationProvider());
-
-    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
-
-    NodeStatus mockNodeStatus = createMockNodeStatus();
-
-    // Register node1
-    String host_0 = "host_0";
-    NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-        Resources.createResource(6 * GB, 1), mockNodeStatus);
-
-    // ResourceRequest priorities
-    Priority priority_0 = Priority.newInstance(0);
-    Priority priority_1 = Priority.newInstance(1);
-
-    // Submit application_0
-    Application application_0 =
-        new Application("user_0", "a1", resourceManager);
-    application_0.submit(); // app + app attempt event sent to scheduler
-
-    application_0.addNodeManager(host_0, 1234, nm_0);
-
-    Resource capability_0_0 = Resources.createResource(3 * GB, 1);
-    application_0.addResourceRequestSpec(priority_1, capability_0_0);
-
-    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
-    application_0.addResourceRequestSpec(priority_0, capability_0_1);
-
-    Task task_0_0 =
-        new Task(application_0, priority_1, new String[] { host_0 });
-    application_0.addTask(task_0_0);
-
-    // Send resource requests to the scheduler
-    application_0.schedule(); // allocate
-
-    // task_0_0 allocated
-    nodeUpdate(nm_0);
-
-    // Get allocations from the scheduler
-    application_0.schedule(); // task_0_0
-    checkApplicationResourceUsage(3 * GB, application_0);
-
-    checkNodeResourceUsage(3 * GB, nm_0);
-    // b2 queue contains 3GB consumption app,
-    // add another 3GB will hit max capacity limit on queue b
-    scheduler.moveApplication(application_0.getApplicationId(), "b1");
-
-  }
-
-  @Test
-  public void testMoveAppQueueMetricsCheck() throws Exception {
-    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
-
-    NodeStatus mockNodeStatus = createMockNodeStatus();
-
-    // Register node1
-    String host_0 = "host_0";
-    NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-        Resources.createResource(5 * GB, 1), mockNodeStatus);
-
-    // Register node2
-    String host_1 = "host_1";
-    NodeManager nm_1 =
-        registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-        Resources.createResource(5 * GB, 1), mockNodeStatus);
-
-    // ResourceRequest priorities
-    Priority priority_0 = Priority.newInstance(0);
-    Priority priority_1 = Priority.newInstance(1);
-
-    // Submit application_0
-    Application application_0 =
-        new Application("user_0", "a1", resourceManager);
-    application_0.submit(); // app + app attempt event sent to scheduler
-
-    application_0.addNodeManager(host_0, 1234, nm_0);
-    application_0.addNodeManager(host_1, 1234, nm_1);
-
-    Resource capability_0_0 = Resources.createResource(3 * GB, 1);
-    application_0.addResourceRequestSpec(priority_1, capability_0_0);
-
-    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
-    application_0.addResourceRequestSpec(priority_0, capability_0_1);
-
-    Task task_0_0 =
-        new Task(application_0, priority_1, new String[] { host_0, host_1 });
-    application_0.addTask(task_0_0);
-
-    // Submit application_1
-    Application application_1 =
-        new Application("user_1", "b2", resourceManager);
-    application_1.submit(); // app + app attempt event sent to scheduler
-
-    application_1.addNodeManager(host_0, 1234, nm_0);
-    application_1.addNodeManager(host_1, 1234, nm_1);
-
-    Resource capability_1_0 = Resources.createResource(1 * GB, 1);
-    application_1.addResourceRequestSpec(priority_1, capability_1_0);
-
-    Resource capability_1_1 = Resources.createResource(2 * GB, 1);
-    application_1.addResourceRequestSpec(priority_0, capability_1_1);
-
-    Task task_1_0 =
-        new Task(application_1, priority_1, new String[] { host_0, host_1 });
-    application_1.addTask(task_1_0);
-
-    // Send resource requests to the scheduler
-    application_0.schedule(); // allocate
-    application_1.schedule(); // allocate
-
-    nodeUpdate(nm_0);
-
-    nodeUpdate(nm_1);
-
-    CapacityScheduler cs =
-        (CapacityScheduler) resourceManager.getResourceScheduler();
-    CSQueue origRootQ = cs.getRootQueue();
-    CapacitySchedulerInfo oldInfo =
-        new CapacitySchedulerInfo(origRootQ, cs);
-    int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues());
-    int origNumAppsRoot = origRootQ.getNumApplications();
-
-    scheduler.moveApplication(application_0.getApplicationId(), "a2");
-
-    CSQueue newRootQ = cs.getRootQueue();
-    int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues());
-    int newNumAppsRoot = newRootQ.getNumApplications();
-    CapacitySchedulerInfo newInfo =
-        new CapacitySchedulerInfo(newRootQ, cs);
-    CapacitySchedulerLeafQueueInfo origOldA1 =
-        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues());
-    CapacitySchedulerLeafQueueInfo origNewA1 =
-        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues());
-    CapacitySchedulerLeafQueueInfo targetOldA2 =
-        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues());
-    CapacitySchedulerLeafQueueInfo targetNewA2 =
-        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues());
-    // originally submitted here
-    assertEquals(1, origOldA1.getNumApplications());
-    assertEquals(1, origNumAppsA);
-    assertEquals(2, origNumAppsRoot);
-    // after the move
-    assertEquals(0, origNewA1.getNumApplications());
-    assertEquals(1, newNumAppsA);
-    assertEquals(2, newNumAppsRoot);
-    // original consumption on a1
-    assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemorySize());
-    assertEquals(1, origOldA1.getResourcesUsed().getvCores());
-    assertEquals(0, origNewA1.getResourcesUsed().getMemorySize()); // after the move
-    assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move
-    // app moved here with live containers
-    assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemorySize());
-    assertEquals(1, targetNewA2.getResourcesUsed().getvCores());
-    // it was empty before the move
-    assertEquals(0, targetOldA2.getNumApplications());
-    assertEquals(0, targetOldA2.getResourcesUsed().getMemorySize());
-    assertEquals(0, targetOldA2.getResourcesUsed().getvCores());
-    // after the app moved here
-    assertEquals(1, targetNewA2.getNumApplications());
-    // 1 container on original queue before move
-    assertEquals(1, origOldA1.getNumContainers());
-    // after the move the resource released
-    assertEquals(0, origNewA1.getNumContainers());
-    // and moved to the new queue
-    assertEquals(1, targetNewA2.getNumContainers());
-    // which originally didn't have any
-    assertEquals(0, targetOldA2.getNumContainers());
-    // 1 user with 3GB
-    assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0)
-        .getResourcesUsed().getMemorySize());
-    // 1 user with 1 core
-    assertEquals(1, origOldA1.getUsers().getUsersList().get(0)
-        .getResourcesUsed().getvCores());
-    // user ha no more running app in the orig queue
-    assertEquals(0, origNewA1.getUsers().getUsersList().size());
-    // 1 user with 3GB
-    assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0)
-        .getResourcesUsed().getMemorySize());
-    // 1 user with 1 core
-    assertEquals(1, targetNewA2.getUsers().getUsersList().get(0)
-        .getResourcesUsed().getvCores());
-
-    // Get allocations from the scheduler
-    application_0.schedule(); // task_0_0
-    checkApplicationResourceUsage(3 * GB, application_0);
-
-    application_1.schedule(); // task_1_0
-    checkApplicationResourceUsage(1 * GB, application_1);
-
-    // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is
-    // not scheduled
-    checkNodeResourceUsage(4 * GB, nm_0);
-    checkNodeResourceUsage(0 * GB, nm_1);
-
-  }
-
-  private int getNumAppsInQueue(String name, List<CSQueue> queues) {
-    for (CSQueue queue : queues) {
-      if (queue.getQueueShortName().equals(name)) {
-        return queue.getNumApplications();
-      }
-    }
-    return -1;
-  }
-
-  private CapacitySchedulerQueueInfo getQueueInfo(String name,
-      CapacitySchedulerQueueInfoList info) {
-    if (info != null) {
-      for (CapacitySchedulerQueueInfo queueInfo : info.getQueueInfoList()) {
-        if (queueInfo.getQueueName().equals(name)) {
-          return queueInfo;
-        } else {
-          CapacitySchedulerQueueInfo result =
-              getQueueInfo(name, queueInfo.getQueues());
-          if (result == null) {
-            continue;
-          }
-          return result;
-        }
-      }
-    }
-    return null;
-  }
-
-  @Test
-  public void testMoveAllApps() throws Exception {
-    MockRM rm = setUpMove();
-    AbstractYarnScheduler scheduler =
-        (AbstractYarnScheduler) rm.getResourceScheduler();
-
-    // submit an app
-    MockRMAppSubmissionData data =
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-1")
-            .withUser("user_0")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app = MockRMAppSubmitter.submit(rm, data);
-    ApplicationAttemptId appAttemptId =
-        rm.getApplicationReport(app.getApplicationId())
-            .getCurrentApplicationAttemptId();
-
-    // check preconditions
-    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(1, appsInA1.size());
-
-    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(appAttemptId));
-    assertEquals(1, appsInA.size());
-    String queue =
-        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
-            .getQueueName();
-    Assert.assertEquals("a1", queue);
-
-    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
-    assertTrue(appsInB1.isEmpty());
-
-    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
-    assertTrue(appsInB.isEmpty());
-
-    // now move the app
-    scheduler.moveAllApps("a1", "b1");
-
-    // check postconditions
-    Thread.sleep(1000);
-    appsInB1 = scheduler.getAppsInQueue("b1");
-    assertEquals(1, appsInB1.size());
-    queue =
-        scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
-            .getQueueName();
-    Assert.assertEquals("b1", queue);
-
-    appsInB = scheduler.getAppsInQueue("b");
-    assertTrue(appsInB.contains(appAttemptId));
-    assertEquals(1, appsInB.size());
-
-    appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    appsInA1 = scheduler.getAppsInQueue("a1");
-    assertTrue(appsInA1.isEmpty());
-
-    appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.isEmpty());
-
-    rm.stop();
-  }
-
-  @Test
-  public void testMoveAllAppsInvalidDestination() throws Exception {
-    MockRM rm = setUpMove();
-    YarnScheduler scheduler = rm.getResourceScheduler();
-
-    // submit an app
-    MockRMAppSubmissionData data =
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-1")
-            .withUser("user_0")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app = MockRMAppSubmitter.submit(rm, data);
-    ApplicationAttemptId appAttemptId =
-        rm.getApplicationReport(app.getApplicationId())
-            .getCurrentApplicationAttemptId();
-
-    // check preconditions
-    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(1, appsInA1.size());
-
-    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(appAttemptId));
-    assertEquals(1, appsInA.size());
-
-    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
-    assertTrue(appsInB1.isEmpty());
-
-    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
-    assertTrue(appsInB.isEmpty());
-
-    // now move the app
-    try {
-      scheduler.moveAllApps("a1", "DOES_NOT_EXIST");
-      Assert.fail();
-    } catch (YarnException e) {
-      // expected
-    }
-
-    // check postconditions, app should still be in a1
-    appsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(1, appsInA1.size());
-
-    appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(appAttemptId));
-    assertEquals(1, appsInA.size());
-
-    appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    appsInB1 = scheduler.getAppsInQueue("b1");
-    assertTrue(appsInB1.isEmpty());
-
-    appsInB = scheduler.getAppsInQueue("b");
-    assertTrue(appsInB.isEmpty());
-
-    rm.stop();
-  }
-
-  @Test
-  public void testMoveAllAppsInvalidSource() throws Exception {
-    MockRM rm = setUpMove();
-    YarnScheduler scheduler = rm.getResourceScheduler();
-
-    // submit an app
-    MockRMAppSubmissionData data =
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-1")
-            .withUser("user_0")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app = MockRMAppSubmitter.submit(rm, data);
-    ApplicationAttemptId appAttemptId =
-        rm.getApplicationReport(app.getApplicationId())
-            .getCurrentApplicationAttemptId();
-
-    // check preconditions
-    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(1, appsInA1.size());
-
-    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(appAttemptId));
-    assertEquals(1, appsInA.size());
-
-    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
-    assertTrue(appsInB1.isEmpty());
-
-    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
-    assertTrue(appsInB.isEmpty());
-
-    // now move the app
-    try {
-      scheduler.moveAllApps("DOES_NOT_EXIST", "b1");
-      Assert.fail();
-    } catch (YarnException e) {
-      // expected
-    }
-
-    // check postconditions, app should still be in a1
-    appsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(1, appsInA1.size());
-
-    appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(appAttemptId));
-    assertEquals(1, appsInA.size());
-
-    appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    appsInB1 = scheduler.getAppsInQueue("b1");
-    assertTrue(appsInB1.isEmpty());
-
-    appsInB = scheduler.getAppsInQueue("b");
-    assertTrue(appsInB.isEmpty());
-
-    rm.stop();
-  }
-
-  @Test(timeout = 60000)
-  public void testMoveAttemptNotAdded() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    MockRM rm = new MockRM(getCapacityConfiguration(conf));
-    rm.start();
-    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-
-    ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
-    ApplicationAttemptId appAttemptId =
-        BuilderUtils.newApplicationAttemptId(appId, 1);
-
-    RMAppAttemptMetrics attemptMetric =
-        new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
-    RMAppImpl app = mock(RMAppImpl.class);
-    when(app.getApplicationId()).thenReturn(appId);
-    RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
-    Container container = mock(Container.class);
-    when(attempt.getMasterContainer()).thenReturn(container);
-    ApplicationSubmissionContext submissionContext =
-        mock(ApplicationSubmissionContext.class);
-    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
-    when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
-    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
-    when(app.getCurrentAppAttempt()).thenReturn(attempt);
-
-    rm.getRMContext().getRMApps().put(appId, app);
-
-    SchedulerEvent addAppEvent =
-        new AppAddedSchedulerEvent(appId, "a1", "user");
-    try {
-      cs.moveApplication(appId, "b1");
-      fail("Move should throw exception app not available");
-    } catch (YarnException e) {
-      assertEquals("App to be moved application_100_0001 not found.",
-          e.getMessage());
-    }
-    cs.handle(addAppEvent);
-    cs.moveApplication(appId, "b1");
-    SchedulerEvent addAttemptEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    cs.handle(addAttemptEvent);
-    CSQueue rootQ = cs.getRootQueue();
-    CSQueue queueB = cs.getQueue("b");
-    CSQueue queueA = cs.getQueue("a");
-    CSQueue queueA1 = cs.getQueue("a1");
-    CSQueue queueB1 = cs.getQueue("b1");
-    Assert.assertEquals(1, rootQ.getNumApplications());
-    Assert.assertEquals(0, queueA.getNumApplications());
-    Assert.assertEquals(1, queueB.getNumApplications());
-    Assert.assertEquals(0, queueA1.getNumApplications());
-    Assert.assertEquals(1, queueB1.getNumApplications());
-
-    rm.close();
-  }
-
-  @Test
-  public void testRemoveAttemptMoveAdded() throws Exception {
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        CapacityScheduler.class);
-    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-    // Create Mock RM
-    MockRM rm = new MockRM(getCapacityConfiguration(conf));
-    CapacityScheduler sch = (CapacityScheduler) rm.getResourceScheduler();
-    // add node
-    Resource newResource = Resource.newInstance(4 * GB, 1);
-    RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
-    SchedulerEvent addNode = new NodeAddedSchedulerEvent(node);
-    sch.handle(addNode);
-
-    ApplicationAttemptId appAttemptId = appHelper(rm, sch, 100, 1, "a1", "user");
-
-    // get Queues
-    CSQueue queueA1 = sch.getQueue("a1");
-    CSQueue queueB = sch.getQueue("b");
-    CSQueue queueB1 = sch.getQueue("b1");
-
-    // add Running rm container and simulate live containers to a1
-    ContainerId newContainerId = ContainerId.newContainerId(appAttemptId, 2);
-    RMContainerImpl rmContainer = mock(RMContainerImpl.class);
-    when(rmContainer.getState()).thenReturn(RMContainerState.RUNNING);
-    Container container2 = mock(Container.class);
-    when(rmContainer.getContainer()).thenReturn(container2);
-    Resource resource = Resource.newInstance(1024, 1);
-    when(container2.getResource()).thenReturn(resource);
-    when(rmContainer.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
-    when(container2.getNodeId()).thenReturn(node.getNodeID());
-    when(container2.getId()).thenReturn(newContainerId);
-    when(rmContainer.getNodeLabelExpression())
-        .thenReturn(RMNodeLabelsManager.NO_LABEL);
-    when(rmContainer.getContainerId()).thenReturn(newContainerId);
-    sch.getApplicationAttempt(appAttemptId).getLiveContainersMap()
-        .put(newContainerId, rmContainer);
-    QueueMetrics queueA1M = queueA1.getMetrics();
-    queueA1M.incrPendingResources(rmContainer.getNodeLabelExpression(),
-        "user1", 1, resource);
-    queueA1M.allocateResources(rmContainer.getNodeLabelExpression(),
-        "user1", resource);
-    // remove attempt
-    sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId,
-        RMAppAttemptState.KILLED, true));
-    // Move application to queue b1
-    sch.moveApplication(appAttemptId.getApplicationId(), "b1");
-    // Check queue metrics after move
-    Assert.assertEquals(0, queueA1.getNumApplications());
-    Assert.assertEquals(1, queueB.getNumApplications());
-    Assert.assertEquals(0, queueB1.getNumApplications());
-
-    // Release attempt add event
-    ApplicationAttemptId appAttemptId2 =
-        BuilderUtils.newApplicationAttemptId(appAttemptId.getApplicationId(), 2);
-    SchedulerEvent addAttemptEvent2 =
-        new AppAttemptAddedSchedulerEvent(appAttemptId2, true);
-    sch.handle(addAttemptEvent2);
-
-    // Check metrics after attempt added
-    Assert.assertEquals(0, queueA1.getNumApplications());
-    Assert.assertEquals(1, queueB.getNumApplications());
-    Assert.assertEquals(1, queueB1.getNumApplications());
-
-
-    QueueMetrics queueB1M = queueB1.getMetrics();
-    QueueMetrics queueBM = queueB.getMetrics();
-    // Verify allocation MB of current state
-    Assert.assertEquals(0, queueA1M.getAllocatedMB());
-    Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
-    Assert.assertEquals(1024, queueB1M.getAllocatedMB());
-    Assert.assertEquals(1, queueB1M.getAllocatedVirtualCores());
-
-    // remove attempt
-    sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId2,
-        RMAppAttemptState.FINISHED, false));
-
-    Assert.assertEquals(0, queueA1M.getAllocatedMB());
-    Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
-    Assert.assertEquals(0, queueB1M.getAllocatedMB());
-    Assert.assertEquals(0, queueB1M.getAllocatedVirtualCores());
-
-    verifyQueueMetrics(queueB1M);
-    verifyQueueMetrics(queueBM);
-    // Verify queue A1 metrics
-    verifyQueueMetrics(queueA1M);
-    rm.close();
-  }
-
-  private void verifyQueueMetrics(QueueMetrics queue) {
-    Assert.assertEquals(0, queue.getPendingMB());
-    Assert.assertEquals(0, queue.getActiveUsers());
-    Assert.assertEquals(0, queue.getActiveApps());
-    Assert.assertEquals(0, queue.getAppsPending());
-    Assert.assertEquals(0, queue.getAppsRunning());
-    Assert.assertEquals(0, queue.getAllocatedMB());
-    Assert.assertEquals(0, queue.getAllocatedVirtualCores());
-  }
-
-  private Configuration getCapacityConfiguration(Configuration config) {
-    CapacitySchedulerConfiguration conf =
-        new CapacitySchedulerConfiguration(config);
-
-    // Define top-level queues
-    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
-        new String[] {"a", "b"});
-    conf.setCapacity(A, 50);
-    conf.setCapacity(B, 50);
-    conf.setQueues(A, new String[] {"a1", "a2"});
-    conf.setCapacity(A1, 50);
-    conf.setCapacity(A2, 50);
-    conf.setQueues(B, new String[] {"b1"});
-    conf.setCapacity(B1, 100);
-    return conf;
-  }
-
-  @Test
-  public void testKillAllAppsInQueue() throws Exception {
-    MockRM rm = setUpMove();
-    AbstractYarnScheduler scheduler =
-        (AbstractYarnScheduler) rm.getResourceScheduler();
-
-    // submit an app
-    MockRMAppSubmissionData data =
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-1")
-            .withUser("user_0")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app = MockRMAppSubmitter.submit(rm, data);
-    ApplicationAttemptId appAttemptId =
-        rm.getApplicationReport(app.getApplicationId())
-            .getCurrentApplicationAttemptId();
-
-    // check preconditions
-    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(1, appsInA1.size());
-
-    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(appAttemptId));
-    assertEquals(1, appsInA.size());
-    String queue =
-        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
-            .getQueueName();
-    Assert.assertEquals("a1", queue);
-
-    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    // now kill the app
-    scheduler.killAllAppsInQueue("a1");
-
-    // check postconditions
-    rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
-    rm.waitForAppRemovedFromScheduler(app.getApplicationId());
-    appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.isEmpty());
-
-    appsInA1 = scheduler.getAppsInQueue("a1");
-    assertTrue(appsInA1.isEmpty());
-
-    appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.isEmpty());
-
-    rm.stop();
-  }
-
-  @Test
-  public void testKillAllAppsInvalidSource() throws Exception {
-    MockRM rm = setUpMove();
-    YarnScheduler scheduler = rm.getResourceScheduler();
-
-    // submit an app
-    MockRMAppSubmissionData data =
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-1")
-            .withUser("user_0")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app = MockRMAppSubmitter.submit(rm, data);
-    ApplicationAttemptId appAttemptId =
-        rm.getApplicationReport(app.getApplicationId())
-            .getCurrentApplicationAttemptId();
-
-    // check preconditions
-    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(1, appsInA1.size());
-
-    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(appAttemptId));
-    assertEquals(1, appsInA.size());
-
-    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    // now kill the app
-    try {
-      scheduler.killAllAppsInQueue("DOES_NOT_EXIST");
-      Assert.fail();
-    } catch (YarnException e) {
-      // expected
-    }
-
-    // check postconditions, app should still be in a1
-    appsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(1, appsInA1.size());
-
-    appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(appAttemptId));
-    assertEquals(1, appsInA.size());
-
-    appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(1, appsInRoot.size());
-
-    rm.stop();
-  }
-
-  // Test to ensure that we don't carry out reservation on nodes
-  // that have no CPU available when using the DominantResourceCalculator
-  @Test(timeout = 30000)
-  public void testAppReservationWithDominantResourceCalculator() throws Exception {
-    CapacitySchedulerConfiguration csconf =
-        new CapacitySchedulerConfiguration();
-    csconf.setResourceComparator(DominantResourceCalculator.class);
-
-    YarnConfiguration conf = new YarnConfiguration(csconf);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-      ResourceScheduler.class);
-
-    MockRM rm = new MockRM(conf);
-    rm.start();
-
-    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 1);
-
-    // register extra nodes to bump up cluster resource
-    MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10 * GB, 4);
-    rm.registerNode("127.0.0.1:1236", 10 * GB, 4);
-
-    RMApp app1 = MockRMAppSubmitter.submitWithMemory(1024, rm);
-    // kick the scheduling
-    nm1.nodeHeartbeat(true);
-    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
-    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
-    am1.registerAppAttempt();
-    SchedulerNodeReport report_nm1 =
-        rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-
-    // check node report
-    Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemorySize());
-    Assert.assertEquals(9 * GB, report_nm1.getAvailableResource().getMemorySize());
-
-    // add request for containers
-    am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 1 * GB, 1, 1);
-    am1.schedule(); // send the request
-
-    // kick the scheduler, container reservation should not happen
-    nm1.nodeHeartbeat(true);
-    Thread.sleep(1000);
-    AllocateResponse allocResponse = am1.schedule();
-    ApplicationResourceUsageReport report =
-        rm.getResourceScheduler().getAppResourceUsageReport(
-          attempt1.getAppAttemptId());
-    Assert.assertEquals(0, allocResponse.getAllocatedContainers().size());
-    Assert.assertEquals(0, report.getNumReservedContainers());
-
-    // container should get allocated on this node
-    nm2.nodeHeartbeat(true);
-
-    while (allocResponse.getAllocatedContainers().size() == 0) {
-      Thread.sleep(100);
-      allocResponse = am1.schedule();
-    }
-    report =
-        rm.getResourceScheduler().getAppResourceUsageReport(
-          attempt1.getAppAttemptId());
-    Assert.assertEquals(1, allocResponse.getAllocatedContainers().size());
-    Assert.assertEquals(0, report.getNumReservedContainers());
-    rm.stop();
-  }
-
-  @Test
-  public void testPreemptionDisabled() throws Exception {
-    CapacityScheduler cs = new CapacityScheduler();
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
-    RMContextImpl rmContext =  new RMContextImpl(null, null, null, null, null,
-        null, new RMContainerTokenSecretManager(conf),
-        new NMTokenSecretManagerInRM(conf),
-        new ClientToAMTokenSecretManagerInRM(), null);
-    setupQueueConfiguration(conf);
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    cs.init(conf);
-    cs.start();
-    cs.reinitialize(conf, rmContext);
-
-    CSQueue rootQueue = cs.getRootQueue();
-    CSQueue queueB = findQueue(rootQueue, B);
-    CSQueue queueB2 = findQueue(queueB, B2);
-
-    // When preemption turned on for the whole system
-    // (yarn.resourcemanager.scheduler.monitor.enable=true), and with no other 
-    // preemption properties set, queue root.b.b2 should be preemptable.
-    assertFalse("queue " + B2 + " should default to preemptable",
-               queueB2.getPreemptionDisabled());
+    // When preemption turned on for the whole system
+    // (yarn.resourcemanager.scheduler.monitor.enable=true), and with no other
+    // preemption properties set, queue root.b.b2 should be preemptable.
+    assertFalse("queue " + B2 + " should default to preemptable",
+               queueB2.getPreemptionDisabled());
 
     // Disable preemption at the root queue level.
     // The preemption property should be inherited from root all the
-    // way down so that root.b.b2 should NOT be preemptable.
-    conf.setPreemptionDisabled(rootQueue.getQueuePath(), true);
-    cs.reinitialize(conf, rmContext);
-    assertTrue(
-        "queue " + B2 + " should have inherited non-preemptability from root",
-        queueB2.getPreemptionDisabled());
-
-    // Enable preemption for root (grandparent) but disable for root.b (parent).
-    // root.b.b2 should inherit property from parent and NOT be preemptable
-    conf.setPreemptionDisabled(rootQueue.getQueuePath(), false);
-    conf.setPreemptionDisabled(queueB.getQueuePath(), true);
-    cs.reinitialize(conf, rmContext);
-    assertTrue(
-        "queue " + B2 + " should have inherited non-preemptability from parent",
-        queueB2.getPreemptionDisabled());
-
-    // When preemption is turned on for root.b.b2, it should be preemptable
-    // even though preemption is disabled on root.b (parent).
-    conf.setPreemptionDisabled(queueB2.getQueuePath(), false);
-    cs.reinitialize(conf, rmContext);
-    assertFalse("queue " + B2 + " should have been preemptable",
-        queueB2.getPreemptionDisabled());
-  }
-
-  @Test
-  public void testRefreshQueuesMaxAllocationRefresh() throws Exception {
-    // queue refresh should not allow changing the maximum allocation setting
-    // per queue to be smaller than previous setting
-    CapacityScheduler cs = new CapacityScheduler();
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    cs.init(conf);
-    cs.start();
-    cs.reinitialize(conf, mockContext);
-    checkQueueStructureCapacities(cs);
-
-    assertEquals("max allocation in CS",
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-        cs.getMaximumResourceCapability().getMemorySize());
-    assertEquals("max allocation for A1",
-        Resources.none(),
-        conf.getQueueMaximumAllocation(A1));
-    assertEquals("max allocation",
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-        ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
-
-    CSQueue rootQueue = cs.getRootQueue();
-    CSQueue queueA = findQueue(rootQueue, A);
-    CSQueue queueA1 = findQueue(queueA, A1);
-    assertEquals("queue max allocation", ((LeafQueue) queueA1)
-        .getMaximumAllocation().getMemorySize(), 8192);
-
-    setMaxAllocMb(conf, A1, 4096);
-
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("should have thrown exception");
-    } catch (IOException e) {
-      assertTrue("max allocation exception",
-          e.getCause().toString().contains("not be decreased"));
-    }
-
-    setMaxAllocMb(conf, A1, 8192);
-    cs.reinitialize(conf, mockContext);
-
-    setMaxAllocVcores(conf, A1,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - 1);
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("should have thrown exception");
-    } catch (IOException e) {
-      assertTrue("max allocation exception",
-          e.getCause().toString().contains("not be decreased"));
-    }
-  }
-
-  @Test
-  public void testRefreshQueuesMaxAllocationPerQueueLarge() throws Exception {
-    // verify we can't set the allocation per queue larger then cluster setting
-    CapacityScheduler cs = new CapacityScheduler();
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    cs.init(conf);
-    cs.start();
-    // change max allocation for B3 queue to be larger then cluster max
-    setMaxAllocMb(conf, B3,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 2048);
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("should have thrown exception");
-    } catch (IOException e) {
-      assertTrue("maximum allocation exception",
-          e.getCause().getMessage().contains("maximum allocation"));
-    }
-
-    setMaxAllocMb(conf, B3,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-    cs.reinitialize(conf, mockContext);
-
-    setMaxAllocVcores(conf, B3,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1);
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("should have thrown exception");
-    } catch (IOException e) {
-      assertTrue("maximum allocation exception",
-          e.getCause().getMessage().contains("maximum allocation"));
-    }
-  }
-
-  @Test
-  public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
-    // queue refresh should allow max allocation per queue to go larger
-    CapacityScheduler cs = new CapacityScheduler();
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    setMaxAllocMb(conf,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-    setMaxAllocVcores(conf,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
-    setMaxAllocMb(conf, A1, 4096);
-    setMaxAllocVcores(conf, A1, 2);
-    cs.init(conf);
-    cs.start();
-    cs.reinitialize(conf, mockContext);
-    checkQueueStructureCapacities(cs);
-
-    CSQueue rootQueue = cs.getRootQueue();
-    CSQueue queueA = findQueue(rootQueue, A);
-    CSQueue queueA1 = findQueue(queueA, A1);
-
-    assertEquals("max capability MB in CS",
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-        cs.getMaximumResourceCapability().getMemorySize());
-    assertEquals("max capability vcores in CS",
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-        cs.getMaximumResourceCapability().getVirtualCores());
-    assertEquals("max allocation MB A1",
-        4096,
-            queueA1.getMaximumAllocation().getMemorySize());
-    assertEquals("max allocation vcores A1",
-        2,
-            queueA1.getMaximumAllocation().getVirtualCores());
-    assertEquals("cluster max allocation MB",
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-        ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
-    assertEquals("cluster max allocation vcores",
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-        ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
-
-    assertEquals("queue max allocation", 4096,
-            queueA1.getMaximumAllocation().getMemorySize());
-
-    setMaxAllocMb(conf, A1, 6144);
-    setMaxAllocVcores(conf, A1, 3);
-    cs.reinitialize(conf, null);
-    // conf will have changed but we shouldn't be able to change max allocation
-    // for the actual queue
-    assertEquals("max allocation MB A1", 6144,
-            queueA1.getMaximumAllocation().getMemorySize());
-    assertEquals("max allocation vcores A1", 3,
-            queueA1.getMaximumAllocation().getVirtualCores());
-    assertEquals("max allocation MB cluster",
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-        ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
-    assertEquals("max allocation vcores cluster",
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-        ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
-    assertEquals("queue max allocation MB", 6144,
-        queueA1.getMaximumAllocation().getMemorySize());
-    assertEquals("queue max allocation vcores", 3,
-        queueA1.getMaximumAllocation().getVirtualCores());
-    assertEquals("max capability MB cluster",
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-        cs.getMaximumResourceCapability().getMemorySize());
-    assertEquals("cluster max capability vcores",
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-        cs.getMaximumResourceCapability().getVirtualCores());
-  }
-
-  @Test
-  public void testRefreshQueuesMaxAllocationCSError() throws Exception {
-    // Try to refresh the cluster level max allocation size to be smaller
-    // and it should error out
-    CapacityScheduler cs = new CapacityScheduler();
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    setMaxAllocMb(conf, 10240);
-    setMaxAllocVcores(conf, 10);
-    setMaxAllocMb(conf, A1, 4096);
-    setMaxAllocVcores(conf, A1, 4);
-    cs.init(conf);
-    cs.start();
-    cs.reinitialize(conf, mockContext);
-    checkQueueStructureCapacities(cs);
-
-    assertEquals("max allocation MB in CS", 10240,
-        cs.getMaximumResourceCapability().getMemorySize());
-    assertEquals("max allocation vcores in CS", 10,
-        cs.getMaximumResourceCapability().getVirtualCores());
-
-    setMaxAllocMb(conf, 6144);
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("should have thrown exception");
-    } catch (IOException e) {
-      assertTrue("max allocation exception",
-          e.getCause().toString().contains("not be decreased"));
-    }
-
-    setMaxAllocMb(conf, 10240);
-    cs.reinitialize(conf, mockContext);
-
-    setMaxAllocVcores(conf, 8);
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("should have thrown exception");
-    } catch (IOException e) {
-      assertTrue("max allocation exception",
-          e.getCause().toString().contains("not be decreased"));
-    }
-  }
-
-  @Test
-  public void testRefreshQueuesMaxAllocationCSLarger() throws Exception {
-    // Try to refresh the cluster level max allocation size to be larger
-    // and verify that if there is no setting per queue it uses the
-    // cluster level setting.
-    CapacityScheduler cs = new CapacityScheduler();
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    setMaxAllocMb(conf, 10240);
-    setMaxAllocVcores(conf, 10);
-    setMaxAllocMb(conf, A1, 4096);
-    setMaxAllocVcores(conf, A1, 4);
-    cs.init(conf);
-    cs.start();
-    cs.reinitialize(conf, mockContext);
-    checkQueueStructureCapacities(cs);
-
-    assertEquals("max allocation MB in CS", 10240,
-        cs.getMaximumResourceCapability().getMemorySize());
-    assertEquals("max allocation vcores in CS", 10,
-        cs.getMaximumResourceCapability().getVirtualCores());
-
-    CSQueue rootQueue = cs.getRootQueue();
-    CSQueue queueA = findQueue(rootQueue, A);
-    CSQueue queueB = findQueue(rootQueue, B);
-    CSQueue queueA1 = findQueue(queueA, A1);
-    CSQueue queueA2 = findQueue(queueA, A2);
-    CSQueue queueB2 = findQueue(queueB, B2);
-
-    assertEquals("queue A1 max allocation MB", 4096,
-            queueA1.getMaximumAllocation().getMemorySize());
-    assertEquals("queue A1 max allocation vcores", 4,
-            queueA1.getMaximumAllocation().getVirtualCores());
-    assertEquals("queue A2 max allocation MB", 10240,
-            queueA2.getMaximumAllocation().getMemorySize());
-    assertEquals("queue A2 max allocation vcores", 10,
-            queueA2.getMaximumAllocation().getVirtualCores());
-    assertEquals("queue B2 max allocation MB", 10240,
-            queueB2.getMaximumAllocation().getMemorySize());
-    assertEquals("queue B2 max allocation vcores", 10,
-            queueB2.getMaximumAllocation().getVirtualCores());
-
-    setMaxAllocMb(conf, 12288);
-    setMaxAllocVcores(conf, 12);
-    cs.reinitialize(conf, null);
-    // cluster level setting should change and any queues without
-    // per queue setting
-    assertEquals("max allocation MB in CS", 12288,
-        cs.getMaximumResourceCapability().getMemorySize());
-    assertEquals("max allocation vcores in CS", 12,
-        cs.getMaximumResourceCapability().getVirtualCores());
-    assertEquals("queue A1 max MB allocation", 4096,
-            queueA1.getMaximumAllocation().getMemorySize());
-    assertEquals("queue A1 max vcores allocation", 4,
-            queueA1.getMaximumAllocation().getVirtualCores());
-    assertEquals("queue A2 max MB allocation", 12288,
-            queueA2.getMaximumAllocation().getMemorySize());
-    assertEquals("queue A2 max vcores allocation", 12,
-            queueA2.getMaximumAllocation().getVirtualCores());
-    assertEquals("queue B2 max MB allocation", 12288,
-            queueB2.getMaximumAllocation().getMemorySize());
-    assertEquals("queue B2 max vcores allocation", 12,
-            queueB2.getMaximumAllocation().getVirtualCores());
-  }
-
-  @Test
-  public void testQueuesMaxAllocationInheritance() throws Exception {
-    // queue level max allocation is set by the queue configuration explicitly
-    // or inherits from the parent.
-
-    CapacityScheduler cs = new CapacityScheduler();
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    setMaxAllocMb(conf,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-    setMaxAllocVcores(conf,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
-
-    // Test the child queue overrides
-    setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
-            "memory-mb=4096,vcores=2");
-    setMaxAllocation(conf, A1, "memory-mb=6144,vcores=2");
-    setMaxAllocation(conf, B, "memory-mb=5120, vcores=2");
-    setMaxAllocation(conf, B2, "memory-mb=1024, vcores=2");
-
-    cs.init(conf);
-    cs.start();
-    cs.reinitialize(conf, mockContext);
-    checkQueueStructureCapacities(cs);
-
-    CSQueue rootQueue = cs.getRootQueue();
-    CSQueue queueA = findQueue(rootQueue, A);
-    CSQueue queueB = findQueue(rootQueue, B);
-    CSQueue queueA1 = findQueue(queueA, A1);
-    CSQueue queueA2 = findQueue(queueA, A2);
-    CSQueue queueB1 = findQueue(queueB, B1);
-    CSQueue queueB2 = findQueue(queueB, B2);
-
-    assertEquals("max capability MB in CS",
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-            cs.getMaximumResourceCapability().getMemorySize());
-    assertEquals("max capability vcores in CS",
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-            cs.getMaximumResourceCapability().getVirtualCores());
-    assertEquals("max allocation MB A1",
-            6144,
-            queueA1.getMaximumAllocation().getMemorySize());
-    assertEquals("max allocation vcores A1",
-            2,
-            queueA1.getMaximumAllocation().getVirtualCores());
-    assertEquals("max allocation MB A2",          4096,
-            queueA2.getMaximumAllocation().getMemorySize());
-    assertEquals("max allocation vcores A2",
-            2,
-            queueA2.getMaximumAllocation().getVirtualCores());
-    assertEquals("max allocation MB B",          5120,
-            queueB.getMaximumAllocation().getMemorySize());
-    assertEquals("max allocation MB B1",          5120,
-            queueB1.getMaximumAllocation().getMemorySize());
-    assertEquals("max allocation MB B2",          1024,
-            queueB2.getMaximumAllocation().getMemorySize());
-
-    // Test get the max-allocation from different parent
-    unsetMaxAllocation(conf, A1);
-    unsetMaxAllocation(conf, B);
-    unsetMaxAllocation(conf, B1);
-    setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
-            "memory-mb=6144,vcores=2");
-    setMaxAllocation(conf, A, "memory-mb=8192,vcores=2");
-
-    cs.reinitialize(conf, mockContext);
-
-    assertEquals("max capability MB in CS",
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-            cs.getMaximumResourceCapability().getMemorySize());
-    assertEquals("max capability vcores in CS",
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-            cs.getMaximumResourceCapability().getVirtualCores());
-    assertEquals("max allocation MB A1",
-            8192,
-            queueA1.getMaximumAllocation().getMemorySize());
-    assertEquals("max allocation vcores A1",
-            2,
-            queueA1.getMaximumAllocation().getVirtualCores());
-    assertEquals("max allocation MB B1",
-            6144,
-            queueB1.getMaximumAllocation().getMemorySize());
-    assertEquals("max allocation vcores B1",
-            2,
-            queueB1.getMaximumAllocation().getVirtualCores());
-
-    // Test the default
-    unsetMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT);
-    unsetMaxAllocation(conf, A);
-    unsetMaxAllocation(conf, A1);
-    cs.reinitialize(conf, mockContext);
-
-    assertEquals("max capability MB in CS",
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-            cs.getMaximumResourceCapability().getMemorySize());
-    assertEquals("max capability vcores in CS",
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-            cs.getMaximumResourceCapability().getVirtualCores());
-    assertEquals("max allocation MB A1",
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-            queueA1.getMaximumAllocation().getMemorySize());
-    assertEquals("max allocation vcores A1",
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-            queueA1.getMaximumAllocation().getVirtualCores());
-    assertEquals("max allocation MB A2",
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-            queueA2.getMaximumAllocation().getMemorySize());
-    assertEquals("max allocation vcores A2",
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-            queueA2.getMaximumAllocation().getVirtualCores());
-  }
-
-  @Test
-  public void testVerifyQueuesMaxAllocationConf() throws Exception {
-    // queue level max allocation can't exceed the cluster setting
-
-    CapacityScheduler cs = new CapacityScheduler();
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    setMaxAllocMb(conf,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-    setMaxAllocVcores(conf,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
-
-    long largerMem =
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1024;
-    long largerVcores =
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES+10;
-
-    cs.init(conf);
-    cs.start();
-    cs.reinitialize(conf, mockContext);
-    checkQueueStructureCapacities(cs);
-
-    setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
-            "memory-mb=" + largerMem + ",vcores=2");
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("Queue Root maximum allocation can't exceed the cluster setting");
-    } catch(Exception e) {
-      assertTrue("maximum allocation exception",
-              e.getCause().getMessage().contains("maximum allocation"));
-    }
+    // way down so that root.b.b2 should NOT be preemptable.
+    conf.setPreemptionDisabled(rootQueue.getQueuePath(), true);
+    cs.reinitialize(conf, rmContext);
+    assertTrue(
+        "queue " + B2 + " should have inherited non-preemptability from root",
+        queueB2.getPreemptionDisabled());
 
-    setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
-            "memory-mb=4096,vcores=2");
-    setMaxAllocation(conf, A, "memory-mb=6144,vcores=2");
-    setMaxAllocation(conf, A1, "memory-mb=" + largerMem + ",vcores=2");
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("Queue A1 maximum allocation can't exceed the cluster setting");
-    } catch(Exception e) {
-      assertTrue("maximum allocation exception",
-              e.getCause().getMessage().contains("maximum allocation"));
-    }
-    setMaxAllocation(conf, A1, "memory-mb=8192" + ",vcores=" + largerVcores);
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("Queue A1 maximum allocation can't exceed the cluster setting");
-    } catch(Exception e) {
-      assertTrue("maximum allocation exception",
-              e.getCause().getMessage().contains("maximum allocation"));
-    }
+    // Enable preemption for root (grandparent) but disable for root.b (parent).
+    // root.b.b2 should inherit property from parent and NOT be preemptable
+    conf.setPreemptionDisabled(rootQueue.getQueuePath(), false);
+    conf.setPreemptionDisabled(queueB.getQueuePath(), true);
+    cs.reinitialize(conf, rmContext);
+    assertTrue(
+        "queue " + B2 + " should have inherited non-preemptability from parent",
+        queueB2.getPreemptionDisabled());
 
+    // When preemption is turned on for root.b.b2, it should be preemptable
+    // even though preemption is disabled on root.b (parent).
+    conf.setPreemptionDisabled(queueB2.getQueuePath(), false);
+    cs.reinitialize(conf, rmContext);
+    assertFalse("queue " + B2 + " should have been preemptable",
+        queueB2.getPreemptionDisabled());
   }
 
   private void waitContainerAllocated(MockAM am, int mem, int nContainer,
@@ -3995,35 +1921,6 @@ public class TestCapacityScheduler {
     assertEquals(15, fiCaApp2.getHeadroom().getVirtualCores());
   }
 
-  @Test
-  public void testDefaultNodeLabelExpressionQueueConfig() throws Exception {
-    CapacityScheduler cs = new CapacityScheduler();
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    conf.setDefaultNodeLabelExpression("root.a", " x");
-    conf.setDefaultNodeLabelExpression("root.b", " y ");
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    cs.init(conf);
-    cs.start();
-
-    QueueInfo queueInfoA = cs.getQueueInfo("a", true, false);
-    Assert.assertEquals("Queue Name should be a", "a",
-        queueInfoA.getQueueName());
-    Assert.assertEquals("Queue Path should be root.a", "root.a",
-        queueInfoA.getQueuePath());
-    Assert.assertEquals("Default Node Label Expression should be x", "x",
-        queueInfoA.getDefaultNodeLabelExpression());
-
-    QueueInfo queueInfoB = cs.getQueueInfo("b", true, false);
-    Assert.assertEquals("Queue Name should be b", "b",
-        queueInfoB.getQueueName());
-    Assert.assertEquals("Queue Path should be root.b", "root.b",
-        queueInfoB.getQueuePath());
-    Assert.assertEquals("Default Node Label Expression should be y", "y",
-        queueInfoB.getDefaultNodeLabelExpression());
-  }
-
   @Test(timeout = 60000)
   public void testAMLimitUsage() throws Exception {
 
@@ -4216,44 +2113,6 @@ public class TestCapacityScheduler {
     rm.stop();
   }
 
-  private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-        maxAllocMb);
-  }
-
-  private void setMaxAllocMb(CapacitySchedulerConfiguration conf,
-      String queueName, int maxAllocMb) {
-    String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
-        + MAXIMUM_ALLOCATION_MB;
-    conf.setInt(propName, maxAllocMb);
-  }
-
-  private void setMaxAllocVcores(Configuration conf, int maxAllocVcores) {
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-        maxAllocVcores);
-  }
-
-  private void setMaxAllocVcores(CapacitySchedulerConfiguration conf,
-      String queueName, int maxAllocVcores) {
-    String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
-        + CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
-    conf.setInt(propName, maxAllocVcores);
-  }
-
-  private void setMaxAllocation(CapacitySchedulerConfiguration conf,
-                                String queueName, String maxAllocation) {
-    String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
-            + MAXIMUM_ALLOCATION;
-    conf.set(propName, maxAllocation);
-  }
-
-  private void unsetMaxAllocation(CapacitySchedulerConfiguration conf,
-                                  String queueName) {
-    String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
-            + MAXIMUM_ALLOCATION;
-    conf.unset(propName);
-  }
-
   private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
     RMContainer rmContainer = cs.getRMContainer(containerId);
@@ -4264,171 +2123,6 @@ public class TestCapacityScheduler {
       Assert.fail("Cannot find RMContainer");
     }
   }
-  @Test
-  public void testRemovedNodeDecomissioningNode() throws Exception {
-    NodeStatus mockNodeStatus = createMockNodeStatus();
-
-    // Register nodemanager
-    NodeManager nm = registerNode("host_decom", 1234, 2345,
-        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
-        mockNodeStatus);
-
-    RMNode node =
-        resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
-    // Send a heartbeat to kick the tires on the Scheduler
-    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
-    resourceManager.getResourceScheduler().handle(nodeUpdate);
-
-    // force remove the node to simulate race condition
-    ((CapacityScheduler) resourceManager.getResourceScheduler()).getNodeTracker().
-        removeNode(nm.getNodeId());
-    // Kick off another heartbeat with the node state mocked to decommissioning
-    RMNode spyNode =
-        Mockito.spy(resourceManager.getRMContext().getRMNodes()
-            .get(nm.getNodeId()));
-    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
-    resourceManager.getResourceScheduler().handle(
-        new NodeUpdateSchedulerEvent(spyNode));
-  }
-
-  @Test
-  public void testResourceUpdateDecommissioningNode() throws Exception {
-    // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
-    // to have 0 available resource
-    RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
-    Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
-    when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler<Event>() {
-      @Override
-      public void handle(Event event) {
-        if (event instanceof RMNodeResourceUpdateEvent) {
-          RMNodeResourceUpdateEvent resourceEvent =
-              (RMNodeResourceUpdateEvent) event;
-          resourceManager
-              .getResourceScheduler()
-              .getSchedulerNode(resourceEvent.getNodeId())
-              .updateTotalResource(resourceEvent.getResourceOption().getResource());
-        }
-      }
-    });
-    Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher();
-    ((CapacityScheduler) resourceManager.getResourceScheduler())
-        .setRMContext(spyContext);
-    ((AsyncDispatcher) mockDispatcher).start();
-
-    NodeStatus mockNodeStatus = createMockNodeStatus();
-
-    // Register node
-    String host_0 = "host_0";
-    NodeManager nm_0 = registerNode(host_0, 1234, 2345,
-        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
-        mockNodeStatus);
-    // ResourceRequest priorities
-    Priority priority_0 = Priority.newInstance(0);
-
-    // Submit an application
-    Application application_0 =
-        new Application("user_0", "a1", resourceManager);
-    application_0.submit();
-
-    application_0.addNodeManager(host_0, 1234, nm_0);
-
-    Resource capability_0_0 = Resources.createResource(1 * GB, 1);
-    application_0.addResourceRequestSpec(priority_0, capability_0_0);
-
-    Task task_0_0 =
-        new Task(application_0, priority_0, new String[] { host_0 });
-    application_0.addTask(task_0_0);
-
-    // Send resource requests to the scheduler
-    application_0.schedule();
-
-    nodeUpdate(nm_0);
-    // Kick off another heartbeat with the node state mocked to decommissioning
-    // This should update the schedulernodes to have 0 available resource
-    RMNode spyNode =
-        Mockito.spy(resourceManager.getRMContext().getRMNodes()
-            .get(nm_0.getNodeId()));
-    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
-    resourceManager.getResourceScheduler().handle(
-        new NodeUpdateSchedulerEvent(spyNode));
-
-    // Get allocations from the scheduler
-    application_0.schedule();
-
-    // Check the used resource is 1 GB 1 core
-    Assert.assertEquals(1 * GB, nm_0.getUsed().getMemorySize());
-    Resource usedResource =
-        resourceManager.getResourceScheduler()
-            .getSchedulerNode(nm_0.getNodeId()).getAllocatedResource();
-    Assert.assertEquals("Used Resource Memory Size should be 1GB", 1 * GB,
-        usedResource.getMemorySize());
-    Assert.assertEquals("Used Resource Virtual Cores should be 1", 1,
-        usedResource.getVirtualCores());
-    // Check total resource of scheduler node is also changed to 1 GB 1 core
-    Resource totalResource =
-        resourceManager.getResourceScheduler()
-            .getSchedulerNode(nm_0.getNodeId()).getTotalResource();
-    Assert.assertEquals("Total Resource Memory Size should be 1GB", 1 * GB,
-        totalResource.getMemorySize());
-    Assert.assertEquals("Total Resource Virtual Cores should be 1", 1,
-        totalResource.getVirtualCores());
-    // Check the available resource is 0/0
-    Resource availableResource =
-        resourceManager.getResourceScheduler()
-            .getSchedulerNode(nm_0.getNodeId()).getUnallocatedResource();
-    Assert.assertEquals("Available Resource Memory Size should be 0", 0,
-        availableResource.getMemorySize());
-    Assert.assertEquals("Available Resource Memory Size should be 0", 0,
-        availableResource.getVirtualCores());
-    // Kick off another heartbeat where the RMNodeResourceUpdateEvent would
-    // be skipped for DECOMMISSIONING state since the total resource is
-    // already equal to used resource from the previous heartbeat.
-    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
-    resourceManager.getResourceScheduler().handle(
-        new NodeUpdateSchedulerEvent(spyNode));
-    verify(mockDispatcher, times(4)).getEventHandler();
-  }
-
-  @Test
-  public void testSchedulingOnRemovedNode() throws Exception {
-    Configuration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    conf.setBoolean(
-        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
-            false);
-
-    MockRM rm = new MockRM(conf);
-    rm.start();
-    RMApp app = MockRMAppSubmitter.submitWithMemory(100, rm);
-    rm.drainEvents();
-
-    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10240, 10);
-    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
-
-    //remove nm2 to keep am alive
-    MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10240, 10);
-
-    am.allocate(ResourceRequest.ANY, 2048, 1, null);
-
-    CapacityScheduler scheduler =
-        (CapacityScheduler) rm.getRMContext().getScheduler();
-    FiCaSchedulerNode node =
-        (FiCaSchedulerNode)
-            scheduler.getNodeTracker().getNode(nm2.getNodeId());
-    scheduler.handle(new NodeRemovedSchedulerEvent(
-        rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
-    // schedulerNode is removed, try allocate a container
-    scheduler.allocateContainersToNode(new SimpleCandidateNodeSet<>(node),
-        true);
-
-    AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
-        new AppAttemptRemovedSchedulerEvent(
-            am.getApplicationAttemptId(),
-            RMAppAttemptState.FINISHED, false);
-    scheduler.handle(appRemovedEvent1);
-    rm.stop();
-  }
 
   @Test
   public void testCSReservationWithRootUnblocked() throws Exception {
@@ -4630,37 +2324,6 @@ public class TestCapacityScheduler {
     rm.stop();
   }
 
-  private ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs,
-                                         int clusterTs, int appId, String queue,
-                                         String user) {
-    ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId);
-    ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
-        appId1, appId);
-
-    RMAppAttemptMetrics attemptMetric1 =
-        new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext());
-    RMAppImpl app1 = mock(RMAppImpl.class);
-    when(app1.getApplicationId()).thenReturn(appId1);
-    RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
-    Container container = mock(Container.class);
-    when(attempt1.getMasterContainer()).thenReturn(container);
-    ApplicationSubmissionContext submissionContext = mock(
-        ApplicationSubmissionContext.class);
-    when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
-    when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
-    when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
-    when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
-    rm.getRMContext().getRMApps().put(appId1, app1);
-
-    SchedulerEvent addAppEvent1 =
-        new AppAddedSchedulerEvent(appId1, queue, user);
-    cs.handle(addAppEvent1);
-    SchedulerEvent addAttemptEvent1 =
-        new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
-    cs.handle(addAttemptEvent1);
-    return appAttemptId1;
-  }
-
   @Test
   public void testAppAttemptLocalityStatistics() throws Exception {
     Configuration conf =
@@ -4722,256 +2385,6 @@ public class TestCapacityScheduler {
         attemptMetrics.getLocalityStatistics());
   }
 
-  /**
-   * Test for queue deletion.
-   * @throws Exception
-   */
-  @Test
-  public void testRefreshQueuesWithQueueDelete() throws Exception {
-    CapacityScheduler cs = new CapacityScheduler();
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
-        null, new RMContainerTokenSecretManager(conf),
-        new NMTokenSecretManagerInRM(conf),
-        new ClientToAMTokenSecretManagerInRM(), null);
-    setupQueueConfiguration(conf);
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    cs.init(conf);
-    cs.start();
-    cs.reinitialize(conf, rmContext);
-    checkQueueStructureCapacities(cs);
-
-    // test delete leaf queue when there is application running.
-    Map<String, CSQueue> queues =
-        cs.getCapacitySchedulerQueueManager().getShortNameQueues();
-    String b1QTobeDeleted = "b1";
-    LeafQueue csB1Queue = Mockito.spy((LeafQueue) queues.get(b1QTobeDeleted));
-    when(csB1Queue.getState()).thenReturn(QueueState.DRAINING)
-        .thenReturn(QueueState.STOPPED);
-    cs.getCapacitySchedulerQueueManager().addQueue(b1QTobeDeleted, csB1Queue);
-    conf = new CapacitySchedulerConfiguration();
-    setupQueueConfigurationWithoutB1(conf);
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("Expected to throw exception when refresh queue tries to delete a"
-          + " queue with running apps");
-    } catch (IOException e) {
-      // ignore
-    }
-
-    // test delete leaf queue(root.b.b1) when there is no application running.
-    conf = new CapacitySchedulerConfiguration();
-    setupQueueConfigurationWithoutB1(conf);
-    try {
-      cs.reinitialize(conf, mockContext);
-    } catch (IOException e) {
-      LOG.error(
-          "Expected to NOT throw exception when refresh queue tries to delete"
-              + " a queue WITHOUT running apps",
-          e);
-      fail("Expected to NOT throw exception when refresh queue tries to delete"
-          + " a queue WITHOUT running apps");
-    }
-    CSQueue rootQueue = cs.getRootQueue();
-    CSQueue queueB = findQueue(rootQueue, B);
-    CSQueue queueB3 = findQueue(queueB, B1);
-    assertNull("Refresh needs to support delete of leaf queue ", queueB3);
-
-    // reset back to default configuration for testing parent queue delete
-    conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    cs.reinitialize(conf, rmContext);
-    checkQueueStructureCapacities(cs);
-
-    // set the configurations such that it fails once but should be successfull
-    // next time
-    queues = cs.getCapacitySchedulerQueueManager().getShortNameQueues();
-    CSQueue bQueue = Mockito.spy((ParentQueue) queues.get("b"));
-    when(bQueue.getState()).thenReturn(QueueState.DRAINING)
-        .thenReturn(QueueState.STOPPED);
-    cs.getCapacitySchedulerQueueManager().addQueue("b", bQueue);
-
-    bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
-    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
-    cs.getCapacitySchedulerQueueManager().addQueue("b1", bQueue);
-
-    bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
-    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
-    cs.getCapacitySchedulerQueueManager().addQueue("b2", bQueue);
-
-    bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
-    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
-    cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue);
-
-    // test delete Parent queue when there is application running.
-    conf = new CapacitySchedulerConfiguration();
-    setupQueueConfigurationWithoutB(conf);
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("Expected to throw exception when refresh queue tries to delete a"
-          + " parent queue with running apps in children queue");
-    } catch (IOException e) {
-      // ignore
-    }
-
-    // test delete Parent queue when there is no application running.
-    conf = new CapacitySchedulerConfiguration();
-    setupQueueConfigurationWithoutB(conf);
-    try {
-      cs.reinitialize(conf, mockContext);
-    } catch (IOException e) {
-      fail("Expected to not throw exception when refresh queue tries to delete"
-          + " a queue without running apps");
-    }
-    rootQueue = cs.getRootQueue();
-    queueB = findQueue(rootQueue, B);
-    String message =
-        "Refresh needs to support delete of Parent queue and its children.";
-    assertNull(message, queueB);
-    assertNull(message,
-        cs.getCapacitySchedulerQueueManager().getQueues().get("b"));
-    assertNull(message,
-        cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
-    assertNull(message,
-        cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
-
-    cs.stop();
-  }
-
-  /**
-   * Test for all child queue deletion and thus making parent queue a child.
-   * @throws Exception
-   */
-  @Test
-  public void testRefreshQueuesWithAllChildQueuesDeleted() throws Exception {
-    CapacityScheduler cs = new CapacityScheduler();
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
-        null, new RMContainerTokenSecretManager(conf),
-        new NMTokenSecretManagerInRM(conf),
-        new ClientToAMTokenSecretManagerInRM(), null);
-    setupQueueConfiguration(conf);
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    cs.init(conf);
-    cs.start();
-    cs.reinitialize(conf, rmContext);
-    checkQueueStructureCapacities(cs);
-
-    // test delete all leaf queues when there is no application running.
-    Map<String, CSQueue> queues =
-        cs.getCapacitySchedulerQueueManager().getShortNameQueues();
-
-    CSQueue bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
-    when(bQueue.getState()).thenReturn(QueueState.RUNNING)
-        .thenReturn(QueueState.STOPPED);
-    cs.getCapacitySchedulerQueueManager().addQueue("b1", bQueue);
-
-    bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
-    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
-    cs.getCapacitySchedulerQueueManager().addQueue("b2", bQueue);
-
-    bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
-    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
-    cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue);
-
-    conf = new CapacitySchedulerConfiguration();
-    setupQueueConfWithoutChildrenOfB(conf);
-
-    // test convert parent queue to leaf queue(root.b) when there is no
-    // application running.
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("Expected to throw exception when refresh queue tries to make parent"
-          + " queue a child queue when one of its children is still running.");
-    } catch (IOException e) {
-      //do not do anything, expected exception
-    }
-
-    // test delete leaf queues(root.b.b1,b2,b3) when there is no application
-    // running.
-    try {
-      cs.reinitialize(conf, mockContext);
-    } catch (IOException e) {
-      e.printStackTrace();
-      fail("Expected to NOT throw exception when refresh queue tries to delete"
-          + " all children of a parent queue(without running apps).");
-    }
-    CSQueue rootQueue = cs.getRootQueue();
-    CSQueue queueB = findQueue(rootQueue, B);
-    assertNotNull("Parent Queue B should not be deleted", queueB);
-    Assert.assertTrue("As Queue'B children are not deleted",
-        queueB instanceof LeafQueue);
-
-    String message =
-        "Refresh needs to support delete of all children of Parent queue.";
-    assertNull(message,
-        cs.getCapacitySchedulerQueueManager().getQueues().get("b3"));
-    assertNull(message,
-        cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
-    assertNull(message,
-        cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
-
-    cs.stop();
-  }
-
-  /**
-   * Test if we can convert a leaf queue to a parent queue
-   * @throws Exception
-   */
-  @Test (timeout = 10000)
-  public void testConvertLeafQueueToParentQueue() throws Exception {
-    CapacityScheduler cs = new CapacityScheduler();
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
-        null, new RMContainerTokenSecretManager(conf),
-        new NMTokenSecretManagerInRM(conf),
-        new ClientToAMTokenSecretManagerInRM(), null);
-    setupQueueConfiguration(conf);
-    cs.setConf(new YarnConfiguration());
-    cs.setRMContext(resourceManager.getRMContext());
-    cs.init(conf);
-    cs.start();
-    cs.reinitialize(conf, rmContext);
-    checkQueueStructureCapacities(cs);
-
-    String targetQueue = "b1";
-    CSQueue b1 = cs.getQueue(targetQueue);
-    Assert.assertEquals(QueueState.RUNNING, b1.getState());
-
-    // test if we can convert a leaf queue which is in RUNNING state
-    conf = new CapacitySchedulerConfiguration();
-    setupQueueConfigurationWithB1AsParentQueue(conf);
-    try {
-      cs.reinitialize(conf, mockContext);
-      fail("Expected to throw exception when refresh queue tries to convert"
-          + " a child queue to a parent queue.");
-    } catch (IOException e) {
-      // ignore
-    }
-
-    // now set queue state for b1 to STOPPED
-    conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    conf.set("yarn.scheduler.capacity.root.b.b1.state", "STOPPED");
-    cs.reinitialize(conf, mockContext);
-    Assert.assertEquals(QueueState.STOPPED, b1.getState());
-
-    // test if we can convert a leaf queue which is in STOPPED state
-    conf = new CapacitySchedulerConfiguration();
-    setupQueueConfigurationWithB1AsParentQueue(conf);
-    try {
-      cs.reinitialize(conf, mockContext);
-    } catch (IOException e) {
-      fail("Expected to NOT throw exception when refresh queue tries"
-          + " to convert a leaf queue WITHOUT running apps");
-    }
-    b1 = cs.getQueue(targetQueue);
-    Assert.assertTrue(b1 instanceof ParentQueue);
-    Assert.assertEquals(QueueState.RUNNING, b1.getState());
-    Assert.assertTrue(!b1.getChildQueues().isEmpty());
-  }
 
   @Test(timeout = 30000)
   public void testAMLimitDouble() throws Exception {
@@ -5251,166 +2664,6 @@ public class TestCapacityScheduler {
   }
 
   @Test
-  public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception {
-
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-      ResourceScheduler.class);
-
-    CapacitySchedulerConfiguration newConf =
-        new CapacitySchedulerConfiguration(conf);
-
-    // Define top-level queues
-    newConf.setQueues(CapacitySchedulerConfiguration.ROOT,
-        new String[] { "a", "b" });
-
-    newConf.setCapacity(A, 50);
-    newConf.setCapacity(B, 50);
-
-    // Define 2nd-level queues
-    newConf.setQueues(A, new String[] { "a1" });
-    newConf.setCapacity(A1, 100);
-    newConf.setUserLimitFactor(A1, 2.0f);
-    newConf.setMaximumAMResourcePercentPerPartition(A1, "", 0.1f);
-
-    newConf.setQueues(B, new String[] { "b1" });
-    newConf.setCapacity(B1, 100);
-    newConf.setUserLimitFactor(B1, 2.0f);
-
-    LOG.info("Setup top-level queues a and b");
-
-    MockRM rm = new MockRM(newConf);
-    rm.start();
-
-    CapacityScheduler scheduler =
-        (CapacityScheduler) rm.getResourceScheduler();
-
-    MockNM nm1 = rm.registerNode("h1:1234", 16 * GB);
-
-    // submit an app
-    MockRMAppSubmissionData data3 =
-        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
-            .withAppName("test-move-1")
-            .withUser("u1")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app = MockRMAppSubmitter.submit(rm, data3);
-    MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);
-
-    ApplicationAttemptId appAttemptId =
-        rm.getApplicationReport(app.getApplicationId())
-            .getCurrentApplicationAttemptId();
-
-    MockRMAppSubmissionData data2 =
-        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
-            .withAppName("app")
-            .withUser("u2")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app2 = MockRMAppSubmitter.submit(rm, data2);
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
-
-    MockRMAppSubmissionData data1 =
-        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
-            .withAppName("app")
-            .withUser("u3")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app3 = MockRMAppSubmitter.submit(rm, data1);
-
-    MockRMAppSubmissionData data =
-        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
-            .withAppName("app")
-            .withUser("u4")
-            .withAcls(null)
-            .withQueue("a1")
-            .withUnmanagedAM(false)
-            .build();
-    RMApp app4 = MockRMAppSubmitter.submit(rm, data);
-
-    // Each application asks 50 * 1GB containers
-    am1.allocate("*", 1 * GB, 50, null);
-    am2.allocate("*", 1 * GB, 50, null);
-
-    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    // check preconditions
-    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(4, appsInA1.size());
-    String queue =
-        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
-            .getQueueName();
-    Assert.assertEquals("a1", queue);
-
-    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
-    assertTrue(appsInA.contains(appAttemptId));
-    assertEquals(4, appsInA.size());
-
-    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(4, appsInRoot.size());
-
-    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
-    assertTrue(appsInB1.isEmpty());
-
-    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
-    assertTrue(appsInB.isEmpty());
-
-    UsersManager um =
-        (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager();
-
-    assertEquals(4, um.getNumActiveUsers());
-    assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
-
-    // now move the app
-    scheduler.moveAllApps("a1", "b1");
-
-    //Triggering this event so that user limit computation can
-    //happen again
-    for (int i = 0; i < 10; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      Thread.sleep(500);
-    }
-
-    // check postconditions
-    appsInB1 = scheduler.getAppsInQueue("b1");
-
-    assertEquals(4, appsInB1.size());
-    queue =
-        scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
-            .getQueueName();
-    Assert.assertEquals("b1", queue);
-
-    appsInB = scheduler.getAppsInQueue("b");
-    assertTrue(appsInB.contains(appAttemptId));
-    assertEquals(4, appsInB.size());
-
-    appsInRoot = scheduler.getAppsInQueue("root");
-    assertTrue(appsInRoot.contains(appAttemptId));
-    assertEquals(4, appsInRoot.size());
-
-    List<ApplicationAttemptId> oldAppsInA1 = scheduler.getAppsInQueue("a1");
-    assertEquals(0, oldAppsInA1.size());
-
-    UsersManager um_b1 =
-        (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager();
-
-    assertEquals(2, um_b1.getNumActiveUsers());
-    assertEquals(2, um_b1.getNumActiveUsersWithOnlyPendingApps());
-
-    appsInB1 = scheduler.getAppsInQueue("b1");
-    assertEquals(4, appsInB1.size());
-    rm.close();
-  }
-
-  @Test
   public void testCSQueueMetrics() throws Exception {
 
     // Initialize resource map
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java
new file mode 100644
index 0000000..9943e03
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java
@@ -0,0 +1,1499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.Task;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.appHelper;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkApplicationResourceUsage;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkNodeResourceUsage;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createMockRMContext;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.nodeUpdate;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.registerNode;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.setUpMove;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerApps {
+
+  private ResourceManager resourceManager = null;
+  private RMContext mockContext;
+
+  @Before
+  public void setUp() throws Exception {
+    this.resourceManager = createResourceManager(); // stopped again in tearDown()
+    this.mockContext = createMockRMContext();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    stopResourceManager(this.resourceManager); // counterpart of setUp()
+  }
+
+  @Test
+  public void testGetAppsInQueue() throws Exception {
+    // Submit one application to each of the queues a1, a2 and b2.
+    Application appInA1 = new Application("user_0", "a1", resourceManager);
+    appInA1.submit();
+    Application appInA2 = new Application("user_0", "a2", resourceManager);
+    appInA2.submit();
+    Application appInB2 = new Application("user_0", "b2", resourceManager);
+    appInB2.submit();
+    ApplicationAttemptId attemptA1 = appInA1.getApplicationAttemptId();
+    ApplicationAttemptId attemptA2 = appInA2.getApplicationAttemptId();
+    ApplicationAttemptId attemptB2 = appInB2.getApplicationAttemptId();
+
+    ResourceScheduler rs = resourceManager.getResourceScheduler();
+    assertEquals(1, rs.getAppsInQueue("a1").size());
+
+    List<ApplicationAttemptId> underA = rs.getAppsInQueue("a");
+    assertTrue(underA.contains(attemptA1));
+    assertTrue(underA.contains(attemptA2));
+    assertEquals(2, underA.size());
+
+    List<ApplicationAttemptId> underRoot = rs.getAppsInQueue("root");
+    assertTrue(underRoot.contains(attemptA1));
+    assertTrue(underRoot.contains(attemptA2));
+    assertTrue(underRoot.contains(attemptB2));
+    assertEquals(3, underRoot.size());
+    // A queue name the scheduler does not know yields null, not an empty list.
+    Assert.assertNull(rs.getAppsInQueue("nonexistentqueue"));
+  }
+
+  @Test
+  public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
+    // NOTE(review): unlike the sibling tests, the MockRM created here is never stopped.
+    CapacitySchedulerConfiguration cfg = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(cfg);
+    cfg.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    ResourceScheduler rs = new MockRM(cfg).getResourceScheduler();
+    @SuppressWarnings("unchecked")
+    AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> sched =
+        (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rs;
+    SchedulerApplication<SchedulerApplicationAttempt> added =
+        TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
+            sched.getSchedulerApplications(), sched, "a1");
+    Assert.assertEquals("a1", added.getQueue().getQueueName());
+  }
+
+  /**
+   * Verifies that killing all applications of queue a1 removes the submitted
+   * application from the listings of a1, a and root.
+   */
+  @Test
+  public void testKillAllAppsInQueue() throws Exception {
+    MockRM rm = setUpMove();
+    try {
+      // wildcard instead of the raw type, so the calls below stay type-checked
+      AbstractYarnScheduler<?, ?> scheduler =
+          (AbstractYarnScheduler<?, ?>) rm.getResourceScheduler();
+
+      // submit an app
+      MockRMAppSubmissionData data =
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("test-move-1").withUser("user_0")
+              .withAcls(null).withQueue("a1")
+              .withUnmanagedAM(false).build();
+      RMApp app = MockRMAppSubmitter.submit(rm, data);
+      ApplicationId appId = app.getApplicationId();
+      ApplicationAttemptId appAttemptId =
+          rm.getApplicationReport(appId).getCurrentApplicationAttemptId();
+
+      // check preconditions
+      List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+      assertEquals(1, appsInA1.size());
+
+      List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+      assertTrue(appsInA.contains(appAttemptId));
+      assertEquals(1, appsInA.size());
+      String queue = scheduler.getApplicationAttempt(appsInA1.get(0))
+          .getQueue().getQueueName();
+      Assert.assertEquals("a1", queue);
+
+      List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+      assertTrue(appsInRoot.contains(appAttemptId));
+      assertEquals(1, appsInRoot.size());
+
+      // now kill the app
+      scheduler.killAllAppsInQueue("a1");
+
+      // check postconditions once the app is KILLED and gone from the scheduler
+      rm.waitForState(appId, RMAppState.KILLED);
+      rm.waitForAppRemovedFromScheduler(appId);
+
+      assertTrue(scheduler.getAppsInQueue("root").isEmpty());
+      assertTrue(scheduler.getAppsInQueue("a1").isEmpty());
+      assertTrue(scheduler.getAppsInQueue("a").isEmpty());
+    } finally {
+      // stop the RM even when an assertion above fails
+      rm.stop();
+    }
+  }
+
+  /**
+   * Verifies that asking to kill the applications of a queue that does not
+   * exist fails with a YarnException and leaves the app in a1 untouched.
+   */
+  @Test
+  public void testKillAllAppsInvalidSource() throws Exception {
+    MockRM rm = setUpMove();
+    try {
+      YarnScheduler scheduler = rm.getResourceScheduler();
+
+      // submit an app
+      MockRMAppSubmissionData data =
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("test-move-1").withUser("user_0").withAcls(null)
+              .withQueue("a1").withUnmanagedAM(false).build();
+      RMApp app = MockRMAppSubmitter.submit(rm, data);
+      ApplicationAttemptId appAttemptId =
+          rm.getApplicationReport(app.getApplicationId())
+              .getCurrentApplicationAttemptId();
+
+      // check preconditions
+      List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+      assertEquals(1, appsInA1.size());
+
+      List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+      assertTrue(appsInA.contains(appAttemptId));
+      assertEquals(1, appsInA.size());
+      List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+      assertTrue(appsInRoot.contains(appAttemptId));
+      assertEquals(1, appsInRoot.size());
+
+      // the kill request names a queue that does not exist, so it must fail
+      try {
+        scheduler.killAllAppsInQueue("DOES_NOT_EXIST");
+        fail("Expected YarnException when killing apps in a nonexistent queue");
+      } catch (YarnException expected) {
+        // expected
+      }
+
+      // check postconditions, app should still be in a1
+      appsInA1 = scheduler.getAppsInQueue("a1");
+      assertEquals(1, appsInA1.size());
+      appsInA = scheduler.getAppsInQueue("a");
+      assertTrue(appsInA.contains(appAttemptId));
+      assertEquals(1, appsInA.size());
+      appsInRoot = scheduler.getAppsInQueue("root");
+      assertTrue(appsInRoot.contains(appAttemptId));
+      assertEquals(1, appsInRoot.size());
+    } finally {
+      // stop the RM even when an assertion above fails
+      rm.stop();
+    }
+  }
+
+  /**
+   * Test to ensure that we don't carry out reservation on nodes
+   * that have no CPU available when using the DominantResourceCalculator.
+   */
+  @Test(timeout = 30000)
+  public void testAppReservationWithDominantResourceCalculator() throws Exception {
+    CapacitySchedulerConfiguration csconf =
+        new CapacitySchedulerConfiguration();
+    csconf.setResourceComparator(DominantResourceCalculator.class);
+
+    YarnConfiguration conf = new YarnConfiguration(csconf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    try {
+      // NOTE(review): the last argument is presumably the vcore count, which
+      // would leave nm1 with a single vcore - confirm against MockRM
+      MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 1);
+
+      // register extra nodes to bump up cluster resource
+      MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10 * GB, 4);
+      rm.registerNode("127.0.0.1:1236", 10 * GB, 4);
+
+      RMApp app1 = MockRMAppSubmitter.submitWithMemory(1024, rm);
+      // kick the scheduling
+      nm1.nodeHeartbeat(true);
+      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+      am1.registerAppAttempt();
+      SchedulerNodeReport reportNm1 =
+          rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+
+      // check node report
+      Assert.assertEquals(1 * GB, reportNm1.getUsedResource().getMemorySize());
+      Assert.assertEquals(9 * GB, reportNm1.getAvailableResource().getMemorySize());
+
+      // add request for containers
+      am1.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 1 * GB, 1, 1);
+      am1.schedule(); // send the request
+
+      // kick the scheduler, container reservation should not happen
+      nm1.nodeHeartbeat(true);
+      Thread.sleep(1000);
+      AllocateResponse allocResponse = am1.schedule();
+      ApplicationResourceUsageReport report =
+          rm.getResourceScheduler().getAppResourceUsageReport(
+              attempt1.getAppAttemptId());
+      Assert.assertEquals(0, allocResponse.getAllocatedContainers().size());
+      Assert.assertEquals(0, report.getNumReservedContainers());
+
+      // container should get allocated on this node
+      nm2.nodeHeartbeat(true);
+
+      // poll until the allocation shows up; bounded by the test timeout
+      while (allocResponse.getAllocatedContainers().size() == 0) {
+        Thread.sleep(100);
+        allocResponse = am1.schedule();
+      }
+      report =
+          rm.getResourceScheduler().getAppResourceUsageReport(
+              attempt1.getAppAttemptId());
+      Assert.assertEquals(1, allocResponse.getAllocatedContainers().size());
+      Assert.assertEquals(0, report.getNumReservedContainers());
+    } finally {
+      // stop the RM even if an assertion failed or the test timed out
+      rm.stop();
+    }
+  }
+
+  /**
+   * Moves a single app from "a1" to "b1" and verifies the queue membership
+   * along both queue paths as well as the root pending-apps metric before
+   * and after the move.
+   */
+  @Test
+  public void testMoveAppBasic() throws Exception {
+    MockRM rm = setUpMove();
+    try {
+      AbstractYarnScheduler scheduler =
+          (AbstractYarnScheduler) rm.getResourceScheduler();
+      QueueMetrics metrics = scheduler.getRootQueueMetrics();
+      Assert.assertEquals(0, metrics.getAppsPending());
+
+      // submit an app
+      MockRMAppSubmissionData data =
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("test-move-1")
+              .withUser("user_0")
+              .withAcls(null)
+              .withQueue("a1")
+              .withUnmanagedAM(false)
+              .build();
+      RMApp app = MockRMAppSubmitter.submit(rm, data);
+      ApplicationAttemptId appAttemptId =
+          rm.getApplicationReport(app.getApplicationId())
+              .getCurrentApplicationAttemptId();
+
+      // check preconditions: the app is in a1 (and therefore in a and root),
+      // it is still pending and the b branch is empty
+      assertOneAppInQueue(scheduler, "a1");
+      assertApps(scheduler, "a", appAttemptId);
+      assertApps(scheduler, "root", appAttemptId);
+      assertEquals(1, metrics.getAppsPending());
+      assertApps(scheduler, "b1");
+      assertApps(scheduler, "b");
+
+      // now move the app
+      scheduler.moveApplication(app.getApplicationId(), "b1");
+
+      // check postconditions: the app is in b1 (and therefore in b and root),
+      // the pending count is unchanged and the a branch is empty
+      assertOneAppInQueue(scheduler, "b1");
+      assertApps(scheduler, "b", appAttemptId);
+      assertApps(scheduler, "root", appAttemptId);
+      assertEquals(1, metrics.getAppsPending());
+      assertApps(scheduler, "a1");
+      assertApps(scheduler, "a");
+    } finally {
+      // stop the RM even if one of the assertions above failed
+      rm.stop();
+    }
+  }
+
+  /**
+   * Submits two apps to "a1" and one to "b1", then moves them between the two
+   * queues and checks after every step the number of apps per queue and the
+   * pending-apps count of the root queue metrics, which must stay at 3.
+   */
+  @Test
+  public void testMoveAppPendingMetrics() throws Exception {
+    MockRM rm = setUpMove();
+    try {
+      ResourceScheduler scheduler = rm.getResourceScheduler();
+      assertApps(scheduler, 0, 0, 0);
+
+      // submit two apps in a1
+      RMApp app1 = MockRMAppSubmitter.submit(rm,
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("test-move-1")
+              .withUser("user_0")
+              .withAcls(null)
+              .withQueue("a1")
+              .build());
+      RMApp app2 = MockRMAppSubmitter.submit(rm,
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("test-move-2")
+              .withUser("user_0")
+              .withAcls(null)
+              .withQueue("a1")
+              .build());
+      assertApps(scheduler, 2, 0, 2);
+
+      // submit one app in b1
+      RMApp app3 = MockRMAppSubmitter.submit(rm,
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("test-move-3")
+              .withUser("user_0")
+              .withAcls(null)
+              .withQueue("b1")
+              .build());
+      assertApps(scheduler, 2, 1, 3);
+
+      // now move the app1 from a1 to b1
+      scheduler.moveApplication(app1.getApplicationId(), "b1");
+      assertApps(scheduler, 1, 2, 3);
+
+      // now move the app2 from a1 to b1
+      scheduler.moveApplication(app2.getApplicationId(), "b1");
+      assertApps(scheduler, 0, 3, 3);
+
+      // now move the app3 from b1 to a1
+      scheduler.moveApplication(app3.getApplicationId(), "a1");
+      assertApps(scheduler, 1, 2, 3);
+    } finally {
+      // stop the RM even if one of the assertions above failed
+      rm.stop();
+    }
+  }
+
+  /**
+   * Asserts the number of application attempts in the queues "a1" and "b1"
+   * and the pending-apps count reported by the root queue metrics.
+   *
+   * @param scheduler scheduler to query
+   * @param a1Size expected number of attempts in "a1"
+   * @param b1Size expected number of attempts in "b1"
+   * @param appsPending expected pending-apps count of the root queue metrics
+   */
+  private void assertApps(ResourceScheduler scheduler,
+                          int a1Size,
+                          int b1Size,
+                          int appsPending) {
+    assertEquals(a1Size, scheduler.getAppsInQueue("a1").size());
+    assertEquals(b1Size, scheduler.getAppsInQueue("b1").size());
+
+    int pendingInRoot = scheduler.getRootQueueMetrics().getAppsPending();
+    assertEquals(appsPending, pendingInRoot);
+  }
+
+  /**
+   * Asserts how many application attempts the scheduler reports for a queue.
+   *
+   * @param scheduler scheduler to query
+   * @param queueName name of the queue to look up
+   * @param size expected number of attempts in the queue
+   */
+  private void assertAppsSize(ResourceScheduler scheduler, String queueName, int size) {
+    List<ApplicationAttemptId> attempts = scheduler.getAppsInQueue(queueName);
+    assertEquals(size, attempts.size());
+  }
+
+  /**
+   * Moves an app between two queues below the same parent ("a1" to "a2") and
+   * verifies that it stays listed under "a" and "root" while changing its
+   * leaf queue.
+   */
+  @Test
+  public void testMoveAppSameParent() throws Exception {
+    MockRM rm = setUpMove();
+    try {
+      AbstractYarnScheduler scheduler =
+          (AbstractYarnScheduler) rm.getResourceScheduler();
+
+      // submit an app
+      MockRMAppSubmissionData data =
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("test-move-1")
+              .withUser("user_0")
+              .withAcls(null)
+              .withQueue("a1")
+              .withUnmanagedAM(false)
+              .build();
+      RMApp app = MockRMAppSubmitter.submit(rm, data);
+      ApplicationAttemptId appAttemptId =
+          rm.getApplicationReport(app.getApplicationId())
+              .getCurrentApplicationAttemptId();
+
+      // check preconditions
+      assertOneAppInQueue(scheduler, "a1");
+      assertApps(scheduler, "root", appAttemptId);
+      assertApps(scheduler, "a", appAttemptId);
+      assertApps(scheduler, "a2");
+
+      // now move the app
+      scheduler.moveApplication(app.getApplicationId(), "a2");
+
+      // check postconditions
+      assertApps(scheduler, "root", appAttemptId);
+      assertApps(scheduler, "a", appAttemptId);
+      assertApps(scheduler, "a1");
+      assertOneAppInQueue(scheduler, "a2");
+    } finally {
+      // stop the RM even if one of the assertions above failed
+      rm.stop();
+    }
+  }
+
+  /**
+   * Asserts that the scheduler reports exactly the given application
+   * attempts, in the given order, for a queue. Passing no attempts asserts
+   * that the queue is empty.
+   *
+   * @param scheduler scheduler to query
+   * @param queueName name of the queue to look up
+   * @param apps expected application attempts of the queue
+   */
+  private void assertApps(ResourceScheduler scheduler,
+                          String queueName,
+                          ApplicationAttemptId... apps) {
+    List<ApplicationAttemptId> expected = Lists.newArrayList(apps);
+    List<ApplicationAttemptId> actual = scheduler.getAppsInQueue(queueName);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Asserts that a queue holds exactly one application attempt and that this
+   * attempt reports the same queue name as the one it was looked up by.
+   *
+   * @param scheduler scheduler to query
+   * @param queueName name of the queue expected to hold the single attempt
+   */
+  private void assertOneAppInQueue(AbstractYarnScheduler scheduler, String queueName) {
+    List<ApplicationAttemptId> attempts = scheduler.getAppsInQueue(queueName);
+    assertEquals(1, attempts.size());
+
+    ApplicationAttemptId onlyAttempt = attempts.get(0);
+    String actualQueue =
+        scheduler.getApplicationAttempt(onlyAttempt).getQueue().getQueueName();
+    Assert.assertEquals(queueName, actualQueue);
+  }
+
+  /**
+   * Moves application_0, which already uses 1GB, from "a1" to "b1" and then
+   * requests one more 2GB task for each of the two applications. Both
+   * applications are expected to end up with 3GB of usage, with 4GB used on
+   * nm0 and 2GB used on nm1.
+   */
+  @Test
+  public void testMoveAppForMoveToQueueWithFreeCap() throws Exception {
+
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
+    // Register node1
+    String host0 = "host_0";
+    NodeManager nm0 =
+        registerNode(resourceManager, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(4 * GB, 1), mockNodeStatus);
+
+    // Register node2
+    String host1 = "host_1";
+    NodeManager nm1 =
+        registerNode(resourceManager, host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(2 * GB, 1), mockNodeStatus);
+
+    // ResourceRequest priorities
+    Priority priority0 = Priority.newInstance(0);
+    Priority priority1 = Priority.newInstance(1);
+
+    // Submit application_0
+    Application application0 =
+        new Application("user_0", "a1", resourceManager);
+    application0.submit(); // app + app attempt event sent to scheduler
+
+    application0.addNodeManager(host0, 1234, nm0);
+    application0.addNodeManager(host1, 1234, nm1);
+
+    // priority1 requests are 1GB, priority0 requests are 2GB
+    Resource capability00 = Resources.createResource(1 * GB, 1);
+    application0.addResourceRequestSpec(priority1, capability00);
+
+    Resource capability01 = Resources.createResource(2 * GB, 1);
+    application0.addResourceRequestSpec(priority0, capability01);
+
+    Task task00 =
+        new Task(application0, priority1, new String[]{host0, host1});
+    application0.addTask(task00);
+
+    // Submit application_1
+    Application application1 =
+        new Application("user_1", "b2", resourceManager);
+    application1.submit(); // app + app attempt event sent to scheduler
+
+    application1.addNodeManager(host0, 1234, nm0);
+    application1.addNodeManager(host1, 1234, nm1);
+
+    // same request sizes as for application_0
+    Resource capability10 = Resources.createResource(1 * GB, 1);
+    application1.addResourceRequestSpec(priority1, capability10);
+
+    Resource capability11 = Resources.createResource(2 * GB, 1);
+    application1.addResourceRequestSpec(priority0, capability11);
+
+    Task task10 =
+        new Task(application1, priority1, new String[]{host0, host1});
+    application1.addTask(task10);
+
+    // Send resource requests to the scheduler
+    application0.schedule(); // allocate
+    application1.schedule(); // allocate
+
+    // task_0_0 task_1_0 allocated, used=2G
+    nodeUpdate(resourceManager, nm0);
+
+    // nothing allocated
+    nodeUpdate(resourceManager, nm1);
+
+    // Get allocations from the scheduler
+    application0.schedule(); // task_0_0
+    checkApplicationResourceUsage(1 * GB, application0);
+
+    application1.schedule(); // task_1_0
+    checkApplicationResourceUsage(1 * GB, application1);
+
+    // task_0_0 (1G) and task_1_0 (1G) are on nm0, 2G still available there
+    checkNodeResourceUsage(2 * GB, nm0);
+    // no tasks on nm1, 2G available
+    checkNodeResourceUsage(0 * GB, nm1);
+
+    // move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5%
+    // total cap)
+    scheduler.moveApplication(application0.getApplicationId(), "b1");
+
+    // 2GB 1C
+    Task task11 =
+        new Task(application1, priority0,
+            new String[]{ResourceRequest.ANY});
+    application1.addTask(task11);
+
+    application1.schedule();
+
+    // 2GB 1C
+    Task task01 =
+        new Task(application0, priority0, new String[]{host0, host1});
+    application0.addTask(task01);
+
+    application0.schedule();
+
+    // prev 2G used free 2G
+    nodeUpdate(resourceManager, nm0);
+
+    // prev 0G used free 2G
+    nodeUpdate(resourceManager, nm1);
+
+    // Get allocations from the scheduler
+    application1.schedule();
+    checkApplicationResourceUsage(3 * GB, application1);
+
+    // Get allocations from the scheduler
+    application0.schedule();
+    checkApplicationResourceUsage(3 * GB, application0);
+
+    // both nodes are fully used now
+    checkNodeResourceUsage(4 * GB, nm0);
+    checkNodeResourceUsage(2 * GB, nm1);
+  }
+
+  /**
+   * Moves application_0 from "a1" to "b2" before any node update, checks that
+   * it gets nothing allocated there while application_1 gets its 1GB task,
+   * then moves it on to "a2" and checks that its 3GB task is allocated on nm1.
+   */
+  @Test
+  public void testMoveAppSuccess() throws Exception {
+
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
+    // Register node1
+    String host0 = "host_0";
+    NodeManager nm0 =
+        registerNode(resourceManager, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(5 * GB, 1), mockNodeStatus);
+
+    // Register node2
+    String host1 = "host_1";
+    NodeManager nm1 =
+        registerNode(resourceManager, host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(5 * GB, 1), mockNodeStatus);
+
+    // ResourceRequest priorities
+    Priority priority0 = Priority.newInstance(0);
+    Priority priority1 = Priority.newInstance(1);
+
+    // Submit application_0
+    Application application0 =
+        new Application("user_0", "a1", resourceManager);
+    application0.submit(); // app + app attempt event sent to scheduler
+
+    application0.addNodeManager(host0, 1234, nm0);
+    application0.addNodeManager(host1, 1234, nm1);
+
+    // application_0: priority1 requests are 3GB, priority0 requests are 2GB
+    Resource capability00 = Resources.createResource(3 * GB, 1);
+    application0.addResourceRequestSpec(priority1, capability00);
+
+    Resource capability01 = Resources.createResource(2 * GB, 1);
+    application0.addResourceRequestSpec(priority0, capability01);
+
+    Task task00 =
+        new Task(application0, priority1, new String[]{host0, host1});
+    application0.addTask(task00);
+
+    // Submit application_1
+    Application application1 =
+        new Application("user_1", "b2", resourceManager);
+    application1.submit(); // app + app attempt event sent to scheduler
+
+    application1.addNodeManager(host0, 1234, nm0);
+    application1.addNodeManager(host1, 1234, nm1);
+
+    // application_1: priority1 requests are 1GB, priority0 requests are 2GB
+    Resource capability10 = Resources.createResource(1 * GB, 1);
+    application1.addResourceRequestSpec(priority1, capability10);
+
+    Resource capability11 = Resources.createResource(2 * GB, 1);
+    application1.addResourceRequestSpec(priority0, capability11);
+
+    Task task10 =
+        new Task(application1, priority1, new String[]{host0, host1});
+    application1.addTask(task10);
+
+    // Send resource requests to the scheduler
+    application0.schedule(); // allocate
+    application1.schedule(); // allocate
+
+    // b2 can only run 1 app at a time
+    scheduler.moveApplication(application0.getApplicationId(), "b2");
+
+    nodeUpdate(resourceManager, nm0);
+
+    nodeUpdate(resourceManager, nm1);
+
+    // Get allocations from the scheduler
+    application0.schedule(); // task_0_0 is not allocated while the app is in b2
+    checkApplicationResourceUsage(0 * GB, application0);
+
+    application1.schedule(); // task_1_0
+    checkApplicationResourceUsage(1 * GB, application1);
+
+    // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is
+    // not scheduled
+    checkNodeResourceUsage(1 * GB, nm0);
+    checkNodeResourceUsage(0 * GB, nm1);
+
+    // lets move application_0 to a queue where it can run
+    scheduler.moveApplication(application0.getApplicationId(), "a2");
+    application0.schedule();
+
+    nodeUpdate(resourceManager, nm1);
+
+    // Get allocations from the scheduler
+    application0.schedule(); // task_0_0
+    checkApplicationResourceUsage(3 * GB, application0);
+
+    // task_1_0 (1G) stays on nm0, task_0_0 (3G) was placed on nm1
+    checkNodeResourceUsage(1 * GB, nm0);
+    checkNodeResourceUsage(3 * GB, nm1);
+
+  }
+
+  /**
+   * Re-creates the ResourceManager with the state of queue B configured as
+   * STOPPED, runs a 3GB task of an app in "a1" and then tries to move that
+   * app to "b1". The test passes only if a {@link YarnException} is thrown.
+   */
+  @Test(expected = YarnException.class)
+  public void testMoveAppViolateQueueState() throws Exception {
+    // replace the shared ResourceManager with one using a no-op label manager
+    resourceManager = new ResourceManager() {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+        mgr.init(getConfig());
+        return mgr;
+      }
+    };
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    // build the "<prefix><B>.<state>" configuration key and stop queue B
+    StringBuilder qState = new StringBuilder();
+    qState.append(CapacitySchedulerConfiguration.PREFIX).append(B)
+        .append(CapacitySchedulerConfiguration.DOT)
+        .append(CapacitySchedulerConfiguration.STATE);
+    csConf.set(qState.toString(), QueueState.STOPPED.name());
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    resourceManager.init(conf);
+    resourceManager.getRMContext().getContainerTokenSecretManager()
+        .rollMasterKey();
+    resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
+    ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
+    mockContext = mock(RMContext.class);
+    when(mockContext.getConfigurationProvider()).thenReturn(
+        new LocalConfigurationProvider());
+
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
+    // Register node1
+    String host0 = "host_0";
+    NodeManager nm0 =
+        registerNode(resourceManager, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(6 * GB, 1), mockNodeStatus);
+
+    // ResourceRequest priorities
+    Priority priority0 = Priority.newInstance(0);
+    Priority priority1 = Priority.newInstance(1);
+
+    // Submit application_0
+    Application application0 =
+        new Application("user_0", "a1", resourceManager);
+    application0.submit(); // app + app attempt event sent to scheduler
+
+    application0.addNodeManager(host0, 1234, nm0);
+
+    Resource capability00 = Resources.createResource(3 * GB, 1);
+    application0.addResourceRequestSpec(priority1, capability00);
+
+    Resource capability01 = Resources.createResource(2 * GB, 1);
+    application0.addResourceRequestSpec(priority0, capability01);
+
+    Task task00 =
+        new Task(application0, priority1, new String[]{host0});
+    application0.addTask(task00);
+
+    // Send resource requests to the scheduler
+    application0.schedule(); // allocate
+
+    // task_0_0 allocated
+    nodeUpdate(resourceManager, nm0);
+
+    // Get allocations from the scheduler
+    application0.schedule(); // task_0_0
+    checkApplicationResourceUsage(3 * GB, application0);
+
+    checkNodeResourceUsage(3 * GB, nm0);
+    // queue B was configured as STOPPED above, so this move is the call that
+    // is expected to throw the YarnException declared on @Test
+    // NOTE(review): assumes b1 is a child of the queue behind B - confirm
+    scheduler.moveApplication(application0.getApplicationId(), "b1");
+
+  }
+
+  /**
+   * Moves application_0 with a live 3GB container from "a1" to "a2" and
+   * compares scheduler info snapshots taken before and after the move:
+   * application counts, used resources, container counts and per-user usage
+   * must leave "a1" and show up on "a2", while the counts of "a" and the root
+   * queue stay the same.
+   */
+  @Test
+  public void testMoveAppQueueMetricsCheck() throws Exception {
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
+    // Register node1
+    String host0 = "host_0";
+    NodeManager nm0 =
+        registerNode(resourceManager, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(5 * GB, 1), mockNodeStatus);
+
+    // Register node2
+    String host1 = "host_1";
+    NodeManager nm1 =
+        registerNode(resourceManager, host1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(5 * GB, 1), mockNodeStatus);
+
+    // ResourceRequest priorities
+    Priority priority0 = Priority.newInstance(0);
+    Priority priority1 = Priority.newInstance(1);
+
+    // Submit application_0
+    Application application0 =
+        new Application("user_0", "a1", resourceManager);
+    application0.submit(); // app + app attempt event sent to scheduler
+
+    application0.addNodeManager(host0, 1234, nm0);
+    application0.addNodeManager(host1, 1234, nm1);
+
+    Resource capability00 = Resources.createResource(3 * GB, 1);
+    application0.addResourceRequestSpec(priority1, capability00);
+
+    Resource capability01 = Resources.createResource(2 * GB, 1);
+    application0.addResourceRequestSpec(priority0, capability01);
+
+    Task task00 =
+        new Task(application0, priority1, new String[]{host0, host1});
+    application0.addTask(task00);
+
+    // Submit application_1
+    Application application1 =
+        new Application("user_1", "b2", resourceManager);
+    application1.submit(); // app + app attempt event sent to scheduler
+
+    application1.addNodeManager(host0, 1234, nm0);
+    application1.addNodeManager(host1, 1234, nm1);
+
+    Resource capability10 = Resources.createResource(1 * GB, 1);
+    application1.addResourceRequestSpec(priority1, capability10);
+
+    Resource capability11 = Resources.createResource(2 * GB, 1);
+    application1.addResourceRequestSpec(priority0, capability11);
+
+    Task task10 =
+        new Task(application1, priority1, new String[]{host0, host1});
+    application1.addTask(task10);
+
+    // Send resource requests to the scheduler
+    application0.schedule(); // allocate
+    application1.schedule(); // allocate
+
+    nodeUpdate(resourceManager, nm0);
+
+    nodeUpdate(resourceManager, nm1);
+
+    // snapshot of the queue state before the move
+    CapacityScheduler cs =
+        (CapacityScheduler) resourceManager.getResourceScheduler();
+    CSQueue origRootQ = cs.getRootQueue();
+    CapacitySchedulerInfo oldInfo =
+        new CapacitySchedulerInfo(origRootQ, cs);
+    int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues());
+    int origNumAppsRoot = origRootQ.getNumApplications();
+
+    scheduler.moveApplication(application0.getApplicationId(), "a2");
+
+    // snapshot of the queue state after the move
+    CSQueue newRootQ = cs.getRootQueue();
+    int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues());
+    int newNumAppsRoot = newRootQ.getNumApplications();
+    CapacitySchedulerInfo newInfo =
+        new CapacitySchedulerInfo(newRootQ, cs);
+    // naming: orig*/target* is the source (a1) / destination (a2) queue,
+    // Old/New is the snapshot taken before / after the move
+    CapacitySchedulerLeafQueueInfo origOldA1 =
+        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues());
+    CapacitySchedulerLeafQueueInfo origNewA1 =
+        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues());
+    CapacitySchedulerLeafQueueInfo targetOldA2 =
+        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues());
+    CapacitySchedulerLeafQueueInfo targetNewA2 =
+        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues());
+    // originally submitted here
+    assertEquals(1, origOldA1.getNumApplications());
+    assertEquals(1, origNumAppsA);
+    assertEquals(2, origNumAppsRoot);
+    // after the move
+    assertEquals(0, origNewA1.getNumApplications());
+    assertEquals(1, newNumAppsA);
+    assertEquals(2, newNumAppsRoot);
+    // original consumption on a1
+    assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemorySize());
+    assertEquals(1, origOldA1.getResourcesUsed().getvCores());
+    assertEquals(0, origNewA1.getResourcesUsed().getMemorySize()); // after the move
+    assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move
+    // app moved here with live containers
+    assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemorySize());
+    assertEquals(1, targetNewA2.getResourcesUsed().getvCores());
+    // it was empty before the move
+    assertEquals(0, targetOldA2.getNumApplications());
+    assertEquals(0, targetOldA2.getResourcesUsed().getMemorySize());
+    assertEquals(0, targetOldA2.getResourcesUsed().getvCores());
+    // after the app moved here
+    assertEquals(1, targetNewA2.getNumApplications());
+    // 1 container on original queue before move
+    assertEquals(1, origOldA1.getNumContainers());
+    // after the move the resource released
+    assertEquals(0, origNewA1.getNumContainers());
+    // and moved to the new queue
+    assertEquals(1, targetNewA2.getNumContainers());
+    // which originally didn't have any
+    assertEquals(0, targetOldA2.getNumContainers());
+    // 1 user with 3GB
+    assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0)
+        .getResourcesUsed().getMemorySize());
+    // 1 user with 1 core
+    assertEquals(1, origOldA1.getUsers().getUsersList().get(0)
+        .getResourcesUsed().getvCores());
+    // user has no more running app in the orig queue
+    assertEquals(0, origNewA1.getUsers().getUsersList().size());
+    // 1 user with 3GB
+    assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0)
+        .getResourcesUsed().getMemorySize());
+    // 1 user with 1 core
+    assertEquals(1, targetNewA2.getUsers().getUsersList().get(0)
+        .getResourcesUsed().getvCores());
+
+    // Get allocations from the scheduler
+    application0.schedule(); // task_0_0
+    checkApplicationResourceUsage(3 * GB, application0);
+
+    application1.schedule(); // task_1_0
+    checkApplicationResourceUsage(1 * GB, application1);
+
+    // 4G used on nm0 (task_0_0 with 3G plus task_1_0 with 1G), nothing on nm1;
+    // the move to a2 did not change the node level usage
+    checkNodeResourceUsage(4 * GB, nm0);
+    checkNodeResourceUsage(0 * GB, nm1);
+
+  }
+
+  /**
+   * Moves all apps of "a1" to "b1" and verifies that the single submitted app
+   * ends up in "b1" (and therefore in "b" and "root") while the "a" branch
+   * becomes empty.
+   */
+  @Test
+  public void testMoveAllApps() throws Exception {
+    MockRM rm = setUpMove();
+    try {
+      AbstractYarnScheduler scheduler =
+          (AbstractYarnScheduler) rm.getResourceScheduler();
+
+      // submit an app
+      MockRMAppSubmissionData data =
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("test-move-1")
+              .withUser("user_0")
+              .withAcls(null)
+              .withQueue("a1")
+              .withUnmanagedAM(false)
+              .build();
+      RMApp app = MockRMAppSubmitter.submit(rm, data);
+      ApplicationAttemptId appAttemptId =
+          rm.getApplicationReport(app.getApplicationId())
+              .getCurrentApplicationAttemptId();
+
+      // check preconditions
+      assertOneAppInQueue(scheduler, "a1");
+      assertApps(scheduler, "root", appAttemptId);
+      assertApps(scheduler, "a", appAttemptId);
+      assertApps(scheduler, "a1", appAttemptId);
+      assertApps(scheduler, "b1");
+      assertApps(scheduler, "b");
+
+      // now move every app of a1 to b1
+      scheduler.moveAllApps("a1", "b1");
+
+      // check post conditions; the sleep gives the move time to take effect
+      Thread.sleep(1000);
+      assertOneAppInQueue(scheduler, "b1");
+      assertApps(scheduler, "root", appAttemptId);
+      assertApps(scheduler, "b", appAttemptId);
+      assertApps(scheduler, "b1", appAttemptId);
+      assertApps(scheduler, "a1");
+      assertApps(scheduler, "a");
+    } finally {
+      // stop the RM even if one of the assertions above failed
+      rm.stop();
+    }
+  }
+
+  /**
+   * Moving all apps of "a1" to a queue that does not exist must fail with a
+   * {@link YarnException} and must leave the submitted app in "a1".
+   */
+  @Test
+  public void testMoveAllAppsInvalidDestination() throws Exception {
+    MockRM rm = setUpMove();
+    try {
+      ResourceScheduler scheduler = rm.getResourceScheduler();
+
+      // submit an app
+      MockRMAppSubmissionData data =
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("test-move-1")
+              .withUser("user_0")
+              .withAcls(null)
+              .withQueue("a1")
+              .withUnmanagedAM(false)
+              .build();
+      RMApp app = MockRMAppSubmitter.submit(rm, data);
+      ApplicationAttemptId appAttemptId =
+          rm.getApplicationReport(app.getApplicationId())
+              .getCurrentApplicationAttemptId();
+
+      // check preconditions
+      assertApps(scheduler, "root", appAttemptId);
+      assertApps(scheduler, "a", appAttemptId);
+      assertApps(scheduler, "a1", appAttemptId);
+      assertApps(scheduler, "b");
+      assertApps(scheduler, "b1");
+
+      // moving to a nonexistent destination queue must be rejected
+      try {
+        scheduler.moveAllApps("a1", "DOES_NOT_EXIST");
+        Assert.fail("moveAllApps did not throw YarnException "
+            + "for a nonexistent destination queue");
+      } catch (YarnException e) {
+        // expected
+      }
+
+      // check post conditions, app should still be in a1
+      assertApps(scheduler, "root", appAttemptId);
+      assertApps(scheduler, "a", appAttemptId);
+      assertApps(scheduler, "a1", appAttemptId);
+      assertApps(scheduler, "b");
+      assertApps(scheduler, "b1");
+    } finally {
+      // stop the RM even if one of the assertions above failed
+      rm.stop();
+    }
+  }
+
+  /**
+   * Moving all apps out of a queue that does not exist must fail with a
+   * {@link YarnException} and must leave the submitted app in "a1".
+   */
+  @Test
+  public void testMoveAllAppsInvalidSource() throws Exception {
+    MockRM rm = setUpMove();
+    try {
+      ResourceScheduler scheduler = rm.getResourceScheduler();
+
+      // submit an app
+      MockRMAppSubmissionData data =
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("test-move-1")
+              .withUser("user_0")
+              .withAcls(null)
+              .withQueue("a1")
+              .withUnmanagedAM(false)
+              .build();
+      RMApp app = MockRMAppSubmitter.submit(rm, data);
+      ApplicationAttemptId appAttemptId =
+          rm.getApplicationReport(app.getApplicationId())
+              .getCurrentApplicationAttemptId();
+
+      // check preconditions
+      assertApps(scheduler, "root", appAttemptId);
+      assertApps(scheduler, "a", appAttemptId);
+      assertApps(scheduler, "a1", appAttemptId);
+      assertApps(scheduler, "b");
+      assertApps(scheduler, "b1");
+
+      // moving from a nonexistent source queue must be rejected
+      try {
+        scheduler.moveAllApps("DOES_NOT_EXIST", "b1");
+        Assert.fail("moveAllApps did not throw YarnException "
+            + "for a nonexistent source queue");
+      } catch (YarnException e) {
+        // expected
+      }
+
+      // check post conditions, app should still be in a1
+      assertApps(scheduler, "root", appAttemptId);
+      assertApps(scheduler, "a", appAttemptId);
+      assertApps(scheduler, "a1", appAttemptId);
+      assertApps(scheduler, "b");
+      assertApps(scheduler, "b1");
+    } finally {
+      // stop the RM even if one of the assertions above failed
+      rm.stop();
+    }
+  }
+
+  /**
+   * Verifies that {@code moveAllApps("a1", "b1")} relocates every application
+   * of {@code a1}, including those of users that only have pending
+   * applications, and that the users manager of {@code b1} reports the
+   * expected active-user counters afterwards.
+   *
+   * @throws Exception if the mock cluster cannot be started or driven
+   */
+  @Test
+  public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+
+    // Define top-level queues
+    newConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[]{"a", "b"});
+
+    newConf.setCapacity(A, 50);
+    newConf.setCapacity(B, 50);
+
+    // Define 2nd-level queues
+    newConf.setQueues(A, new String[]{"a1"});
+    newConf.setCapacity(A1, 100);
+    newConf.setUserLimitFactor(A1, 2.0f);
+    newConf.setMaximumAMResourcePercentPerPartition(A1, "", 0.1f);
+
+    newConf.setQueues(B, new String[]{"b1"});
+    newConf.setCapacity(B1, 100);
+    newConf.setUserLimitFactor(B1, 2.0f);
+
+    MockRM rm = new MockRM(newConf);
+    try {
+      rm.start();
+
+      CapacityScheduler scheduler =
+          (CapacityScheduler) rm.getResourceScheduler();
+
+      MockNM nm1 = rm.registerNode("h1:1234", 16 * GB);
+
+      // u1 and u2 get a running AM, u3 and u4 only submit an application
+      MockRMAppSubmissionData dataU1 =
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("test-move-1")
+              .withUser("u1")
+              .withAcls(null)
+              .withQueue("a1")
+              .withUnmanagedAM(false)
+              .build();
+      RMApp app = MockRMAppSubmitter.submit(rm, dataU1);
+      MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+      ApplicationAttemptId appAttemptId =
+          rm.getApplicationReport(app.getApplicationId())
+              .getCurrentApplicationAttemptId();
+
+      MockRMAppSubmissionData dataU2 =
+          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
+              .withAppName("app")
+              .withUser("u2")
+              .withAcls(null)
+              .withQueue("a1")
+              .withUnmanagedAM(false)
+              .build();
+      RMApp app2 = MockRMAppSubmitter.submit(rm, dataU2);
+      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+
+      MockRMAppSubmissionData dataU3 =
+          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
+              .withAppName("app")
+              .withUser("u3")
+              .withAcls(null)
+              .withQueue("a1")
+              .withUnmanagedAM(false)
+              .build();
+      RMApp app3 = MockRMAppSubmitter.submit(rm, dataU3);
+
+      MockRMAppSubmissionData dataU4 =
+          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
+              .withAppName("app")
+              .withUser("u4")
+              .withAcls(null)
+              .withQueue("a1")
+              .withUnmanagedAM(false)
+              .build();
+      RMApp app4 = MockRMAppSubmitter.submit(rm, dataU4);
+
+      // Each application asks 50 * 1GB containers
+      am1.allocate("*", 1 * GB, 50, null);
+      am2.allocate("*", 1 * GB, 50, null);
+
+      RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+      ApplicationAttemptId attemptId2 =
+          app2.getCurrentAppAttempt().getAppAttemptId();
+      ApplicationAttemptId attemptId3 =
+          app3.getCurrentAppAttempt().getAppAttemptId();
+      ApplicationAttemptId attemptId4 =
+          app4.getCurrentAppAttempt().getAppAttemptId();
+
+      // check preconditions
+      assertApps(scheduler, "root",
+          attemptId3, attemptId4, appAttemptId, attemptId2);
+      assertApps(scheduler, "a",
+          attemptId3, attemptId4, appAttemptId, attemptId2);
+      assertApps(scheduler, "a1",
+          attemptId3, attemptId4, appAttemptId, attemptId2);
+      assertApps(scheduler, "b");
+      assertApps(scheduler, "b1");
+
+      UsersManager um =
+          (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager();
+
+      assertEquals(4, um.getNumActiveUsers());
+      assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
+
+      // now move the app
+      scheduler.moveAllApps("a1", "b1");
+
+      // Triggering this event so that user limit computation can
+      // happen again
+      for (int i = 0; i < 10; i++) {
+        scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));
+        Thread.sleep(500);
+      }
+
+      // check post conditions
+      assertApps(scheduler, "root",
+          appAttemptId, attemptId2, attemptId3, attemptId4);
+      assertApps(scheduler, "a");
+      assertApps(scheduler, "a1");
+      assertApps(scheduler, "b",
+          appAttemptId, attemptId2, attemptId3, attemptId4);
+      assertApps(scheduler, "b1",
+          appAttemptId, attemptId2, attemptId3, attemptId4);
+
+      UsersManager umB1 =
+          (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager();
+
+      assertEquals(2, umB1.getNumActiveUsers());
+      assertEquals(2, umB1.getNumActiveUsersWithOnlyPendingApps());
+    } finally {
+      // Release the mock RM even when an assertion above fails.
+      rm.close();
+    }
+  }
+
+  /**
+   * Verifies moving an application whose attempt has not been added yet:
+   * moving an application unknown to the scheduler fails, while an
+   * application that was added can be moved, and the attempt added after the
+   * move is counted in {@code b1} (and its parents) instead of {@code a1}.
+   *
+   * @throws Exception if the mock cluster cannot be started or driven
+   */
+  @Test(timeout = 60000)
+  public void testMoveAttemptNotAdded() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(getCapacityConfiguration(conf));
+    try {
+      rm.start();
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+      ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
+      ApplicationAttemptId appAttemptId =
+          BuilderUtils.newApplicationAttemptId(appId, 1);
+
+      // Register a mocked application (with a mocked current attempt) in the
+      // RM context so the scheduler can resolve it by id.
+      RMAppAttemptMetrics attemptMetric =
+          new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
+      RMAppImpl app = mock(RMAppImpl.class);
+      when(app.getApplicationId()).thenReturn(appId);
+      RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+      Container container = mock(Container.class);
+      when(attempt.getMasterContainer()).thenReturn(container);
+      ApplicationSubmissionContext submissionContext =
+          mock(ApplicationSubmissionContext.class);
+      when(attempt.getSubmissionContext()).thenReturn(submissionContext);
+      when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
+      when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
+      when(app.getCurrentAppAttempt()).thenReturn(attempt);
+
+      rm.getRMContext().getRMApps().put(appId, app);
+
+      SchedulerEvent addAppEvent =
+          new AppAddedSchedulerEvent(appId, "a1", "user");
+
+      // The scheduler does not know the application yet, so the move fails.
+      try {
+        cs.moveApplication(appId, "b1");
+        fail("Move should throw exception app not available");
+      } catch (YarnException e) {
+        assertEquals("App to be moved application_100_0001 not found.",
+            e.getMessage());
+      }
+
+      // Add the application, move it, and only then add its attempt.
+      cs.handle(addAppEvent);
+      cs.moveApplication(appId, "b1");
+      SchedulerEvent addAttemptEvent =
+          new AppAttemptAddedSchedulerEvent(appAttemptId, false);
+      cs.handle(addAttemptEvent);
+
+      CSQueue rootQ = cs.getRootQueue();
+      CSQueue queueB = cs.getQueue("b");
+      CSQueue queueA = cs.getQueue("a");
+      CSQueue queueA1 = cs.getQueue("a1");
+      CSQueue queueB1 = cs.getQueue("b1");
+      assertEquals(1, rootQ.getNumApplications());
+      assertEquals(0, queueA.getNumApplications());
+      assertEquals(1, queueB.getNumApplications());
+      assertEquals(0, queueA1.getNumApplications());
+      assertEquals(1, queueB1.getNumApplications());
+    } finally {
+      // Release the mock RM even when an assertion above fails.
+      rm.close();
+    }
+  }
+
+  @Test
+  public void testRemoveAttemptMoveAdded() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        CapacityScheduler.class);
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    // Create Mock RM
+    MockRM rm = new MockRM(getCapacityConfiguration(conf));
+    CapacityScheduler sch = (CapacityScheduler) rm.getResourceScheduler();
+    // add node
+    Resource newResource = Resource.newInstance(4 * GB, 1);
+    RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
+    SchedulerEvent addNode = new NodeAddedSchedulerEvent(node);
+    sch.handle(addNode);
+
+    ApplicationAttemptId appAttemptId = appHelper(rm, sch, 100, 1, "a1", "user");
+
+    // get Queues
+    CSQueue queueA1 = sch.getQueue("a1");
+    CSQueue queueB = sch.getQueue("b");
+    CSQueue queueB1 = sch.getQueue("b1");
+
+    // add Running rm container and simulate live containers to a1
+    ContainerId newContainerId = ContainerId.newContainerId(appAttemptId, 2);
+    RMContainerImpl rmContainer = mock(RMContainerImpl.class);
+    when(rmContainer.getState()).thenReturn(RMContainerState.RUNNING);
+    Container container2 = mock(Container.class);
+    when(rmContainer.getContainer()).thenReturn(container2);
+    Resource resource = Resource.newInstance(1024, 1);
+    when(container2.getResource()).thenReturn(resource);
+    when(rmContainer.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
+    when(container2.getNodeId()).thenReturn(node.getNodeID());
+    when(container2.getId()).thenReturn(newContainerId);
+    when(rmContainer.getNodeLabelExpression())
+        .thenReturn(RMNodeLabelsManager.NO_LABEL);
+    when(rmContainer.getContainerId()).thenReturn(newContainerId);
+    sch.getApplicationAttempt(appAttemptId).getLiveContainersMap()
+        .put(newContainerId, rmContainer);
+    QueueMetrics queueA1M = queueA1.getMetrics();
+    queueA1M.incrPendingResources(rmContainer.getNodeLabelExpression(),
+        "user1", 1, resource);
+    queueA1M.allocateResources(rmContainer.getNodeLabelExpression(),
+        "user1", resource);
+    // remove attempt
+    sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId,
+        RMAppAttemptState.KILLED, true));
+    // Move application to queue b1
+    sch.moveApplication(appAttemptId.getApplicationId(), "b1");
+    // Check queue metrics after move
+    Assert.assertEquals(0, queueA1.getNumApplications());
+    Assert.assertEquals(1, queueB.getNumApplications());
+    Assert.assertEquals(0, queueB1.getNumApplications());
+
+    // Release attempt add event
+    ApplicationAttemptId appAttemptId2 =
+        BuilderUtils.newApplicationAttemptId(appAttemptId.getApplicationId(), 2);
+    SchedulerEvent addAttemptEvent2 =
+        new AppAttemptAddedSchedulerEvent(appAttemptId2, true);
+    sch.handle(addAttemptEvent2);
+
+    // Check metrics after attempt added
+    Assert.assertEquals(0, queueA1.getNumApplications());
+    Assert.assertEquals(1, queueB.getNumApplications());
+    Assert.assertEquals(1, queueB1.getNumApplications());
+
+
+    QueueMetrics queueB1M = queueB1.getMetrics();
+    QueueMetrics queueBM = queueB.getMetrics();
+    // Verify allocation MB of current state
+    Assert.assertEquals(0, queueA1M.getAllocatedMB());
+    Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
+    Assert.assertEquals(1024, queueB1M.getAllocatedMB());
+    Assert.assertEquals(1, queueB1M.getAllocatedVirtualCores());
+
+    // remove attempt
+    sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId2,
+        RMAppAttemptState.FINISHED, false));
+
+    Assert.assertEquals(0, queueA1M.getAllocatedMB());
+    Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores());
+    Assert.assertEquals(0, queueB1M.getAllocatedMB());
+    Assert.assertEquals(0, queueB1M.getAllocatedVirtualCores());
+
+    verifyQueueMetrics(queueB1M);
+    verifyQueueMetrics(queueBM);
+    // Verify queue A1 metrics
+    verifyQueueMetrics(queueA1M);
+    rm.close();
+  }
+
+  /**
+   * Verifies that applications submitted to queue names the scheduler cannot
+   * resolve end up in the {@code FAILED} state.
+   *
+   * @throws Exception if the mock cluster cannot be started or driven
+   */
+  @Test
+  public void testAppSubmission() throws Exception {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    // Adds a second queue with the short name "b" below root.a.
+    conf.setQueues(A, new String[]{"a1", "a2", "b"});
+    conf.setCapacity(A1, 20);
+    conf.setCapacity("root.a.b", 10);
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+
+      RMApp noParentQueueApp =
+          submitAppAndWaitForState(rm, "q", RMAppState.FAILED);
+      Assert.assertEquals(RMAppState.FAILED, noParentQueueApp.getState());
+
+      RMApp ambiguousQueueApp =
+          submitAppAndWaitForState(rm, "b", RMAppState.FAILED);
+      Assert.assertEquals(RMAppState.FAILED, ambiguousQueueApp.getState());
+
+      RMApp emptyPartQueueApp =
+          submitAppAndWaitForState(rm, "root..a1", RMAppState.FAILED);
+      Assert.assertEquals(RMAppState.FAILED, emptyPartQueueApp.getState());
+
+      RMApp failedAutoQueue =
+          submitAppAndWaitForState(rm, "root.a.b.c.d", RMAppState.FAILED);
+      Assert.assertEquals(RMAppState.FAILED, failedAutoQueue.getState());
+    } finally {
+      // The original test never stopped the RM it started.
+      rm.stop();
+    }
+  }
+
+  /**
+   * Submits a 1 GB application named "app" as user "user" without waiting
+   * for it to be accepted, then waits until it reaches the given state.
+   *
+   * @param rm the mock resource manager to submit to
+   * @param b name of the queue the application is submitted to
+   * @param state application state to wait for
+   * @return the submitted application
+   * @throws Exception if the submission or the wait fails
+   */
+  private RMApp submitAppAndWaitForState(MockRM rm, String b, RMAppState state) throws Exception {
+    RMApp submittedApp = MockRMAppSubmitter.submit(rm,
+        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+            .withWaitForAppAcceptedState(false)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue(b)
+            .withUnmanagedAM(false)
+            .build());
+    rm.waitForState(submittedApp.getApplicationId(), state);
+    return submittedApp;
+  }
+
+  /**
+   * Looks up a queue by its short name and reports how many applications it
+   * holds.
+   *
+   * @param name short name of the queue to look for
+   * @param queues queues to search; the first one whose short name matches
+   *               is used
+   * @return the application count of the matching queue, or {@code -1} when
+   *         no queue in {@code queues} has that short name
+   */
+  private int getNumAppsInQueue(String name, List<CSQueue> queues) {
+    return queues.stream()
+        .filter(candidate -> candidate.getQueueShortName().equals(name))
+        .findFirst()
+        .map(CSQueue::getNumApplications)
+        .orElse(-1);
+  }
+
+  /**
+   * Searches a queue info list, depth first, for the queue with the given
+   * name.
+   *
+   * @param name queue name to look for
+   * @param info list to search; may be {@code null}
+   * @return the first matching queue info, or {@code null} when {@code info}
+   *         is {@code null} or no entry (at any depth) matches
+   */
+  private CapacitySchedulerQueueInfo getQueueInfo(String name,
+                                                  CapacitySchedulerQueueInfoList info) {
+    if (info == null) {
+      return null;
+    }
+    for (CapacitySchedulerQueueInfo candidate : info.getQueueInfoList()) {
+      if (candidate.getQueueName().equals(name)) {
+        return candidate;
+      }
+      // Not this queue itself: descend into its children before moving on.
+      CapacitySchedulerQueueInfo nested =
+          getQueueInfo(name, candidate.getQueues());
+      if (nested != null) {
+        return nested;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Asserts that the given queue metrics report no pending memory, no active
+   * users or applications, no pending or running applications, and no
+   * allocated memory or virtual cores.
+   *
+   * @param queue metrics to check
+   */
+  private void verifyQueueMetrics(QueueMetrics queue) {
+    // pending / activity counters
+    assertEquals(0, queue.getPendingMB());
+    assertEquals(0, queue.getActiveUsers());
+    assertEquals(0, queue.getActiveApps());
+    assertEquals(0, queue.getAppsPending());
+    assertEquals(0, queue.getAppsRunning());
+    // allocated resources
+    assertEquals(0, queue.getAllocatedMB());
+    assertEquals(0, queue.getAllocatedVirtualCores());
+  }
+
+  /**
+   * Builds a capacity scheduler configuration on top of {@code config} with
+   * two top-level queues at 50% each: {@code a} (children {@code a1} and
+   * {@code a2} at 50% each) and {@code b} (single child {@code b1} at 100%).
+   *
+   * @param config base configuration to copy settings from
+   * @return the capacity scheduler configuration with the queues defined
+   */
+  private Configuration getCapacityConfiguration(Configuration config) {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(config);
+
+    // Queue hierarchy
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[]{"a", "b"});
+    csConf.setQueues(A, new String[]{"a1", "a2"});
+    csConf.setQueues(B, new String[]{"b1"});
+
+    // Capacities, top level first
+    csConf.setCapacity(A, 50);
+    csConf.setCapacity(B, 50);
+    csConf.setCapacity(A1, 50);
+    csConf.setCapacity(A2, 50);
+    csConf.setCapacity(B1, 100);
+    return csConf;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodes.java
new file mode 100644
index 0000000..c557354
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodes.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.Task;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.appHelper;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.nodeUpdate;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.registerNode;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.NULL_UPDATE_REQUESTS;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerNodes {
+
+  /** Resource manager shared by the tests; recreated for every test. */
+  private ResourceManager resourceManager;
+
+  /**
+   * Creates the resource manager used by the tests.
+   *
+   * @throws Exception if the resource manager cannot be created
+   */
+  @Before
+  public void setUp() throws Exception {
+    this.resourceManager = createResourceManager();
+  }
+
+  /**
+   * Stops the resource manager created in {@link #setUp()}.
+   *
+   * @throws Exception if stopping the resource manager fails
+   */
+  @After
+  public void tearDown() throws Exception {
+    stopResourceManager(this.resourceManager);
+  }
+
+  /**
+   * Checks that the cluster memory reflects a node that is removed and added
+   * again with less memory.
+   */
+  @Test
+  public void testReconnectedNode() throws Exception {
+    CapacitySchedulerConfiguration schedulerConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(schedulerConf);
+
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    capacityScheduler.setConf(new YarnConfiguration());
+    capacityScheduler.setRMContext(resourceManager.getRMContext());
+    capacityScheduler.init(schedulerConf);
+    capacityScheduler.start();
+
+    RMContext reinitContext = new RMContextImpl(null, null, null, null,
+        null, null, new RMContainerTokenSecretManager(schedulerConf),
+        new NMTokenSecretManagerInRM(schedulerConf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    capacityScheduler.reinitialize(schedulerConf, reinitContext);
+
+    // Two nodes: 4 GB + 2 GB
+    RMNode firstNode =
+        MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode secondNode =
+        MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+    capacityScheduler.handle(new NodeAddedSchedulerEvent(firstNode));
+    capacityScheduler.handle(new NodeAddedSchedulerEvent(secondNode));
+
+    assertEquals(6 * GB,
+        capacityScheduler.getClusterResource().getMemorySize());
+
+    // reconnect the first node with downgraded memory
+    RMNode downgradedFirstNode =
+        MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
+    capacityScheduler.handle(
+        new NodeRemovedSchedulerEvent(downgradedFirstNode));
+    capacityScheduler.handle(new NodeAddedSchedulerEvent(downgradedFirstNode));
+
+    assertEquals(4 * GB,
+        capacityScheduler.getClusterResource().getMemorySize());
+    capacityScheduler.stop();
+  }
+
+  @Test
+  public void testBlackListNodes() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    String host = "127.0.0.1";
+    RMNode node =
+        MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
+    cs.handle(new NodeAddedSchedulerEvent(node));
+
+    ApplicationAttemptId appAttemptId = appHelper(rm, cs, 100, 1, "default", "user");
+
+    // Verify the blacklist can be updated independent of requesting containers
+    cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
+        Collections.<ContainerId>emptyList(),
+        Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
+    Assert.assertTrue(cs.getApplicationAttempt(appAttemptId)
+        .isPlaceBlacklisted(host));
+    cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
+        Collections.<ContainerId>emptyList(), null,
+        Collections.singletonList(host), NULL_UPDATE_REQUESTS);
+    Assert.assertFalse(cs.getApplicationAttempt(appAttemptId)
+        .isPlaceBlacklisted(host));
+    rm.stop();
+  }
+
+  /**
+   * Checks that {@code getNumClusterNodes()} follows node added and node
+   * removed events.
+   */
+  @Test
+  public void testNumClusterNodes() throws Exception {
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    capacityScheduler.setConf(new YarnConfiguration());
+    capacityScheduler.setRMContext(TestUtils.getMockRMContext());
+
+    CapacitySchedulerConfiguration schedulerConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(schedulerConf);
+    capacityScheduler.init(schedulerConf);
+    capacityScheduler.start();
+    assertEquals(0, capacityScheduler.getNumClusterNodes());
+
+    RMNode firstNode =
+        MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode secondNode =
+        MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+
+    // add both nodes
+    capacityScheduler.handle(new NodeAddedSchedulerEvent(firstNode));
+    capacityScheduler.handle(new NodeAddedSchedulerEvent(secondNode));
+    assertEquals(2, capacityScheduler.getNumClusterNodes());
+
+    // remove and re-add the first node
+    capacityScheduler.handle(new NodeRemovedSchedulerEvent(firstNode));
+    assertEquals(1, capacityScheduler.getNumClusterNodes());
+    capacityScheduler.handle(new NodeAddedSchedulerEvent(firstNode));
+    assertEquals(2, capacityScheduler.getNumClusterNodes());
+
+    // remove both nodes
+    capacityScheduler.handle(new NodeRemovedSchedulerEvent(secondNode));
+    capacityScheduler.handle(new NodeRemovedSchedulerEvent(firstNode));
+    assertEquals(0, capacityScheduler.getNumClusterNodes());
+
+    capacityScheduler.stop();
+  }
+
+  /**
+   * Checks that the default node label expressions configured with
+   * surrounding whitespace ({@code " x"}, {@code " y "}) are reported
+   * without it ({@code "x"}, {@code "y"}) in the queue info.
+   *
+   * @throws Exception if the scheduler cannot be initialized or queried
+   */
+  @Test
+  public void testDefaultNodeLabelExpressionQueueConfig() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    conf.setDefaultNodeLabelExpression("root.a", " x");
+    conf.setDefaultNodeLabelExpression("root.b", " y ");
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    try {
+      QueueInfo queueInfoA = cs.getQueueInfo("a", true, false);
+      Assert.assertEquals("Queue Name should be a", "a",
+          queueInfoA.getQueueName());
+      Assert.assertEquals("Queue Path should be root.a", "root.a",
+          queueInfoA.getQueuePath());
+      Assert.assertEquals("Default Node Label Expression should be x", "x",
+          queueInfoA.getDefaultNodeLabelExpression());
+
+      QueueInfo queueInfoB = cs.getQueueInfo("b", true, false);
+      Assert.assertEquals("Queue Name should be b", "b",
+          queueInfoB.getQueueName());
+      Assert.assertEquals("Queue Path should be root.b", "root.b",
+          queueInfoB.getQueuePath());
+      Assert.assertEquals("Default Node Label Expression should be y", "y",
+          queueInfoB.getDefaultNodeLabelExpression());
+    } finally {
+      // This stand-alone scheduler is not the one tearDown() stops, so stop
+      // it here, as the sibling tests do for theirs.
+      cs.stop();
+    }
+  }
+
+  /**
+   * Sends a heartbeat for a node reported as DECOMMISSIONING after the node
+   * was already removed from the scheduler's node tracker; the test passes
+   * when handling that heartbeat does not throw.
+   */
+  @Test
+  public void testRemovedNodeDecommissioningNode() throws Exception {
+    NodeStatus nodeStatus = createMockNodeStatus();
+
+    // Register nodemanager
+    NodeManager decomNm = registerNode(resourceManager, "host_decom", 1234,
+        2345, NetworkTopology.DEFAULT_RACK,
+        Resources.createResource(8 * GB, 4), nodeStatus);
+
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    // Send a heartbeat to kick the tires on the Scheduler
+    RMNode registeredNode =
+        resourceManager.getRMContext().getRMNodes().get(decomNm.getNodeId());
+    scheduler.handle(new NodeUpdateSchedulerEvent(registeredNode));
+
+    // force remove the node to simulate race condition
+    ((CapacityScheduler) scheduler).getNodeTracker()
+        .removeNode(decomNm.getNodeId());
+
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    RMNode decommissioningNode = Mockito.spy(
+        resourceManager.getRMContext().getRMNodes().get(decomNm.getNodeId()));
+    when(decommissioningNode.getState())
+        .thenReturn(NodeState.DECOMMISSIONING);
+    scheduler.handle(new NodeUpdateSchedulerEvent(decommissioningNode));
+  }
+
+  /**
+   * Checks that a heartbeat from a DECOMMISSIONING node shrinks the scheduler
+   * node's total resource to what is already allocated (1 GB, 1 core),
+   * leaving nothing unallocated, and verifies how often the dispatcher's
+   * event handler was requested after one more such heartbeat.
+   *
+   * @throws Exception if the cluster or the application cannot be driven
+   */
+  @Test
+  public void testResourceUpdateDecommissioningNode() throws Exception {
+    // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
+    // to have 0 available resource
+    RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
+    Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
+    when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler<Event>() {
+      @Override
+      public void handle(Event event) {
+        // Apply resource updates directly to the scheduler node; every other
+        // event is ignored.
+        if (event instanceof RMNodeResourceUpdateEvent) {
+          RMNodeResourceUpdateEvent resourceEvent =
+              (RMNodeResourceUpdateEvent) event;
+          resourceManager
+              .getResourceScheduler()
+              .getSchedulerNode(resourceEvent.getNodeId())
+              .updateTotalResource(resourceEvent.getResourceOption().getResource());
+        }
+      }
+    });
+    Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher();
+    ((CapacityScheduler) resourceManager.getResourceScheduler())
+        .setRMContext(spyContext);
+    ((AsyncDispatcher) mockDispatcher).start();
+
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
+    // Register node
+    String host0 = "host_0";
+    NodeManager nm0 = registerNode(resourceManager, host0, 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
+        mockNodeStatus);
+    // ResourceRequest priorities
+    Priority priority0 = Priority.newInstance(0);
+
+    // Submit an application
+    Application application0 =
+        new Application("user_0", "a1", resourceManager);
+    application0.submit();
+
+    application0.addNodeManager(host0, 1234, nm0);
+
+    Resource capability00 = Resources.createResource(1 * GB, 1);
+    application0.addResourceRequestSpec(priority0, capability00);
+
+    Task task00 =
+        new Task(application0, priority0, new String[]{host0});
+    application0.addTask(task00);
+
+    // Send resource requests to the scheduler
+    application0.schedule();
+
+    nodeUpdate(resourceManager, nm0);
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    // This should update the schedulernodes to have 0 available resource
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm0.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+
+    // Get allocations from the scheduler
+    application0.schedule();
+
+    // Check the used resource is 1 GB 1 core
+    Assert.assertEquals(1 * GB, nm0.getUsed().getMemorySize());
+    Resource usedResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm0.getNodeId()).getAllocatedResource();
+    Assert.assertEquals("Used Resource Memory Size should be 1GB", 1 * GB,
+        usedResource.getMemorySize());
+    Assert.assertEquals("Used Resource Virtual Cores should be 1", 1,
+        usedResource.getVirtualCores());
+    // Check total resource of scheduler node is also changed to 1 GB 1 core
+    Resource totalResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm0.getNodeId()).getTotalResource();
+    Assert.assertEquals("Total Resource Memory Size should be 1GB", 1 * GB,
+        totalResource.getMemorySize());
+    Assert.assertEquals("Total Resource Virtual Cores should be 1", 1,
+        totalResource.getVirtualCores());
+    // Check the available resource is 0/0
+    Resource availableResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm0.getNodeId()).getUnallocatedResource();
+    Assert.assertEquals("Available Resource Memory Size should be 0", 0,
+        availableResource.getMemorySize());
+    // The failure message now names the value that is actually checked.
+    Assert.assertEquals("Available Resource Virtual Cores should be 0", 0,
+        availableResource.getVirtualCores());
+    // Kick off another heartbeat where the RMNodeResourceUpdateEvent would
+    // be skipped for DECOMMISSIONING state since the total resource is
+    // already equal to used resource from the previous heartbeat.
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+    verify(mockDispatcher, times(4)).getEventHandler();
+  }
+
+  /**
+   * Tries to allocate containers on a scheduler node that has already been
+   * removed from the scheduler, with asynchronous scheduling disabled; the
+   * test passes when none of the calls throws.
+   */
+  @Test
+  public void testSchedulingOnRemovedNode() throws Exception {
+    Configuration rmConf = new YarnConfiguration();
+    rmConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    rmConf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
+        false);
+
+    MockRM mockRm = new MockRM(rmConf);
+    mockRm.start();
+    RMApp submittedApp = MockRMAppSubmitter.submitWithMemory(100, mockRm);
+    mockRm.drainEvents();
+
+    // The AM runs on the first node ...
+    MockNM amNode = mockRm.registerNode("127.0.0.1:1234", 10240, 10);
+    MockAM appMaster =
+        MockRM.launchAndRegisterAM(submittedApp, mockRm, amNode);
+
+    // ... and the second node is the one removed below, which keeps the AM
+    // alive
+    MockNM removedNm = mockRm.registerNode("127.0.0.1:1235", 10240, 10);
+
+    appMaster.allocate(ResourceRequest.ANY, 2048, 1, null);
+
+    CapacityScheduler capacityScheduler =
+        (CapacityScheduler) mockRm.getRMContext().getScheduler();
+    // Grab the scheduler node before it is removed from the tracker.
+    FiCaSchedulerNode removedSchedulerNode =
+        (FiCaSchedulerNode)
+            capacityScheduler.getNodeTracker().getNode(removedNm.getNodeId());
+    RMNode removedRmNode =
+        mockRm.getRMContext().getRMNodes().get(removedNm.getNodeId());
+    capacityScheduler.handle(new NodeRemovedSchedulerEvent(removedRmNode));
+
+    // schedulerNode is removed, try allocate a container
+    capacityScheduler.allocateContainersToNode(
+        new SimpleCandidateNodeSet<>(removedSchedulerNode), true);
+
+    capacityScheduler.handle(new AppAttemptRemovedSchedulerEvent(
+        appMaster.getApplicationAttemptId(),
+        RMAppAttemptState.FINISHED, false));
+    mockRm.stop();
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java
new file mode 100644
index 0000000..fc18700
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java
@@ -0,0 +1,888 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocMb;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocVcores;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocation;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.unsetMaxAllocation;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1_CAPACITY;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2_CAPACITY;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3_CAPACITY;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.checkQueueStructureCapacities;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ExpectedCapacities;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.getDefaultCapacities;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfWithoutChildrenOfB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithB1AsParentQueue;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB1;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createMockRMContext;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerQueues {
+
+  // Logger for this test class.
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestCapacitySchedulerQueues.class);
+  // Assigned from createResourceManager() in setUp() and stopped in
+  // tearDown(); tests pass its RMContext to the schedulers they create.
+  private ResourceManager resourceManager = null;
+  // Assigned from createMockRMContext() in setUp(); tests pass it to
+  // CapacityScheduler#reinitialize when refreshing queues.
+  private RMContext mockContext;
+
+  /**
+   * Prepares the per-test fixtures: the ResourceManager returned by
+   * createResourceManager() and the RMContext returned by
+   * createMockRMContext().
+   *
+   * @throws Exception if either helper fails
+   */
+  @Before
+  public void setUp() throws Exception {
+    this.resourceManager = createResourceManager();
+    this.mockContext = createMockRMContext();
+  }
+
+  /**
+   * Hands the ResourceManager created in setUp() to stopResourceManager.
+   *
+   * @throws Exception if stopping fails
+   */
+  @After
+  public void tearDown() throws Exception {
+    stopResourceManager(this.resourceManager);
+  }
+
+  /**
+   * Checks that refreshing the scheduler is rejected with an IOException when
+   * the configuration introduces a queue "b1" under root.a.a1, i.e. a second
+   * leaf queue carrying an already used name.
+   *
+   * @throws IOException the expected outcome of this test
+   */
+  @Test(expected = IOException.class)
+  public void testParseQueue() throws IOException {
+    CapacityScheduler scheduler = new CapacityScheduler();
+    scheduler.setConf(new YarnConfiguration());
+    scheduler.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    scheduler.init(csConf);
+    scheduler.start();
+
+    // Turn root.a.a1 into a parent with a single child named "b1".
+    csConf.setQueues(
+        CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[]{"b1"});
+    csConf.setCapacity(
+        CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
+    csConf.setUserLimitFactor(
+        CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
+
+    RMContextImpl refreshContext = new RMContextImpl(
+        null, null, null, null, null, null,
+        new RMContainerTokenSecretManager(csConf),
+        new NMTokenSecretManagerInRM(csConf),
+        new ClientToAMTokenSecretManagerInRM(),
+        null);
+    scheduler.reinitialize(csConf, refreshContext);
+  }
+
+  /**
+   * Verifies that a queue refresh applies changed capacities: the default
+   * queue structure is checked first, then the capacities of A and B are set
+   * to 80 and 20 and the structure is checked again after reinitialize.
+   *
+   * @throws Exception if scheduler setup or a refresh fails unexpectedly
+   */
+  @Test
+  public void testRefreshQueues() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    // Stop the scheduler even when a refresh or an assertion fails, so that a
+    // failing run does not leave the started scheduler behind.
+    try {
+      cs.reinitialize(conf, rmContext);
+      checkQueueStructureCapacities(cs);
+
+      conf.setCapacity(A, 80f);
+      conf.setCapacity(B, 20f);
+      cs.reinitialize(conf, mockContext);
+      checkQueueStructureCapacities(cs,
+          getDefaultCapacities(80f / 100.0f, 20f / 100.0f));
+    } finally {
+      cs.stop();
+    }
+  }
+
+  /**
+   * Checks that a refresh can add a new leaf queue "b4" under B, with b3
+   * giving up the capacity assigned to b4, and that the new queue reports B
+   * as its parent.
+   *
+   * @throws Exception if scheduler setup or a refresh fails unexpectedly
+   */
+  @Test
+  public void testRefreshQueuesWithNewQueue() throws Exception {
+    CapacityScheduler scheduler = new CapacityScheduler();
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    scheduler.setConf(new YarnConfiguration());
+    scheduler.setRMContext(resourceManager.getRMContext());
+    scheduler.init(csConf);
+    scheduler.start();
+    RMContextImpl initialContext = new RMContextImpl(
+        null, null, null, null, null, null,
+        new RMContainerTokenSecretManager(csConf),
+        new NMTokenSecretManagerInRM(csConf),
+        new ClientToAMTokenSecretManagerInRM(),
+        null);
+    scheduler.reinitialize(csConf, initialContext);
+    checkQueueStructureCapacities(scheduler);
+
+    // Path and capacities used for the queue that gets added.
+    final String newQueuePath = B + ".b4";
+    final float newQueueCapacity = 10;
+    final float shrunkB3Capacity = B3_CAPACITY - newQueueCapacity;
+    final float absoluteCapA = 80f / 100.0f;
+    final float absoluteCapB = 20f / 100.0f;
+
+    try {
+      csConf.setCapacity(A, 80f);
+      csConf.setCapacity(B, 20f);
+      csConf.setQueues(B, new String[]{"b1", "b2", "b3", "b4"});
+      csConf.setCapacity(B1, B1_CAPACITY);
+      csConf.setCapacity(B2, B2_CAPACITY);
+      csConf.setCapacity(B3, shrunkB3Capacity);
+      csConf.setCapacity(newQueuePath, newQueueCapacity);
+      scheduler.reinitialize(csConf, mockContext);
+
+      // Start from the defaults and override the two entries that differ.
+      Map<String, ExpectedCapacities> expected =
+          getDefaultCapacities(absoluteCapA, absoluteCapB);
+      ExpectedCapacities expectedB3 =
+          new ExpectedCapacities(shrunkB3Capacity / 100.0f, absoluteCapB);
+      expected.put(B3, expectedB3);
+      ExpectedCapacities expectedB4 =
+          new ExpectedCapacities(newQueueCapacity / 100.0f, absoluteCapB);
+      expected.put(newQueuePath, expectedB4);
+      checkQueueStructureCapacities(scheduler, expected);
+
+      // The new queue must hang below B.
+      CSQueue parentB = findQueue(scheduler.getRootQueue(), B);
+      CSQueue addedQueue = findQueue(parentB, newQueuePath);
+
+      assertEquals(parentB, addedQueue.getParent());
+    } finally {
+      scheduler.stop();
+    }
+  }
+
+  /**
+   * Verifies that a queue refresh must not lower a queue's maximum
+   * allocation: reducing A1's memory to 4096 MB, or its vcores below the
+   * default maximum, has to fail with an IOException whose cause mentions
+   * "not be decreased".
+   *
+   * @throws Exception if scheduler setup or an allowed refresh fails
+   */
+  @Test
+  public void testRefreshQueuesMaxAllocationRefresh() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, mockContext);
+    checkQueueStructureCapacities(cs);
+
+    // Nothing is configured per queue yet, so the defaults apply.
+    assertEquals("max allocation in CS",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        cs.getMaximumResourceCapability().getMemorySize());
+    assertEquals("max allocation for A1",
+        Resources.none(),
+        conf.getQueueMaximumAllocation(A1));
+    assertEquals("max allocation",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
+
+    CSQueue rootQueue = cs.getRootQueue();
+    CSQueue queueA = findQueue(rootQueue, A);
+    CSQueue queueA1 = findQueue(queueA, A1);
+    // Expected value first, actual value second, so that a failure message
+    // reports them the right way round.
+    assertEquals("queue max allocation", 8192,
+        queueA1.getMaximumAllocation().getMemorySize());
+
+    // Lowering the memory limit of A1 must be rejected.
+    setMaxAllocMb(conf, A1, 4096);
+
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("should have thrown exception");
+    } catch (IOException e) {
+      assertTrue("max allocation exception",
+          e.getCause().toString().contains("not be decreased"));
+    }
+
+    // Restore the memory limit so that the next failure is caused by vcores.
+    setMaxAllocMb(conf, A1, 8192);
+    cs.reinitialize(conf, mockContext);
+
+    // Lowering the vcores limit of A1 must be rejected as well.
+    setMaxAllocVcores(conf, A1,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - 1);
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("should have thrown exception");
+    } catch (IOException e) {
+      assertTrue("max allocation exception",
+          e.getCause().toString().contains("not be decreased"));
+    }
+  }
+
+  /**
+   * Checks that a per-queue maximum allocation cannot be raised above the
+   * cluster-level default, neither for memory nor for vcores.
+   *
+   * @throws Exception if scheduler setup or an allowed refresh fails
+   */
+  @Test
+  public void testRefreshQueuesMaxAllocationPerQueueLarge() throws Exception {
+    final int clusterMaxMb =
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB;
+    final int clusterMaxVcores =
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
+
+    CapacityScheduler scheduler = new CapacityScheduler();
+    scheduler.setConf(new YarnConfiguration());
+    scheduler.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    scheduler.init(csConf);
+    scheduler.start();
+
+    // Memory of B3 above the cluster maximum: the refresh has to fail.
+    setMaxAllocMb(csConf, B3, clusterMaxMb + 2048);
+    try {
+      scheduler.reinitialize(csConf, mockContext);
+      fail("should have thrown exception");
+    } catch (IOException expected) {
+      String causeMessage = expected.getCause().getMessage();
+      assertTrue("maximum allocation exception",
+          causeMessage.contains("maximum allocation"));
+    }
+
+    // Memory of B3 equal to the cluster maximum: the refresh goes through.
+    setMaxAllocMb(csConf, B3, clusterMaxMb);
+    scheduler.reinitialize(csConf, mockContext);
+
+    // Vcores of B3 above the cluster maximum: the refresh has to fail.
+    setMaxAllocVcores(csConf, B3, clusterMaxVcores + 1);
+    try {
+      scheduler.reinitialize(csConf, mockContext);
+      fail("should have thrown exception");
+    } catch (IOException expected) {
+      String causeMessage = expected.getCause().getMessage();
+      assertTrue("maximum allocation exception",
+          causeMessage.contains("maximum allocation"));
+    }
+  }
+
+  /**
+   * Verifies that a queue refresh may raise a queue's maximum allocation:
+   * A1 starts at 4096 MB / 2 vcores, is raised to 6144 MB / 3 vcores, and the
+   * existing queue object must report the new values while the cluster-level
+   * maximum stays at its default.
+   *
+   * @throws Exception if scheduler setup or a refresh fails unexpectedly
+   */
+  @Test
+  public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    setMaxAllocMb(conf,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    setMaxAllocVcores(conf,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    setMaxAllocMb(conf, A1, 4096);
+    setMaxAllocVcores(conf, A1, 2);
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, mockContext);
+    checkQueueStructureCapacities(cs);
+
+    CSQueue rootQueue = cs.getRootQueue();
+    CSQueue queueA = findQueue(rootQueue, A);
+    CSQueue queueA1 = findQueue(queueA, A1);
+
+    // State before the refresh: cluster defaults, A1 limited to 4096 MB / 2.
+    assertEquals("max capability MB in CS",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        cs.getMaximumResourceCapability().getMemorySize());
+    assertEquals("max capability vcores in CS",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        cs.getMaximumResourceCapability().getVirtualCores());
+    assertEquals("max allocation MB A1",
+        4096,
+        queueA1.getMaximumAllocation().getMemorySize());
+    assertEquals("max allocation vcores A1",
+        2,
+        queueA1.getMaximumAllocation().getVirtualCores());
+    assertEquals("cluster max allocation MB",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
+    assertEquals("cluster max allocation vcores",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
+
+    setMaxAllocMb(conf, A1, 6144);
+    setMaxAllocVcores(conf, A1, 3);
+    cs.reinitialize(conf, null);
+    // The refresh raised A1's limits, so the queue object obtained before the
+    // refresh must now report the larger values; the cluster-level maximum
+    // was not touched and must be unchanged.
+    assertEquals("max allocation MB A1", 6144,
+        queueA1.getMaximumAllocation().getMemorySize());
+    assertEquals("max allocation vcores A1", 3,
+        queueA1.getMaximumAllocation().getVirtualCores());
+    assertEquals("max allocation MB cluster",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
+    assertEquals("max allocation vcores cluster",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
+    assertEquals("max capability MB cluster",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        cs.getMaximumResourceCapability().getMemorySize());
+    assertEquals("cluster max capability vcores",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        cs.getMaximumResourceCapability().getVirtualCores());
+  }
+
+  /**
+   * Checks that a refresh which lowers the cluster-level maximum allocation
+   * (first memory, then vcores) is rejected with an IOException whose cause
+   * mentions "not be decreased".
+   *
+   * @throws Exception if scheduler setup or an allowed refresh fails
+   */
+  @Test
+  public void testRefreshQueuesMaxAllocationCSError() throws Exception {
+    final int initialMaxMb = 10240;
+    final int initialMaxVcores = 10;
+
+    CapacityScheduler scheduler = new CapacityScheduler();
+    scheduler.setConf(new YarnConfiguration());
+    scheduler.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    setMaxAllocMb(csConf, initialMaxMb);
+    setMaxAllocVcores(csConf, initialMaxVcores);
+    setMaxAllocMb(csConf, A1, 4096);
+    setMaxAllocVcores(csConf, A1, 4);
+    scheduler.init(csConf);
+    scheduler.start();
+    scheduler.reinitialize(csConf, mockContext);
+    checkQueueStructureCapacities(scheduler);
+
+    assertEquals("max allocation MB in CS", initialMaxMb,
+        scheduler.getMaximumResourceCapability().getMemorySize());
+    assertEquals("max allocation vcores in CS", initialMaxVcores,
+        scheduler.getMaximumResourceCapability().getVirtualCores());
+
+    // Smaller cluster-level memory limit: the refresh has to fail.
+    setMaxAllocMb(csConf, 6144);
+    try {
+      scheduler.reinitialize(csConf, mockContext);
+      fail("should have thrown exception");
+    } catch (IOException expected) {
+      String cause = expected.getCause().toString();
+      assertTrue("max allocation exception",
+          cause.contains("not be decreased"));
+    }
+
+    // Back to the initial memory limit before the vcores case.
+    setMaxAllocMb(csConf, initialMaxMb);
+    scheduler.reinitialize(csConf, mockContext);
+
+    // Smaller cluster-level vcores limit: the refresh has to fail.
+    setMaxAllocVcores(csConf, 8);
+    try {
+      scheduler.reinitialize(csConf, mockContext);
+      fail("should have thrown exception");
+    } catch (IOException expected) {
+      String cause = expected.getCause().toString();
+      assertTrue("max allocation exception",
+          cause.contains("not be decreased"));
+    }
+  }
+
+  /**
+   * Checks that raising the cluster-level maximum allocation through a
+   * refresh is reflected by the scheduler and by queues without their own
+   * setting (A2, B2), while A1 keeps its explicitly configured limit.
+   *
+   * @throws Exception if scheduler setup or a refresh fails unexpectedly
+   */
+  @Test
+  public void testRefreshQueuesMaxAllocationCSLarger() throws Exception {
+    CapacityScheduler scheduler = new CapacityScheduler();
+    scheduler.setConf(new YarnConfiguration());
+    scheduler.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    setMaxAllocMb(csConf, 10240);
+    setMaxAllocVcores(csConf, 10);
+    setMaxAllocMb(csConf, A1, 4096);
+    setMaxAllocVcores(csConf, A1, 4);
+    scheduler.init(csConf);
+    scheduler.start();
+    scheduler.reinitialize(csConf, mockContext);
+    checkQueueStructureCapacities(scheduler);
+
+    assertEquals("max allocation MB in CS", 10240,
+        scheduler.getMaximumResourceCapability().getMemorySize());
+    assertEquals("max allocation vcores in CS", 10,
+        scheduler.getMaximumResourceCapability().getVirtualCores());
+
+    CSQueue root = scheduler.getRootQueue();
+    CSQueue parentA = findQueue(root, A);
+    CSQueue parentB = findQueue(root, B);
+    CSQueue leafA1 = findQueue(parentA, A1);
+    CSQueue leafA2 = findQueue(parentA, A2);
+    CSQueue leafB2 = findQueue(parentB, B2);
+
+    // Before the refresh: A1 uses its own limit, A2 and B2 the cluster one.
+    assertEquals("queue A1 max allocation MB", 4096,
+        leafA1.getMaximumAllocation().getMemorySize());
+    assertEquals("queue A1 max allocation vcores", 4,
+        leafA1.getMaximumAllocation().getVirtualCores());
+    assertEquals("queue A2 max allocation MB", 10240,
+        leafA2.getMaximumAllocation().getMemorySize());
+    assertEquals("queue A2 max allocation vcores", 10,
+        leafA2.getMaximumAllocation().getVirtualCores());
+    assertEquals("queue B2 max allocation MB", 10240,
+        leafB2.getMaximumAllocation().getMemorySize());
+    assertEquals("queue B2 max allocation vcores", 10,
+        leafB2.getMaximumAllocation().getVirtualCores());
+
+    setMaxAllocMb(csConf, 12288);
+    setMaxAllocVcores(csConf, 12);
+    scheduler.reinitialize(csConf, null);
+
+    // After the refresh: the cluster-level values changed, and so did every
+    // queue that has no per-queue setting; A1 is unaffected.
+    assertEquals("max allocation MB in CS", 12288,
+        scheduler.getMaximumResourceCapability().getMemorySize());
+    assertEquals("max allocation vcores in CS", 12,
+        scheduler.getMaximumResourceCapability().getVirtualCores());
+    assertEquals("queue A1 max MB allocation", 4096,
+        leafA1.getMaximumAllocation().getMemorySize());
+    assertEquals("queue A1 max vcores allocation", 4,
+        leafA1.getMaximumAllocation().getVirtualCores());
+    assertEquals("queue A2 max MB allocation", 12288,
+        leafA2.getMaximumAllocation().getMemorySize());
+    assertEquals("queue A2 max vcores allocation", 12,
+        leafA2.getMaximumAllocation().getVirtualCores());
+    assertEquals("queue B2 max MB allocation", 12288,
+        leafB2.getMaximumAllocation().getMemorySize());
+    assertEquals("queue B2 max vcores allocation", 12,
+        leafB2.getMaximumAllocation().getVirtualCores());
+  }
+
+  /**
+   * Test for queue deletion through a queue refresh.
+   *
+   * <p>Covers deleting the leaf queue b1 and deleting the parent queue b
+   * together with its children. In both cases a Mockito spy controls the
+   * state reported by the queue: the first refresh is expected to fail and a
+   * second refresh with the same configuration is expected to succeed.
+   *
+   * @throws Exception if scheduler setup or an allowed refresh fails
+   */
+  @Test
+  public void testRefreshQueuesWithQueueDelete() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, rmContext);
+    checkQueueStructureCapacities(cs);
+
+    // test delete leaf queue when there is application running.
+    // The spy reports DRAINING on the first getState() call and STOPPED on
+    // every later call.
+    Map<String, CSQueue> queues =
+        cs.getCapacitySchedulerQueueManager().getShortNameQueues();
+    String b1QTobeDeleted = "b1";
+    LeafQueue csB1Queue = Mockito.spy((LeafQueue) queues.get(b1QTobeDeleted));
+    when(csB1Queue.getState()).thenReturn(QueueState.DRAINING)
+        .thenReturn(QueueState.STOPPED);
+    cs.getCapacitySchedulerQueueManager().addQueue(b1QTobeDeleted, csB1Queue);
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithoutB1(conf);
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("Expected to throw exception when refresh queue tries to delete a"
+          + " queue with running apps");
+    } catch (IOException e) {
+      // ignore
+    }
+
+    // test delete leaf queue(root.b.b1) when there is no application running.
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithoutB1(conf);
+    try {
+      cs.reinitialize(conf, mockContext);
+    } catch (IOException e) {
+      LOG.error(
+          "Expected to NOT throw exception when refresh queue tries to delete"
+              + " a queue WITHOUT running apps",
+          e);
+      fail("Expected to NOT throw exception when refresh queue tries to delete"
+          + " a queue WITHOUT running apps");
+    }
+    CSQueue rootQueue = cs.getRootQueue();
+    CSQueue queueB = findQueue(rootQueue, B);
+    CSQueue queueB1 = findQueue(queueB, B1);
+    assertNull("Refresh needs to support delete of leaf queue ", queueB1);
+
+    // reset back to default configuration for testing parent queue delete
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    cs.reinitialize(conf, rmContext);
+    checkQueueStructureCapacities(cs);
+
+    // set the configurations such that it fails once but should be successful
+    // next time
+    queues = cs.getCapacitySchedulerQueueManager().getShortNameQueues();
+    CSQueue bQueue = Mockito.spy((ParentQueue) queues.get("b"));
+    when(bQueue.getState()).thenReturn(QueueState.DRAINING)
+        .thenReturn(QueueState.STOPPED);
+    cs.getCapacitySchedulerQueueManager().addQueue("b", bQueue);
+
+    bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
+    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+    cs.getCapacitySchedulerQueueManager().addQueue("b1", bQueue);
+
+    bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
+    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+    cs.getCapacitySchedulerQueueManager().addQueue("b2", bQueue);
+
+    bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
+    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+    cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue);
+
+    // test delete Parent queue when there is application running.
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithoutB(conf);
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("Expected to throw exception when refresh queue tries to delete a"
+          + " parent queue with running apps in children queue");
+    } catch (IOException e) {
+      // ignore
+    }
+
+    // test delete Parent queue when there is no application running.
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithoutB(conf);
+    try {
+      cs.reinitialize(conf, mockContext);
+    } catch (IOException e) {
+      // Keep the cause visible in the test log before failing.
+      LOG.error(
+          "Expected to not throw exception when refresh queue tries to delete"
+              + " a queue without running apps",
+          e);
+      fail("Expected to not throw exception when refresh queue tries to delete"
+          + " a queue without running apps");
+    }
+    rootQueue = cs.getRootQueue();
+    queueB = findQueue(rootQueue, B);
+    String message =
+        "Refresh needs to support delete of Parent queue and its children.";
+    assertNull(message, queueB);
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b"));
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b3"));
+
+    cs.stop();
+  }
+
+  /**
+   * Test for deleting all child queues of a parent queue, which turns the
+   * parent queue (root.b) into a leaf queue.
+   *
+   * <p>The spy installed for b1 reports RUNNING on the first getState() call
+   * and STOPPED afterwards; the first refresh is expected to fail and the
+   * second refresh with the same configuration is expected to succeed.
+   *
+   * @throws Exception if scheduler setup or an allowed refresh fails
+   */
+  @Test
+  public void testRefreshQueuesWithAllChildQueuesDeleted() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, rmContext);
+    checkQueueStructureCapacities(cs);
+
+    // Replace b1, b2 and b3 with spies that control the reported state.
+    Map<String, CSQueue> queues =
+        cs.getCapacitySchedulerQueueManager().getShortNameQueues();
+
+    CSQueue bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
+    when(bQueue.getState()).thenReturn(QueueState.RUNNING)
+        .thenReturn(QueueState.STOPPED);
+    cs.getCapacitySchedulerQueueManager().addQueue("b1", bQueue);
+
+    bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
+    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+    cs.getCapacitySchedulerQueueManager().addQueue("b2", bQueue);
+
+    bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
+    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+    cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue);
+
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfWithoutChildrenOfB(conf);
+
+    // test convert parent queue to leaf queue(root.b) while one of its
+    // children (b1) still reports RUNNING: the refresh must be rejected.
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("Expected to throw exception when refresh queue tries to make parent"
+          + " queue a child queue when one of its children is still running.");
+    } catch (IOException e) {
+      //do not do anything, expected exception
+    }
+
+    // test delete leaf queues(root.b.b1,b2,b3) when there is no application
+    // running.
+    try {
+      cs.reinitialize(conf, mockContext);
+    } catch (IOException e) {
+      // Log through the class logger instead of printing to stderr.
+      LOG.error(
+          "Expected to NOT throw exception when refresh queue tries to delete"
+              + " all children of a parent queue(without running apps).",
+          e);
+      fail("Expected to NOT throw exception when refresh queue tries to delete"
+          + " all children of a parent queue(without running apps).");
+    }
+    CSQueue rootQueue = cs.getRootQueue();
+    CSQueue queueB = findQueue(rootQueue, B);
+    assertNotNull("Parent Queue B should not be deleted", queueB);
+    Assert.assertTrue("As Queue'B children are not deleted",
+        queueB instanceof LeafQueue);
+
+    String message =
+        "Refresh needs to support delete of all children of Parent queue.";
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b3"));
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
+
+    cs.stop();
+  }
+
+  /**
+   * Test if we can convert a leaf queue to a parent queue.
+   *
+   * <p>Converting b1 while it is RUNNING is expected to fail. After b1 has
+   * been set to STOPPED through the configuration, the same conversion is
+   * expected to succeed and b1 must then be a RUNNING ParentQueue with at
+   * least one child.
+   *
+   * @throws Exception if scheduler setup or an allowed refresh fails
+   */
+  @Test(timeout = 10000)
+  public void testConvertLeafQueueToParentQueue() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, rmContext);
+    checkQueueStructureCapacities(cs);
+
+    String targetQueue = "b1";
+    CSQueue b1 = cs.getQueue(targetQueue);
+    Assert.assertEquals(QueueState.RUNNING, b1.getState());
+
+    // test if we can convert a leaf queue which is in RUNNING state
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithB1AsParentQueue(conf);
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("Expected to throw exception when refresh queue tries to convert"
+          + " a child queue to a parent queue.");
+    } catch (IOException e) {
+      // ignore
+    }
+
+    // now set queue state for b1 to STOPPED
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    conf.set("yarn.scheduler.capacity.root.b.b1.state", "STOPPED");
+    cs.reinitialize(conf, mockContext);
+    Assert.assertEquals(QueueState.STOPPED, b1.getState());
+
+    // test if we can convert a leaf queue which is in STOPPED state
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithB1AsParentQueue(conf);
+    try {
+      cs.reinitialize(conf, mockContext);
+    } catch (IOException e) {
+      // Keep the cause visible in the test log before failing.
+      LOG.error("Expected to NOT throw exception when refresh queue tries"
+          + " to convert a leaf queue WITHOUT running apps", e);
+      fail("Expected to NOT throw exception when refresh queue tries"
+          + " to convert a leaf queue WITHOUT running apps");
+    }
+    // Look the queue up again: the conversion replaces the leaf with a parent.
+    b1 = cs.getQueue(targetQueue);
+    Assert.assertTrue(b1 instanceof ParentQueue);
+    Assert.assertEquals(QueueState.RUNNING, b1.getState());
+    Assert.assertFalse(b1.getChildQueues().isEmpty());
+  }
+
+  @Test
+  public void testQueuesMaxAllocationInheritance() throws Exception {
+    // queue level max allocation is set by the queue configuration explicitly
+    // or inherits from the parent.
+
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    setMaxAllocMb(conf,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    setMaxAllocVcores(conf,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+
+    // Test the child queue overrides
+    setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
+        "memory-mb=4096,vcores=2");
+    setMaxAllocation(conf, A1, "memory-mb=6144,vcores=2");
+    setMaxAllocation(conf, B, "memory-mb=5120, vcores=2");
+    setMaxAllocation(conf, B2, "memory-mb=1024, vcores=2");
+
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, mockContext);
+    checkQueueStructureCapacities(cs);
+
+    CSQueue rootQueue = cs.getRootQueue();
+    CSQueue queueA = findQueue(rootQueue, A);
+    CSQueue queueB = findQueue(rootQueue, B);
+    CSQueue queueA1 = findQueue(queueA, A1);
+    CSQueue queueA2 = findQueue(queueA, A2);
+    CSQueue queueB1 = findQueue(queueB, B1);
+    CSQueue queueB2 = findQueue(queueB, B2);
+
+    assertEquals("max capability MB in CS",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        cs.getMaximumResourceCapability().getMemorySize());
+    assertEquals("max capability vcores in CS",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        cs.getMaximumResourceCapability().getVirtualCores());
+    assertEquals("max allocation MB A1",
+        6144,
+        queueA1.getMaximumAllocation().getMemorySize());
+    assertEquals("max allocation vcores A1",
+        2,
+        queueA1.getMaximumAllocation().getVirtualCores());
+    assertEquals("max allocation MB A2", 4096,
+        queueA2.getMaximumAllocation().getMemorySize());
+    assertEquals("max allocation vcores A2",
+        2,
+        queueA2.getMaximumAllocation().getVirtualCores());
+    assertEquals("max allocation MB B", 5120,
+        queueB.getMaximumAllocation().getMemorySize());
+    assertEquals("max allocation MB B1", 5120,
+        queueB1.getMaximumAllocation().getMemorySize());
+    assertEquals("max allocation MB B2", 1024,
+        queueB2.getMaximumAllocation().getMemorySize());
+
+    // Test get the max-allocation from different parent
+    unsetMaxAllocation(conf, A1);
+    unsetMaxAllocation(conf, B);
+    unsetMaxAllocation(conf, B1);
+    setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
+        "memory-mb=6144,vcores=2");
+    setMaxAllocation(conf, A, "memory-mb=8192,vcores=2");
+
+    cs.reinitialize(conf, mockContext);
+
+    assertEquals("max capability MB in CS",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        cs.getMaximumResourceCapability().getMemorySize());
+    assertEquals("max capability vcores in CS",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        cs.getMaximumResourceCapability().getVirtualCores());
+    assertEquals("max allocation MB A1",
+        8192,
+        queueA1.getMaximumAllocation().getMemorySize());
+    assertEquals("max allocation vcores A1",
+        2,
+        queueA1.getMaximumAllocation().getVirtualCores());
+    assertEquals("max allocation MB B1",
+        6144,
+        queueB1.getMaximumAllocation().getMemorySize());
+    assertEquals("max allocation vcores B1",
+        2,
+        queueB1.getMaximumAllocation().getVirtualCores());
+
+    // Test the default
+    unsetMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT);
+    unsetMaxAllocation(conf, A);
+    unsetMaxAllocation(conf, A1);
+    cs.reinitialize(conf, mockContext);
+
+    assertEquals("max capability MB in CS",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        cs.getMaximumResourceCapability().getMemorySize());
+    assertEquals("max capability vcores in CS",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        cs.getMaximumResourceCapability().getVirtualCores());
+    assertEquals("max allocation MB A1",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        queueA1.getMaximumAllocation().getMemorySize());
+    assertEquals("max allocation vcores A1",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        queueA1.getMaximumAllocation().getVirtualCores());
+    assertEquals("max allocation MB A2",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        queueA2.getMaximumAllocation().getMemorySize());
+    assertEquals("max allocation vcores A2",
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+        queueA2.getMaximumAllocation().getVirtualCores());
+  }
+
+  /**
+   * Checks that re-initializing the scheduler is rejected whenever a queue
+   * level maximum allocation (memory or vcores) is configured above the
+   * cluster-level maximum.
+   */
+  @Test
+  public void testVerifyQueuesMaxAllocationConf() throws Exception {
+    CapacityScheduler scheduler = new CapacityScheduler();
+    scheduler.setConf(new YarnConfiguration());
+    scheduler.setRMContext(resourceManager.getRMContext());
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    setMaxAllocMb(csConf,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    setMaxAllocVcores(csConf,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+
+    // Values deliberately above the cluster-level limits set just above.
+    long tooMuchMem =
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1024;
+    long tooManyVcores =
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 10;
+
+    // Baseline: the unmodified queue configuration must load cleanly.
+    scheduler.init(csConf);
+    scheduler.start();
+    scheduler.reinitialize(csConf, mockContext);
+    checkQueueStructureCapacities(scheduler);
+
+    // Root queue memory above the cluster limit.
+    setMaxAllocation(csConf, CapacitySchedulerConfiguration.ROOT,
+        "memory-mb=" + tooMuchMem + ",vcores=2");
+    expectMaxAllocationRejected(scheduler, csConf,
+        "Queue Root maximum allocation can't exceed the cluster setting");
+
+    // Root and A are put back to acceptable values; leaf A1 memory is above
+    // the cluster limit.
+    setMaxAllocation(csConf, CapacitySchedulerConfiguration.ROOT,
+        "memory-mb=4096,vcores=2");
+    setMaxAllocation(csConf, A, "memory-mb=6144,vcores=2");
+    setMaxAllocation(csConf, A1, "memory-mb=" + tooMuchMem + ",vcores=2");
+    expectMaxAllocationRejected(scheduler, csConf,
+        "Queue A1 maximum allocation can't exceed the cluster setting");
+
+    // Leaf A1 memory is acceptable now, but its vcores are above the limit.
+    setMaxAllocation(csConf, A1, "memory-mb=8192,vcores=" + tooManyVcores);
+    expectMaxAllocationRejected(scheduler, csConf,
+        "Queue A1 maximum allocation can't exceed the cluster setting");
+  }
+
+  /**
+   * Re-initializes the scheduler with the given configuration and expects
+   * the call to throw an exception whose cause has a message containing
+   * "maximum allocation". If the call returns normally the test is failed
+   * with {@code messageIfAccepted}.
+   */
+  private void expectMaxAllocationRejected(CapacityScheduler scheduler,
+      CapacitySchedulerConfiguration csConf, String messageIfAccepted) {
+    try {
+      scheduler.reinitialize(csConf, mockContext);
+      fail(messageIfAccepted);
+    } catch (Exception e) {
+      assertTrue("maximum allocation exception",
+          e.getCause().getMessage().contains("maximum allocation"));
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org