You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/06/09 05:09:22 UTC

svn commit: r1601303 [2/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-common/src/main/resources/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-res...

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1601303&r1=1601302&r2=1601303&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Mon Jun  9 03:09:21 2014
@@ -18,16 +18,16 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -56,11 +57,10 @@ import org.apache.hadoop.yarn.util.resou
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 public class TestFifoScheduler {
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -298,7 +298,10 @@ public class TestFifoScheduler {
     FifoScheduler fs = new FifoScheduler();
     fs.init(conf);
     fs.start();
+    // mock rmContext to avoid NPE.
+    RMContext context = mock(RMContext.class);
     fs.reinitialize(conf, null);
+    fs.setRMContext(context);
 
     RMNode n1 =
         MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2");

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java?rev=1601303&r1=1601302&r2=1601303&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java Mon Jun  9 03:09:21 2014
@@ -43,10 +43,11 @@ import org.junit.Test;
 public class TestMoveApplication {
   private ResourceManager resourceManager = null;
   private static boolean failMove;
-  
+  private Configuration conf;
+
   @Before
   public void setUp() throws Exception {
-    Configuration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class,
         FifoSchedulerWithMove.class);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, " ");
@@ -119,28 +120,23 @@ public class TestMoveApplication {
     }
   }
   
-  @Test (timeout = 5000)
-  public void testMoveSuccessful() throws Exception {
-    // Submit application
-    Application application = new Application("user1", resourceManager);
-    ApplicationId appId = application.getApplicationId();
-    application.submit();
-    
-    // Wait for app to be accepted
-    RMApp app = resourceManager.rmContext.getRMApps().get(appId);
-    while (app.getState() != RMAppState.ACCEPTED) {
-      Thread.sleep(100);
-    }
-
-    ClientRMService clientRMService = resourceManager.getClientRMService();
-    // FIFO scheduler does not support moves
-    clientRMService.moveApplicationAcrossQueues(
-        MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
-    
-    RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
-    assertEquals("newqueue", rmApp.getQueue());
-  }
+  /**
+   * Submits an application to a dedicated MockRM and checks that a move
+   * request sent through its ClientRMService changes the application's queue
+   * to "newqueue".
+   */
+  @Test (timeout = 10000)
+  public void testMoveSuccessful() throws Exception {
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    try {
+      RMApp app = rm1.submitApp(1024);
+      ClientRMService clientRMService = rm1.getClientRMService();
+      // The stock FIFO scheduler does not support moves; setUp() configures
+      // FifoSchedulerWithMove as the RM scheduler for this test class.
+      clientRMService
+        .moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest
+          .newInstance(app.getApplicationId(), "newqueue"));
+
+      RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId());
+      assertEquals("newqueue", rmApp.getQueue());
+    } finally {
+      // Stop the RM even when an assertion fails so it is not left running.
+      rm1.stop();
+    }
+  }
-  
+
   @Test
   public void testMoveRejectedByPermissions() throws Exception {
     failMove = true;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1601303&r1=1601302&r2=1601303&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Mon Jun  9 03:09:21 2014
@@ -21,15 +21,14 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -160,7 +161,7 @@ public class TestRMNodeTransitions {
   @Test (timeout = 5000)
   public void testExpiredContainer() {
     // Start the node
-    node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(null, null));
     verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
     
     // Expire a container
@@ -188,11 +189,11 @@ public class TestRMNodeTransitions {
   @Test (timeout = 5000)
   public void testContainerUpdate() throws InterruptedException{
     //Start the node
-    node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(null, null));
     
     NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
     RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
-    node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    node2.handle(new RMNodeStartedEvent(null, null));
     
     ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
         BuilderUtils.newApplicationAttemptId(
@@ -248,7 +249,7 @@ public class TestRMNodeTransitions {
   @Test (timeout = 5000)
   public void testStatusChange(){
     //Start the node
-    node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(null, null));
     //Add info to the queue first
     node.setNextHeartBeat(false);
 
@@ -464,7 +465,7 @@ public class TestRMNodeTransitions {
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
         null, ResourceOption.newInstance(capability,
             RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
-    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
     Assert.assertEquals(NodeState.RUNNING, node.getState());
     return node;
   }
@@ -495,7 +496,7 @@ public class TestRMNodeTransitions {
     int initialUnhealthy = cm.getUnhealthyNMs();
     int initialDecommissioned = cm.getNumDecommisionedNMs();
     int initialRebooted = cm.getNumRebootedNMs();
-    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
     Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
     Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
     Assert.assertEquals("Unhealthy Nodes",

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1601303&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Mon Jun  9 03:09:21 2014
@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+@RunWith(value = Parameterized.class)
+public class TestWorkPreservingRMRestart {
+
+  // Configuration rebuilt by setup() before every test.
+  private YarnConfiguration conf;
+  // Scheduler implementation supplied by the Parameterized runner.
+  private Class<?> schedulerClass;
+  // The RM before (rm1) and after (rm2) the simulated restart; tearDown()
+  // stops whichever of them is non-null.
+  MockRM rm1;
+  MockRM rm2;
+
+  /**
+   * Builds a fresh configuration for each test: RM recovery backed by an
+   * in-memory state store, work-preserving recovery enabled, and the
+   * parameterized scheduler installed as the RM scheduler.
+   */
+  @Before
+  public void setup() throws UnknownHostException {
+    // Root logging is raised to DEBUG here and not restored in tearDown().
+    Logger root = LogManager.getRootLogger();
+    root.setLevel(Level.DEBUG);
+
+    conf = new YarnConfiguration();
+    UserGroupInformation.setConfiguration(conf);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerClass,
+      ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+      true);
+    DefaultMetricsSystem.setMiniClusterMode(true);
+  }
+
+  /** Stops the pre-restart RM and then the post-restart RM, if started. */
+  @After
+  public void tearDown() {
+    for (MockRM started : new MockRM[] { rm1, rm2 }) {
+      if (started != null) {
+        started.stop();
+      }
+    }
+  }
+
+  /**
+   * Runs every test once per scheduler: first CapacityScheduler, then
+   * FifoScheduler.
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> getTestParameters() {
+    Object[][] schedulers = {
+      { CapacityScheduler.class },
+      { FifoScheduler.class }
+    };
+    return Arrays.asList(schedulers);
+  }
+
+  /**
+   * @param schedulerClass ResourceScheduler implementation that setup()
+   *          installs as the RM scheduler
+   */
+  public TestWorkPreservingRMRestart(Class<?> schedulerClass) {
+    this.schedulerClass = schedulerClass;
+  }
+
+  /**
+   * Tests that common scheduler state (SchedulerApplicationAttempt,
+   * SchedulerNode, AppSchedulingInfo) is reconstructed from the container
+   * recovery reports an NM sends when it re-registers, and that the queue
+   * state of the scheduler under test (CapacityScheduler leaf queue or
+   * FifoScheduler root queue) is recovered as well.
+   *
+   * Strategy: restart the RM, re-register the NM with three container reports
+   * (AM container, running container, completed container) and check that
+   * the scheduler attempt, scheduler node and queue reflect exactly the two
+   * live containers.
+   */
+  @Test(timeout = 20000)
+  public void testSchedulerRecovery() throws Exception {
+    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+    conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+      DominantResourceCalculator.class.getName());
+
+    int containerMemory = 1024;
+    Resource containerResource = Resource.newInstance(containerMemory, 1);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Clear rm1's queue metrics before the restart; presumably the metrics
+    // outlive rm1 in this JVM and would skew the checks on rm2 -- confirm
+    // against MockRM.clearQueueMetrics.
+    rm1.clearQueueMetrics(app1);
+
+    // Re-start RM
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    // recover app
+    RMApp recoveredApp1 =
+        rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+    RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt();
+    NMContainerStatus amContainer =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1,
+          ContainerState.RUNNING);
+    NMContainerStatus runningContainer =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
+          ContainerState.RUNNING);
+    NMContainerStatus completedContainer =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
+          ContainerState.COMPLETE);
+
+    nm1.registerNode(Arrays.asList(amContainer, runningContainer,
+      completedContainer));
+
+    // Wait for RM to settle down on recovering containers;
+    waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
+
+    // check RMContainers are re-created and the container state is correct.
+    rm2.waitForState(nm1, amContainer.getContainerId(),
+      RMContainerState.RUNNING);
+    rm2.waitForState(nm1, runningContainer.getContainerId(),
+      RMContainerState.RUNNING);
+    rm2.waitForContainerToComplete(loadedAttempt1, completedContainer);
+
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm2.getResourceScheduler();
+    SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId());
+
+    // ********* check scheduler node state.*******
+    // 2 running containers.
+    Resource usedResources = Resources.multiply(containerResource, 2);
+    Resource nmResource =
+        Resource.newInstance(nm1.getMemory(), nm1.getvCores());
+    Resource availableResources = Resources.subtract(nmResource, usedResources);
+
+    assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
+    assertTrue(schedulerNode1.isValidContainer(runningContainer
+      .getContainerId()));
+    assertFalse(schedulerNode1.isValidContainer(completedContainer
+      .getContainerId()));
+    // 2 launched containers, 1 completed container
+    assertEquals(2, schedulerNode1.getNumContainers());
+
+    assertEquals(availableResources, schedulerNode1.getAvailableResource());
+    assertEquals(usedResources, schedulerNode1.getUsedResource());
+
+    // ***** check queue state based on the underlying scheduler ********
+    // Reuse the scheduler fetched above instead of casting a second time.
+    Map<ApplicationId, SchedulerApplication> schedulerApps =
+        scheduler.getSchedulerApplications();
+    SchedulerApplication schedulerApp =
+        schedulerApps.get(recoveredApp1.getApplicationId());
+
+    if (schedulerClass.equals(CapacityScheduler.class)) {
+      checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
+    } else if (schedulerClass.equals(FifoScheduler.class)) {
+      checkFifoQueue(schedulerApp, usedResources, availableResources);
+    }
+
+    // *********** check scheduler attempt state.********
+    SchedulerApplicationAttempt schedulerAttempt =
+        schedulerApp.getCurrentAppAttempt();
+    assertTrue(schedulerAttempt.getLiveContainers().contains(
+      scheduler.getRMContainer(amContainer.getContainerId())));
+    assertTrue(schedulerAttempt.getLiveContainers().contains(
+      scheduler.getRMContainer(runningContainer.getContainerId())));
+    // Expected value first so a failure message reports the values correctly.
+    assertEquals(usedResources, schedulerAttempt.getCurrentConsumption());
+    assertEquals(availableResources, schedulerAttempt.getHeadroom());
+
+    // *********** check appSchedulingInfo state ***********
+    // Container ids 1-3 were reported above; presumably getNewContainerId()
+    // hands out the next id after the highest recovered one.
+    assertEquals(4, schedulerAttempt.getNewContainerId());
+  }
+
+  /**
+   * Asserts the recovered state of the CapacityScheduler leaf queue that
+   * {@code app} belongs to, plus that queue's queue-level and per-user
+   * metrics. The leading app-count arguments (1, 0, 1, 0) passed to
+   * asserteMetrics describe a queue holding a single application.
+   *
+   * @param rm the ResourceManager whose scheduler is inspected
+   * @param app the recovered scheduler application
+   * @param clusterResource total cluster resource
+   * @param queueResource resource of the app's leaf queue
+   * @param usedResource resource expected to be in use in the queue
+   * @param numContainers number of live containers expected in the queue
+   */
+  private void checkCSQueue(MockRM rm,
+      SchedulerApplication<SchedulerApplicationAttempt> app,
+      Resource clusterResource, Resource queueResource, Resource usedResource,
+      int numContainers)
+      throws Exception {
+    // Inspect the RM the caller passed in, not the rm2 field.
+    checkCSLeafQueue(rm, app, clusterResource, queueResource, usedResource,
+      numContainers);
+
+    LeafQueue queue = (LeafQueue) app.getQueue();
+    Resource availableResources = Resources.subtract(queueResource, usedResource);
+    // ************* check Queue metrics ************
+    QueueMetrics queueMetrics = queue.getMetrics();
+    asserteMetrics(queueMetrics, 1, 0, 1, 0, numContainers,
+      availableResources.getMemory(), availableResources.getVirtualCores(),
+      usedResource.getMemory(), usedResource.getVirtualCores());
+
+    // ************ check user metrics ***********
+    QueueMetrics userMetrics = queueMetrics.getUserMetrics(app.getUser());
+    asserteMetrics(userMetrics, 1, 0, 1, 0, numContainers,
+      availableResources.getMemory(), availableResources.getVirtualCores(),
+      usedResource.getMemory(), usedResource.getVirtualCores());
+  }
+
+  /**
+   * Asserts a CapacityScheduler leaf queue's recovered usage: used resources,
+   * live container count, used and absolute used capacity, and the resources
+   * charged to the application's user.
+   */
+  private void checkCSLeafQueue(MockRM rm,
+      SchedulerApplication<SchedulerApplicationAttempt> app,
+      Resource clusterResource, Resource queueResource, Resource usedResource,
+      int numContainers) {
+    LeafQueue leaf = (LeafQueue) app.getQueue();
+    assertEquals(usedResource, leaf.getUsedResources());
+    assertEquals(numContainers, leaf.getNumContainers());
+
+    ResourceCalculator calculator =
+        ((CapacityScheduler) rm.getResourceScheduler()).getResourceCalculator();
+    // Used capacity is measured against the queue's resource ...
+    float expectedUsed =
+        Resources.divide(calculator, clusterResource, usedResource,
+          queueResource);
+    assertEquals(expectedUsed, leaf.getUsedCapacity(), 1e-8);
+    // ... absolute used capacity against the whole cluster.
+    float expectedAbsoluteUsed =
+        Resources.divide(calculator, clusterResource, usedResource,
+          clusterResource);
+    assertEquals(expectedAbsoluteUsed, leaf.getAbsoluteUsedCapacity(), 1e-8);
+
+    assertEquals(usedResource,
+      leaf.getUser(app.getUser()).getConsumedResources());
+  }
+
+  /**
+   * Asserts FifoScheduler state recovered on rm2: cluster-wide used resources,
+   * the current attempt's headroom, and the root queue metrics (container
+   * count hard-coded to 2).
+   */
+  private void checkFifoQueue(SchedulerApplication schedulerApp,
+      Resource usedResources, Resource availableResources) throws Exception {
+    FifoScheduler fifo = (FifoScheduler) rm2.getResourceScheduler();
+    assertEquals(usedResources, fifo.getUsedResource());
+
+    SchedulerApplicationAttempt attempt = schedulerApp.getCurrentAppAttempt();
+    assertEquals(availableResources, attempt.getHeadroom());
+
+    asserteMetrics(fifo.getRootQueueMetrics(), 1, 0, 1, 0, 2,
+      availableResources.getMemory(), availableResources.getVirtualCores(),
+      usedResources.getMemory(), usedResources.getVirtualCores());
+  }
+
+  /**
+   * Builds three container reports for {@code am}'s attempt: container 1 (the
+   * AM container) RUNNING, container 2 RUNNING and container 3 COMPLETE. The
+   * returned list is mutable so callers can append further reports to it.
+   */
+  public static List<NMContainerStatus>
+      createNMContainerStatusForApp(MockAM am) {
+    ApplicationAttemptId attemptId = am.getApplicationAttemptId();
+    List<NMContainerStatus> reports = new ArrayList<NMContainerStatus>(3);
+    reports.add(TestRMRestart.createNMContainerStatus(attemptId, 1,
+      ContainerState.RUNNING));
+    reports.add(TestRMRestart.createNMContainerStatus(attemptId, 2,
+      ContainerState.RUNNING));
+    reports.add(TestRMRestart.createNMContainerStatus(attemptId, 3,
+      ContainerState.COMPLETE));
+    return reports;
+  }
+
+  // Queue and user names used by the multi-level CapacityScheduler test.
+  private static final String R = "Default";
+  private static final String A = "QueueA";
+  private static final String B = "QueueB";
+  private static final String USER_1 = "user1";
+  private static final String USER_2 = "user2";
+
+  /**
+   * Configures the queue tree root -> Default (100%) -> {QueueA (50%),
+   * QueueB (50%)} and sets MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT
+   * to 0.5.
+   */
+  private void setupQueueConfiguration(CapacitySchedulerConfiguration csConf) {
+    String defaultQueue = CapacitySchedulerConfiguration.ROOT + "." + R;
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R });
+    csConf.setCapacity(defaultQueue, 100);
+
+    csConf.setQueues(defaultQueue, new String[] { A, B });
+    csConf.setCapacity(defaultQueue + "." + A, 50);
+    csConf.setCapacity(defaultQueue + "." + B, 50);
+
+    csConf.setDouble(CapacitySchedulerConfiguration
+      .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
+  }
+
+  /**
+   * Tests CapacityScheduler recovery with multi-level queues and multiple
+   * users:
+   * 1. set up 2 NMs, each with 8GB memory;
+   * 2. set up 2-level queues: Default -> (QueueA, QueueB);
+   * 3. user1 submits 2 apps to QueueA;
+   * 4. user2 submits 1 app to QueueB;
+   * 5. the AM and each container use 1GB memory;
+   * 6. restart the RM;
+   * 7. nm1 re-syncs the containers belonging to user1;
+   * 8. nm2 re-syncs the containers belonging to user2;
+   * 9. assert the state and metrics of the parent queue and both leaf queues;
+   * 10. assert each user's consumption inside the queue.
+   * Returns immediately when the parameterized scheduler is not the
+   * CapacityScheduler.
+   */
+  @Test (timeout = 30000)
+  public void testCapacitySchedulerRecovery() throws Exception {
+    if (!schedulerClass.equals(CapacityScheduler.class)) {
+      return;
+    }
+    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+    conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+      DominantResourceCalculator.class.getName());
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(conf);
+    setupQueueConfiguration(csConf);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(csConf);
+    rm1 = new MockRM(csConf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    MockNM nm2 =
+        new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm2.registerNode();
+    RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A);
+    MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
+    RMApp app1_2 = rm1.submitApp(1024, "app1_2", USER_1, null, A);
+    MockAM am1_2 = MockRM.launchAndRegisterAM(app1_2, rm1, nm2);
+
+    RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B);
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // clear queue metrics
+    rm1.clearQueueMetrics(app1_1);
+    rm1.clearQueueMetrics(app1_2);
+    rm1.clearQueueMetrics(app2);
+
+    // Re-start RM
+    rm2 = new MockRM(csConf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm2.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    // nm1 reports all of user1's containers (both apps), nm2 reports user2's.
+    List<NMContainerStatus> am1_1Containers =
+        createNMContainerStatusForApp(am1_1);
+    List<NMContainerStatus> am1_2Containers =
+        createNMContainerStatusForApp(am1_2);
+    am1_1Containers.addAll(am1_2Containers);
+    nm1.registerNode(am1_1Containers);
+
+    List<NMContainerStatus> am2Containers =
+        createNMContainerStatusForApp(am2);
+    nm2.registerNode(am2Containers);
+
+    // Wait for RM to settle down on recovering containers: each of the three
+    // attempts reported 2 live containers (plus 1 completed one).
+    waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
+    waitForNumContainersToRecover(2, rm2, am1_2.getApplicationAttemptId());
+    waitForNumContainersToRecover(2, rm2, am2.getApplicationAttemptId());
+
+    // Calculate each queue's resource usage.
+    Resource containerResource = Resource.newInstance(1024, 1);
+    Resource nmResource =
+        Resource.newInstance(nm1.getMemory(), nm1.getvCores());
+    Resource clusterResource = Resources.multiply(nmResource, 2);
+    Resource q1Resource = Resources.multiply(clusterResource, 0.5);
+    Resource q2Resource = Resources.multiply(clusterResource, 0.5);
+    Resource q1UsedResource = Resources.multiply(containerResource, 4);
+    Resource q2UsedResource = Resources.multiply(containerResource, 2);
+    Resource totalUsedResource = Resources.add(q1UsedResource, q2UsedResource);
+    Resource q1availableResources =
+        Resources.subtract(q1Resource, q1UsedResource);
+    Resource q2availableResources =
+        Resources.subtract(q2Resource, q2UsedResource);
+    Resource totalAvailableResource =
+        Resources.add(q1availableResources, q2availableResources);
+
+    Map<ApplicationId, SchedulerApplication> schedulerApps =
+        ((AbstractYarnScheduler) rm2.getResourceScheduler())
+          .getSchedulerApplications();
+    SchedulerApplication schedulerApp1_1 =
+        schedulerApps.get(app1_1.getApplicationId());
+
+    // assert queue A state.
+    checkCSLeafQueue(rm2, schedulerApp1_1, clusterResource, q1Resource,
+      q1UsedResource, 4);
+    QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics();
+    asserteMetrics(queue1Metrics, 2, 0, 2, 0, 4,
+      q1availableResources.getMemory(), q1availableResources.getVirtualCores(),
+      q1UsedResource.getMemory(), q1UsedResource.getVirtualCores());
+
+    // assert queue B state.
+    SchedulerApplication schedulerApp2 =
+        schedulerApps.get(app2.getApplicationId());
+    checkCSLeafQueue(rm2, schedulerApp2, clusterResource, q2Resource,
+      q2UsedResource, 2);
+    QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics();
+    asserteMetrics(queue2Metrics, 1, 0, 1, 0, 2,
+      q2availableResources.getMemory(), q2availableResources.getVirtualCores(),
+      q2UsedResource.getMemory(), q2UsedResource.getVirtualCores());
+
+    // assert parent queue state: 6 x 1GB containers out of 16GB in total.
+    LeafQueue leafQueue = (LeafQueue) schedulerApp2.getQueue();
+    ParentQueue parentQueue = (ParentQueue) leafQueue.getParent();
+    checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16,
+      (float) 6 / 16);
+    asserteMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6,
+      totalAvailableResource.getMemory(),
+      totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
+      totalUsedResource.getVirtualCores());
+  }
+
+  /**
+   * Asserts the aggregate usage recorded on a parent queue.
+   *
+   * @param parentQueue the queue to inspect
+   * @param numContainers expected value of {@code getNumContainers()}
+   * @param usedResource expected value of {@code getUsedResources()}
+   * @param usedCapacity expected value of {@code getUsedCapacity()},
+   *          compared with a tolerance of 1e-8
+   * @param absoluteUsedCapacity expected value of
+   *          {@code getAbsoluteUsedCapacity()}, compared with a tolerance of
+   *          1e-8
+   */
+  private void checkParentQueue(ParentQueue parentQueue, int numContainers,
+      Resource usedResource, float usedCapacity, float absoluteUsedCapacity) {
+    // Each assertion is labelled so a failure names the diverging value.
+    assertEquals("numContainers", numContainers,
+      parentQueue.getNumContainers());
+    assertEquals("usedResources", usedResource,
+      parentQueue.getUsedResources());
+    assertEquals("usedCapacity", usedCapacity, parentQueue.getUsedCapacity(),
+      1e-8);
+    assertEquals("absoluteUsedCapacity", absoluteUsedCapacity,
+      parentQueue.getAbsoluteUsedCapacity(), 1e-8);
+  }
+
+  // The RM shuts down and, while it is down, the AM fails. The restarted RM's
+  // scheduler must not recover the containers that belonged to the failed AM.
+  @Test(timeout = 20000)
+  public void testAMfailedBetweenRMRestart() throws Exception {
+    MemoryRMStateStore stateStore = new MemoryRMStateStore();
+    stateStore.init(conf);
+
+    // Before the restart: one app whose AM is launched on a single node.
+    rm1 = new MockRM(conf, stateStore);
+    rm1.start();
+    MockNM node =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    node.registerNode();
+    RMApp app = rm1.submitApp(200);
+    ApplicationAttemptId attemptId =
+        MockRM.launchAndRegisterAM(app, rm1, node).getApplicationAttemptId();
+
+    // Restart: bring up a second RM on the same state store and point the NM
+    // at it.
+    rm2 = new MockRM(conf, stateStore);
+    rm2.start();
+    node.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    // The NM re-registers, reporting container 1 (the AM container) as
+    // COMPLETE plus one RUNNING and one COMPLETE container of the same attempt.
+    NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus(
+      attemptId, 1, ContainerState.COMPLETE);
+    NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus(
+      attemptId, 2, ContainerState.RUNNING);
+    NMContainerStatus completedContainer =
+        TestRMRestart.createNMContainerStatus(attemptId, 3,
+          ContainerState.COMPLETE);
+    node.registerNode(Arrays.asList(amContainer, runningContainer,
+      completedContainer));
+    rm2.waitForState(attemptId, RMAppAttemptState.FAILED);
+
+    // Give the RM time to finish processing the recovered containers.
+    Thread.sleep(3000);
+
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm2.getResourceScheduler();
+    // The attempt failed, so the just-recovered containers must have been
+    // released again.
+    assertNull(scheduler.getRMContainer(runningContainer.getContainerId()));
+    assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
+  }
+
+  // The app completed before the RM restarted. The restarted RM's scheduler
+  // must not recover containers that belong to completed apps.
+  @Test(timeout = 20000)
+  public void testContainersNotRecoveredForCompletedApps() throws Exception {
+    MemoryRMStateStore stateStore = new MemoryRMStateStore();
+    stateStore.init(conf);
+
+    // Before the restart: run one app to completion.
+    rm1 = new MockRM(conf, stateStore);
+    rm1.start();
+    MockNM node =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    node.registerNode();
+    RMApp app = rm1.submitApp(200);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm1, node);
+    MockRM.finishAMAndVerifyAppState(app, rm1, node, am);
+    ApplicationAttemptId attemptId = am.getApplicationAttemptId();
+
+    // Restart: bring up a second RM on the same state store and point the NM
+    // at it.
+    rm2 = new MockRM(conf, stateStore);
+    rm2.start();
+    node.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    // The NM re-registers, still reporting containers of the finished attempt.
+    NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus(
+      attemptId, 2, ContainerState.RUNNING);
+    NMContainerStatus completedContainer =
+        TestRMRestart.createNMContainerStatus(attemptId, 3,
+          ContainerState.COMPLETE);
+    node.registerNode(Arrays.asList(runningContainer, completedContainer));
+
+    RMApp recoveredApp =
+        rm2.getRMContext().getRMApps().get(app.getApplicationId());
+    assertEquals(RMAppState.FINISHED, recoveredApp.getState());
+
+    // Give the RM time to finish processing the reported containers.
+    Thread.sleep(3000);
+
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm2.getResourceScheduler();
+    // No container of a finished app may be recovered by the scheduler.
+    assertNull(scheduler.getRMContainer(runningContainer.getContainerId()));
+    assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
+  }
+
+  /**
+   * Asserts the application, container and resource counters exposed by a
+   * queue's {@link QueueMetrics}. Every assertion is labelled with the metric
+   * name so that a failure identifies which counter diverged.
+   *
+   * @param qm the metrics to check
+   * @param appsSubmitted expected {@code getAppsSubmitted()}
+   * @param appsPending expected {@code getAppsPending()}
+   * @param appsRunning expected {@code getAppsRunning()}
+   * @param appsCompleted expected {@code getAppsCompleted()}
+   * @param allocatedContainers expected {@code getAllocatedContainers()}
+   * @param availableMB expected {@code getAvailableMB()}
+   * @param availableVirtualCores expected {@code getAvailableVirtualCores()}
+   * @param allocatedMB expected {@code getAllocatedMB()}
+   * @param allocatedVirtualCores expected {@code getAllocatedVirtualCores()}
+   */
+  private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
+      int appsPending, int appsRunning, int appsCompleted,
+      int allocatedContainers, int availableMB, int availableVirtualCores,
+      int allocatedMB, int allocatedVirtualCores) {
+    assertEquals("appsSubmitted", appsSubmitted, qm.getAppsSubmitted());
+    assertEquals("appsPending", appsPending, qm.getAppsPending());
+    assertEquals("appsRunning", appsRunning, qm.getAppsRunning());
+    assertEquals("appsCompleted", appsCompleted, qm.getAppsCompleted());
+    assertEquals("allocatedContainers", allocatedContainers,
+      qm.getAllocatedContainers());
+    assertEquals("availableMB", availableMB, qm.getAvailableMB());
+    assertEquals("availableVirtualCores", availableVirtualCores,
+      qm.getAvailableVirtualCores());
+    assertEquals("allocatedMB", allocatedMB, qm.getAllocatedMB());
+    assertEquals("allocatedVirtualCores", allocatedVirtualCores,
+      qm.getAllocatedVirtualCores());
+  }
+
+  /**
+   * Polls every 200 ms until the scheduler of {@code rm} has an attempt for
+   * {@code attemptId} and that attempt holds at least {@code num} live
+   * containers. There is no internal deadline: if the target is never reached
+   * this loops until the calling test's own timeout (if any) fires.
+   */
+  private void waitForNumContainersToRecover(int num, MockRM rm,
+      ApplicationAttemptId attemptId) throws Exception {
+    AbstractYarnScheduler yarnScheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // Phase 1: wait until the scheduler knows about the attempt at all.
+    SchedulerApplicationAttempt schedulerAttempt;
+    for (schedulerAttempt = yarnScheduler.getApplicationAttempt(attemptId);
+        schedulerAttempt == null;
+        schedulerAttempt = yarnScheduler.getApplicationAttempt(attemptId)) {
+      System.out.println("Wait for scheduler attempt " + attemptId
+          + " to be created");
+      Thread.sleep(200);
+    }
+
+    // Phase 2: re-read the live container list until enough have recovered.
+    while (true) {
+      if (schedulerAttempt.getLiveContainers().size() >= num) {
+        return;
+      }
+      System.out.println("Wait for " + num + " containers to recover.");
+      Thread.sleep(200);
+    }
+  }
+}