You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [21/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Tue Aug 19 23:49:39 2014
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -75,17 +76,18 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@@ -94,6 +96,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -113,7 +116,7 @@ public class TestRMAppAttemptTransitions
   
   private static final String EMPTY_DIAGNOSTICS = "";
   private static final String RM_WEBAPP_ADDR =
-      WebAppUtils.getResolvedRMWebAppURLWithoutScheme(new Configuration());
+      WebAppUtils.getResolvedRMWebAppURLWithScheme(new Configuration());
   
   private boolean isSecurityEnabled;
   private RMContext rmContext;
@@ -130,7 +133,8 @@ public class TestRMAppAttemptTransitions
   private RMAppAttempt applicationAttempt;
 
   private Configuration conf = new Configuration();
-  private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf));
+  private AMRMTokenSecretManager amRMTokenManager =
+      spy(new AMRMTokenSecretManager(conf, rmContext));
   private ClientToAMTokenSecretManagerInRM clientToAMTokenManager =
       spy(new ClientToAMTokenSecretManagerInRM());
   private NMTokenSecretManagerInRM nmTokenManager =
@@ -221,6 +225,8 @@ public class TestRMAppAttemptTransitions
     amLivelinessMonitor = mock(AMLivelinessMonitor.class);
     amFinishingMonitor = mock(AMLivelinessMonitor.class);
     writer = mock(RMApplicationHistoryWriter.class);
+    MasterKeyData masterKeyData = amRMTokenManager.createNewMasterKey();
+    when(amRMTokenManager.getMasterKey()).thenReturn(masterKeyData);
     rmContext =
         new RMContextImpl(rmDispatcher,
           containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
@@ -315,7 +321,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
     assertNull(applicationAttempt.getMasterContainer());
     assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
-    assertEquals(0, applicationAttempt.getRanNodes().size());
+    assertEquals(0, application.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
     assertNotNull(applicationAttempt.getTrackingUrl());
     assertFalse("N/A".equals(applicationAttempt.getTrackingUrl()));
@@ -331,7 +337,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
     assertNull(applicationAttempt.getMasterContainer());
     assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
-    assertEquals(0, applicationAttempt.getRanNodes().size());
+    assertEquals(0, application.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
     if (UserGroupInformation.isSecurityEnabled()) {
       verify(clientToAMTokenManager).createMasterKey(
@@ -341,7 +347,6 @@ public class TestRMAppAttemptTransitions
       assertNull(applicationAttempt.createClientToken("some client"));
     }
     assertNull(applicationAttempt.createClientToken(null));
-    assertNotNull(applicationAttempt.getAMRMToken());
     // Check events
     verify(masterService).
         registerAppAttempt(applicationAttempt.getAppAttemptId());
@@ -359,7 +364,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
     assertNull(applicationAttempt.getMasterContainer());
     assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
-    assertEquals(0, applicationAttempt.getRanNodes().size());
+    assertEquals(0, application.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
     
     // Check events
@@ -385,7 +390,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
     assertEquals(amContainer, applicationAttempt.getMasterContainer());
     assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
-    assertEquals(0, applicationAttempt.getRanNodes().size());
+    assertEquals(0, application.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
     verifyAttemptFinalStateSaved();
@@ -425,7 +430,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
     assertNull(applicationAttempt.getMasterContainer());
     assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
-    assertEquals(0, applicationAttempt.getRanNodes().size());
+    assertEquals(0, application.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
   }
 
@@ -437,7 +442,6 @@ public class TestRMAppAttemptTransitions
     assertEquals(RMAppAttemptState.ALLOCATED, 
         applicationAttempt.getAppAttemptState());
     assertEquals(amContainer, applicationAttempt.getMasterContainer());
-    
     // Check events
     verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
     verify(scheduler, times(2)).
@@ -461,7 +465,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
     assertEquals(container, applicationAttempt.getMasterContainer());
     assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
-    assertEquals(0, applicationAttempt.getRanNodes().size());
+    assertEquals(0, application.getRanNodes().size());
     
     // Check events
     verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class));
@@ -564,15 +568,15 @@ public class TestRMAppAttemptTransitions
     submitApplicationAttempt();
     applicationAttempt.handle(
         new RMAppAttemptEvent(
-            applicationAttempt.getAppAttemptId(), 
+            applicationAttempt.getAppAttemptId(),
             RMAppAttemptEventType.ATTEMPT_ADDED));
     
     if(unmanagedAM){
       assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
           applicationAttempt.getAppAttemptState());
       applicationAttempt.handle(
-          new RMAppAttemptNewSavedEvent(
-              applicationAttempt.getAppAttemptId(), null));
+        new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
+            RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
     }
     
     testAppAttemptScheduledState();
@@ -599,6 +603,9 @@ public class TestRMAppAttemptTransitions
             any(List.class), 
             any(List.class))).
     thenReturn(allocation);
+    RMContainer rmContainer = mock(RMContainerImpl.class);
+    when(scheduler.getRMContainer(container.getId())).
+        thenReturn(rmContainer);
     
     applicationAttempt.handle(
         new RMAppAttemptContainerAllocatedEvent(
@@ -607,8 +614,8 @@ public class TestRMAppAttemptTransitions
     assertEquals(RMAppAttemptState.ALLOCATED_SAVING, 
         applicationAttempt.getAppAttemptState());
     applicationAttempt.handle(
-        new RMAppAttemptNewSavedEvent(
-            applicationAttempt.getAppAttemptId(), null));
+        new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
+            RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
     
     testAppAttemptAllocatedState(container);
     
@@ -666,8 +673,10 @@ public class TestRMAppAttemptTransitions
     runApplicationAttempt(null, "host", 8042, url, true);
 
     // complete a container
-    applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent(
-        applicationAttempt.getAppAttemptId(), mock(Container.class)));
+    Container container = mock(Container.class);
+    when(container.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
+    application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(),
+        container.getNodeId()));
     applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
         applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class)));
     // complete AM
@@ -685,8 +694,8 @@ public class TestRMAppAttemptTransitions
     assertEquals(RMAppAttemptState.FINAL_SAVING,
       applicationAttempt.getAppAttemptState());
     applicationAttempt.handle(
-      new RMAppAttemptUpdateSavedEvent(
-          applicationAttempt.getAppAttemptId(), null));
+      new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(), 
+          RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
   }
 
   @Test
@@ -771,6 +780,32 @@ public class TestRMAppAttemptTransitions
   }
 
   @Test
+  public void testAMCrashAtScheduled() {
+    // This is to test sending CONTAINER_FINISHED event at SCHEDULED state.
+    // Verify the state transition is correct.
+    scheduleApplicationAttempt();
+    ContainerStatus cs =
+        SchedulerUtils.createAbnormalContainerStatus(
+            BuilderUtils.newContainerId(
+                applicationAttempt.getAppAttemptId(), 1),
+            SchedulerUtils.LOST_CONTAINER);
+    // send CONTAINER_FINISHED event at SCHEDULED state,
+    // The state should be FINAL_SAVING with previous state SCHEDULED
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+        applicationAttempt.getAppAttemptId(), cs));
+    // createApplicationAttemptState will return previous state (SCHEDULED),
+    // if the current state is FINAL_SAVING.
+    assertEquals(YarnApplicationAttemptState.SCHEDULED,
+        applicationAttempt.createApplicationAttemptState());
+    // send ATTEMPT_UPDATE_SAVED event,
+    // verify the state is changed to state FAILED.
+    sendAttemptUpdateSavedEvent(applicationAttempt);
+    assertEquals(RMAppAttemptState.FAILED,
+        applicationAttempt.getAppAttemptState());
+    verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
+  }
+
+  @Test
   public void testAllocatedToKilled() {
     Container amContainer = allocateApplicationAttempt();
     applicationAttempt.handle(
@@ -812,6 +847,9 @@ public class TestRMAppAttemptTransitions
       applicationAttempt.getAppAttemptState());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
     verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
+    boolean shouldCheckURL = (applicationAttempt.getTrackingUrl() != null);
+    verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics(),
+      exitCode, shouldCheckURL);
   }
   
   @Test
@@ -845,7 +883,7 @@ public class TestRMAppAttemptTransitions
         applicationAttempt.getAppAttemptState());
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
     assertEquals(amContainer, applicationAttempt.getMasterContainer());
-    assertEquals(0, applicationAttempt.getRanNodes().size());
+    assertEquals(0, application.getRanNodes().size());
     String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
         applicationAttempt.getAppAttemptId().getApplicationId());
     assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
@@ -882,7 +920,7 @@ public class TestRMAppAttemptTransitions
         applicationAttempt.getAppAttemptState());
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
     assertEquals(amContainer, applicationAttempt.getMasterContainer());
-    assertEquals(0, applicationAttempt.getRanNodes().size());
+    assertEquals(0, application.getRanNodes().size());
     String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
         applicationAttempt.getAppAttemptId().getApplicationId());
     assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
@@ -1229,6 +1267,20 @@ public class TestRMAppAttemptTransitions
     verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
   }
 
+  private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics,
+        int exitCode, boolean shouldCheckURL) {
+    assertTrue("Diagnostic information does not point the logs to the users",
+      diagnostics.contains("logs"));
+    assertTrue("Diagnostic information does not contain application attempt id",
+      diagnostics.contains(applicationAttempt.getAppAttemptId().toString()));
+    assertTrue("Diagnostic information does not contain application exit code",
+      diagnostics.contains("exitCode: " + exitCode));
+    if (shouldCheckURL) {
+      assertTrue("Diagnostic information does not contain application proxy URL",
+      diagnostics.contains(applicationAttempt.getWebProxyBase()));
+    }
+  }
+
   private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
     verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId);
     if (UserGroupInformation.isSecurityEnabled()) {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java Tue Aug 19 23:49:39 2014
@@ -26,6 +26,9 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -36,17 +39,24 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -204,4 +214,36 @@ public class TestRMContainerImpl {
     assertEquals(RMContainerState.RUNNING, rmContainer.getState());
     verify(writer, never()).containerFinished(any(RMContainer.class));
   }
+  
+  @Test
+  public void testExistenceOfResourceRequestInRMContainer() throws Exception {
+    Configuration conf = new Configuration();
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
+    RMApp app1 = rm1.submitApp(1024);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    ResourceScheduler scheduler = rm1.getResourceScheduler();
+
+    // request a container.
+    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+    ContainerId containerId2 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+    // Verify whether list of ResourceRequest is present in RMContainer
+    // while moving to ALLOCATED state
+    Assert.assertNotNull(scheduler.getRMContainer(containerId2)
+        .getResourceRequests());
+
+    // Allocate container
+    am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
+        .getAllocatedContainers();
+    rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED);
+
+    // After RMContainer moving to ACQUIRED state, list of ResourceRequest will
+    // be empty
+    Assert.assertNull(scheduler.getRMContainer(containerId2)
+        .getResourceRequests());
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
 
@@ -61,10 +62,15 @@ public class TestSchedulerApplicationAtt
     QueueMetrics newMetrics = newQueue.getMetrics();
 
     ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getEpoch()).thenReturn(3);
     SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
-        user, oldQueue, oldQueue.getActiveUsersManager(), null);
+        user, oldQueue, oldQueue.getActiveUsersManager(), rmContext);
     oldMetrics.submitApp(user);
     
+    // confirm that containerId is calculated based on epoch.
+    assertEquals(app.getNewContainerId(), 0x00c00001);
+    
     // Resource request
     Resource requestedResource = Resource.newInstance(1536, 2);
     Priority requestedPriority = Priority.newInstance(2);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java Tue Aug 19 23:49:39 2014
@@ -384,15 +384,18 @@ public class TestSchedulerUtils {
     Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus());
   }
 
-  public static <T> SchedulerApplication verifyAppAddedAndRemovedFromScheduler(
-      final Map<ApplicationId, SchedulerApplication> applications,
-      EventHandler<SchedulerEvent> handler, String queueName) throws Exception {
+  public static SchedulerApplication<SchedulerApplicationAttempt>
+      verifyAppAddedAndRemovedFromScheduler(
+          Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
+          EventHandler<SchedulerEvent> handler, String queueName)
+          throws Exception {
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     AppAddedSchedulerEvent appAddedEvent =
         new AppAddedSchedulerEvent(appId, queueName, "user");
     handler.handle(appAddedEvent);
-    SchedulerApplication app = applications.get(appId);
+    SchedulerApplication<SchedulerApplicationAttempt> app =
+        applications.get(appId);
     // verify application is added.
     Assert.assertNotNull(app);
     Assert.assertEquals("user", app.getUser());

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Tue Aug 19 23:49:39 2014
@@ -81,7 +81,7 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB, 32));
-    when(csContext.getClusterResources()).
+    when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
     when(csContext.getApplicationComparator()).
         thenReturn(CapacityScheduler.applicationComparator);
@@ -165,7 +165,7 @@ public class TestApplicationLimits {
     
     // Say cluster has 100 nodes of 16G each
     Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16);
-    when(csContext.getClusterResources()).thenReturn(clusterResource);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CSQueue root = 
@@ -478,7 +478,7 @@ public class TestApplicationLimits {
     
     // Say cluster has 100 nodes of 16G each
     Resource clusterResource = Resources.createResource(100 * 16 * GB);
-    when(csContext.getClusterResources()).thenReturn(clusterResource);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
     
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CapacityScheduler.parseQueue(csContext, csConf, null, "root", 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Tue Aug 19 23:49:39 2014
@@ -25,22 +25,37 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
-
-import org.junit.Assert;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -48,18 +63,36 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -69,9 +102,14 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -119,13 +157,16 @@ public class TestCapacityScheduler {
 
   @After
   public void tearDown() throws Exception {
-    resourceManager.stop();
+    if (resourceManager != null) {
+      resourceManager.stop();
+    }
   }
 
 
   @Test (timeout = 30000)
   public void testConfValidation() throws Exception {
     ResourceScheduler scheduler = new CapacityScheduler();
+    scheduler.setRMContext(resourceManager.getRMContext());
     Configuration conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
@@ -340,18 +381,23 @@ public class TestCapacityScheduler {
   public void testRefreshQueues() throws Exception {
     CapacityScheduler cs = new CapacityScheduler();
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext =  new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
     setupQueueConfiguration(conf);
     cs.setConf(new YarnConfiguration());
-    cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
-      null, new RMContainerTokenSecretManager(conf),
-      new NMTokenSecretManagerInRM(conf),
-      new ClientToAMTokenSecretManagerInRM(), null));
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, rmContext);
     checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
 
     conf.setCapacity(A, 80f);
     conf.setCapacity(B, 20f);
     cs.reinitialize(conf, mockContext);
     checkQueueCapacities(cs, 80f, 20f);
+    cs.stop();
   }
 
   private void checkQueueCapacities(CapacityScheduler cs,
@@ -454,6 +500,9 @@ public class TestCapacityScheduler {
     setupQueueConfiguration(csConf);
     CapacityScheduler cs = new CapacityScheduler();
     cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(csConf);
+    cs.start();
     cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
       null, null, new RMContainerTokenSecretManager(csConf),
       new NMTokenSecretManagerInRM(csConf),
@@ -465,14 +514,15 @@ public class TestCapacityScheduler {
     cs.handle(new NodeAddedSchedulerEvent(n1));
     cs.handle(new NodeAddedSchedulerEvent(n2));
 
-    Assert.assertEquals(6 * GB, cs.getClusterResources().getMemory());
+    Assert.assertEquals(6 * GB, cs.getClusterResource().getMemory());
 
     // reconnect n1 with downgraded memory
     n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
     cs.handle(new NodeRemovedSchedulerEvent(n1));
     cs.handle(new NodeAddedSchedulerEvent(n1));
 
-    Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory());
+    Assert.assertEquals(4 * GB, cs.getClusterResource().getMemory());
+    cs.stop();
   }
 
   @Test
@@ -481,6 +531,9 @@ public class TestCapacityScheduler {
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
     setupQueueConfiguration(conf);
     cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
     cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
       null, new RMContainerTokenSecretManager(conf),
       new NMTokenSecretManagerInRM(conf),
@@ -511,6 +564,7 @@ public class TestCapacityScheduler {
       assertEquals(queueB, queueB4.getParent());
     } finally {
       B3_CAPACITY += B4_CAPACITY;
+      cs.stop();
     }
   }
   @Test
@@ -627,17 +681,17 @@ public class TestCapacityScheduler {
 
   @Test
   public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
-
-    AsyncDispatcher rmDispatcher = new AsyncDispatcher();
-    CapacityScheduler cs = new CapacityScheduler();
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
     setupQueueConfiguration(conf);
-    cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null,
-      null, null, new RMContainerTokenSecretManager(conf),
-      new NMTokenSecretManagerInRM(conf),
-      new ClientToAMTokenSecretManagerInRM(), null));
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    @SuppressWarnings("unchecked")
+    AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> cs =
+        (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
+          .getResourceScheduler();
 
-    SchedulerApplication app =
+    SchedulerApplication<SchedulerApplicationAttempt> app =
         TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
           cs.getSchedulerApplications(), cs, "a1");
     Assert.assertEquals("a1", app.getQueue().getQueueName());
@@ -667,5 +721,1173 @@ public class TestCapacityScheduler {
       CapacityScheduler.schedule(cs);
     }
   }
+  
+  private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    nm.nodeHeartbeat(true);
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+    return am;
+  }
+
+  private void waitForAppPreemptionInfo(RMApp app, Resource preempted,
+      int numAMPreempted, int numTaskPreempted,
+      Resource currentAttemptPreempted, boolean currentAttemptAMPreempted,
+      int numLatestAttemptTaskPreempted) throws InterruptedException {
+    while (true) {
+      RMAppMetrics appPM = app.getRMAppMetrics();
+      RMAppAttemptMetrics attemptPM =
+          app.getCurrentAppAttempt().getRMAppAttemptMetrics();
+
+      if (appPM.getResourcePreempted().equals(preempted)
+          && appPM.getNumAMContainersPreempted() == numAMPreempted
+          && appPM.getNumNonAMContainersPreempted() == numTaskPreempted
+          && attemptPM.getResourcePreempted().equals(currentAttemptPreempted)
+          && app.getCurrentAppAttempt().getRMAppAttemptMetrics()
+            .getIsPreempted() == currentAttemptAMPreempted
+          && attemptPM.getNumNonAMContainersPreempted() == 
+             numLatestAttemptTaskPreempted) {
+        return;
+      }
+      Thread.sleep(500);
+    }
+  }
+
+  private void waitForNewAttemptCreated(RMApp app,
+      ApplicationAttemptId previousAttemptId) throws InterruptedException {
+    while (app.getCurrentAppAttempt().equals(previousAttemptId)) {
+      Thread.sleep(500);
+    }
+  }
+  
+  @Test(timeout = 30000)
+  public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception {
+    final YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MyContainerManager containerManager = new MyContainerManager();
+    final MockRMWithAMS rm =
+        new MockRMWithAMS(conf, containerManager);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("localhost:1234", 5120);
+
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>(2);
+    acls.put(ApplicationAccessType.VIEW_APP, "*");
+    RMApp app = rm.submitApp(1024, "appname", "appuser", acls);
+
+    nm1.nodeHeartbeat(true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
+    int msecToWait = 10000;
+    int msecToSleep = 100;
+    while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED
+        && msecToWait > 0) {
+      LOG.info("Waiting for AppAttempt to reach LAUNCHED state. "
+          + "Current state is " + attempt.getAppAttemptState());
+      Thread.sleep(msecToSleep);
+      msecToWait -= msecToSleep;
+    }
+    Assert.assertEquals(attempt.getAppAttemptState(),
+        RMAppAttemptState.LAUNCHED);
+
+    // Create a client to the RM.
+    final YarnRPC rpc = YarnRPC.create(conf);
+
+    UserGroupInformation currentUser =
+        UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
+    Credentials credentials = containerManager.getContainerCredentials();
+    final InetSocketAddress rmBindAddress =
+        rm.getApplicationMasterService().getBindAddress();
+    Token<? extends TokenIdentifier> amRMToken =
+        MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
+          credentials.getAllTokens());
+    currentUser.addToken(amRMToken);
+    ApplicationMasterProtocol client =
+        currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+          @Override
+          public ApplicationMasterProtocol run() {
+            return (ApplicationMasterProtocol) rpc.getProxy(
+              ApplicationMasterProtocol.class, rmBindAddress, conf);
+          }
+        });
+
+    RegisterApplicationMasterRequest request =
+        RegisterApplicationMasterRequest.newInstance("localhost", 12345, "");
+    client.registerApplicationMaster(request);
+
+    // grab the scheduler lock from another thread
+    // and verify an allocate call in this thread doesn't block on it
+    final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+    Thread otherThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        synchronized(cs) {
+          try {
+            barrier.await();
+            barrier.await();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          } catch (BrokenBarrierException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    });
+    otherThread.start();
+    barrier.await();
+    AllocateRequest allocateRequest =
+        AllocateRequest.newInstance(0, 0.0f, null, null, null);
+    client.allocate(allocateRequest);
+    barrier.await();
+    otherThread.join();
+
+    rm.stop();
+  }
+
+  @Test
+  public void testNumClusterNodes() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(conf);
+    RMContextImpl rmContext =  new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    cs.setRMContext(rmContext);
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    cs.init(csConf);
+    cs.start();
+    assertEquals(0, cs.getNumClusterNodes());
+
+    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+    cs.handle(new NodeAddedSchedulerEvent(n2));
+    assertEquals(2, cs.getNumClusterNodes());
+
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    assertEquals(1, cs.getNumClusterNodes());
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+    assertEquals(2, cs.getNumClusterNodes());
+    cs.handle(new NodeRemovedSchedulerEvent(n2));
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    assertEquals(0, cs.getNumClusterNodes());
+
+    cs.stop();
+  }
+
+  @Test(timeout = 120000)
+  public void testPreemptionInfo() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    int CONTAINER_MEMORY = 1024; // start RM
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+
+    // get scheduler
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    // start NM
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(CONTAINER_MEMORY);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+
+    // get scheduler app
+    FiCaSchedulerApp schedulerAppAttempt =
+        cs.getSchedulerApplications().get(app0.getApplicationId())
+            .getCurrentAppAttempt();
+
+    // allocate some containers and launch them
+    List<Container> allocatedContainers =
+        am0.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);
+
+    // kill the 3 containers
+    for (Container c : allocatedContainers) {
+      cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId()));
+    }
+
+    // check values
+    waitForAppPreemptionInfo(app0,
+        Resource.newInstance(CONTAINER_MEMORY * 3, 3), 0, 3,
+        Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
+
+    // kill app0-attempt0 AM container
+    cs.killContainer(schedulerAppAttempt.getRMContainer(app0
+        .getCurrentAppAttempt().getMasterContainer().getId()));
+
+    // wait for app0 failed
+    waitForNewAttemptCreated(app0, am0.getApplicationAttemptId());
+
+    // check values
+    waitForAppPreemptionInfo(app0,
+        Resource.newInstance(CONTAINER_MEMORY * 4, 4), 1, 3,
+        Resource.newInstance(0, 0), false, 0);
+
+    // launch app0-attempt1
+    MockAM am1 = launchAM(app0, rm1, nm1);
+    schedulerAppAttempt =
+        cs.getSchedulerApplications().get(app0.getApplicationId())
+            .getCurrentAppAttempt();
+
+    // allocate some containers and launch them
+    allocatedContainers =
+        am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);
+    for (Container c : allocatedContainers) {
+      cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId()));
+    }
+
+    // check values
+    waitForAppPreemptionInfo(app0,
+        Resource.newInstance(CONTAINER_MEMORY * 7, 7), 1, 6,
+        Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
+
+    rm1.stop();
+  }
+  
+  @Test(timeout = 30000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
+    RMApp app1 = rm1.submitApp(1024);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    // request a container.
+    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+    ContainerId containerId1 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED);
+
+    RMContainer rmContainer = cs.getRMContainer(containerId1);
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    FiCaSchedulerApp app = cs.getApplicationAttempt(am1
+        .getApplicationAttemptId());
+
+    FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode());
+    for (ResourceRequest request : requests) {
+      // Skip the OffRack and RackLocal resource requests.
+      if (request.getResourceName().equals(node.getRackName())
+          || request.getResourceName().equals(ResourceRequest.ANY)) {
+        continue;
+      }
+
+      // Already the node local resource request is cleared from RM after
+      // allocation.
+      Assert.assertNull(app.getResourceRequest(request.getPriority(),
+          request.getResourceName()));
+    }
+
+    // Call killContainer to preempt the container
+    cs.killContainer(rmContainer);
+
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      // Resource request must have added back in RM after preempt event
+      // handling.
+      Assert.assertEquals(
+          1,
+          app.getResourceRequest(request.getPriority(),
+              request.getResourceName()).getNumContainers());
+    }
+
+    // New container will be allocated and will move to ALLOCATED state
+    ContainerId containerId2 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 3);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+    // allocate container
+    List<Container> containers = am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+
+    // Now with updated ResourceRequest, a container is allocated for AM.
+    Assert.assertTrue(containers.size() == 1);
+  }
+
+  private MockRM setUpMove() {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    return rm;
+  }
+
+  @Test
+  public void testMoveAppBasic() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+    String queue =
+        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("a1"));
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    // now move the app
+    scheduler.moveApplication(app.getApplicationId(), "b1");
+
+    // check postconditions
+    appsInB1 = scheduler.getAppsInQueue("b1");
+    assertEquals(1, appsInB1.size());
+    queue =
+        scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("b1"));
+
+    appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.contains(appAttemptId));
+    assertEquals(1, appsInB.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertTrue(appsInA1.isEmpty());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.isEmpty());
+
+    rm.stop();
+  }
+
+  @Test
+  public void testMoveAppSameParent() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+    String queue =
+        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("a1"));
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    List<ApplicationAttemptId> appsInA2 = scheduler.getAppsInQueue("a2");
+    assertTrue(appsInA2.isEmpty());
+
+    // now move the app
+    scheduler.moveApplication(app.getApplicationId(), "a2");
+
+    // check postconditions
+    appsInA2 = scheduler.getAppsInQueue("a2");
+    assertEquals(1, appsInA2.size());
+    queue =
+        scheduler.getApplicationAttempt(appsInA2.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("a2"));
+
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertTrue(appsInA1.isEmpty());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    rm.stop();
+  }
+
+  @Test
+  public void testMoveAppForMoveToQueueWithFreeCap() throws Exception {
+
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+    // Register node1
+    String host_0 = "host_0";
+    NodeManager nm_0 =
+        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(4 * GB, 1));
+
+    // Register node2
+    String host_1 = "host_1";
+    NodeManager nm_1 =
+        registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(2 * GB, 1));
+
+    // ResourceRequest priorities
+    Priority priority_0 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(0);
+    Priority priority_1 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(1);
+
+    // Submit application_0
+    Application application_0 =
+        new Application("user_0", "a1", resourceManager);
+    application_0.submit(); // app + app attempt event sent to scheduler
+
+    application_0.addNodeManager(host_0, 1234, nm_0);
+    application_0.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_0_0 = Resources.createResource(1 * GB, 1);
+    application_0.addResourceRequestSpec(priority_1, capability_0_0);
+
+    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_1);
+
+    Task task_0_0 =
+        new Task(application_0, priority_1, new String[] { host_0, host_1 });
+    application_0.addTask(task_0_0);
+
+    // Submit application_1
+    Application application_1 =
+        new Application("user_1", "b2", resourceManager);
+    application_1.submit(); // app + app attempt event sent to scheduler
+
+    application_1.addNodeManager(host_0, 1234, nm_0);
+    application_1.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_1_0 = Resources.createResource(1 * GB, 1);
+    application_1.addResourceRequestSpec(priority_1, capability_1_0);
+
+    Resource capability_1_1 = Resources.createResource(2 * GB, 1);
+    application_1.addResourceRequestSpec(priority_0, capability_1_1);
+
+    Task task_1_0 =
+        new Task(application_1, priority_1, new String[] { host_0, host_1 });
+    application_1.addTask(task_1_0);
+
+    // Send resource requests to the scheduler
+    application_0.schedule(); // allocate
+    application_1.schedule(); // allocate
+
+    // task_0_0 task_1_0 allocated, used=2G
+    nodeUpdate(nm_0);
+
+    // nothing allocated
+    nodeUpdate(nm_1);
+
+    // Get allocations from the scheduler
+    application_0.schedule(); // task_0_0
+    checkApplicationResourceUsage(1 * GB, application_0);
+
+    application_1.schedule(); // task_1_0
+    checkApplicationResourceUsage(1 * GB, application_1);
+
+    checkNodeResourceUsage(2 * GB, nm_0); // task_0_0 (1G) and task_1_0 (1G) 2G
+                                          // available
+    checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available
+
+    // move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5%
+    // total cap)
+    scheduler.moveApplication(application_0.getApplicationId(), "b1");
+
+    // 2GB 1C
+    Task task_1_1 =
+        new Task(application_1, priority_0,
+            new String[] { ResourceRequest.ANY });
+    application_1.addTask(task_1_1);
+
+    application_1.schedule();
+
+    // 2GB 1C
+    Task task_0_1 =
+        new Task(application_0, priority_0, new String[] { host_0, host_1 });
+    application_0.addTask(task_0_1);
+
+    application_0.schedule();
+
+    // prev 2G used free 2G
+    nodeUpdate(nm_0);
+
+    // prev 0G used free 2G
+    nodeUpdate(nm_1);
+
+    // Get allocations from the scheduler
+    application_1.schedule();
+    checkApplicationResourceUsage(3 * GB, application_1);
+
+    // Get allocations from the scheduler
+    application_0.schedule();
+    checkApplicationResourceUsage(3 * GB, application_0);
+
+    checkNodeResourceUsage(4 * GB, nm_0);
+    checkNodeResourceUsage(2 * GB, nm_1);
+
+  }
+
+  @Test
+  public void testMoveAppSuccess() throws Exception {
+
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    // Register node1
+    String host_0 = "host_0";
+    NodeManager nm_0 =
+        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(5 * GB, 1));
+
+    // Register node2
+    String host_1 = "host_1";
+    NodeManager nm_1 =
+        registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(5 * GB, 1));
+
+    // ResourceRequest priorities
+    Priority priority_0 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(0);
+    Priority priority_1 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(1);
+
+    // Submit application_0
+    Application application_0 =
+        new Application("user_0", "a1", resourceManager);
+    application_0.submit(); // app + app attempt event sent to scheduler
+
+    application_0.addNodeManager(host_0, 1234, nm_0);
+    application_0.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_0_0 = Resources.createResource(3 * GB, 1);
+    application_0.addResourceRequestSpec(priority_1, capability_0_0);
+
+    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_1);
+
+    Task task_0_0 =
+        new Task(application_0, priority_1, new String[] { host_0, host_1 });
+    application_0.addTask(task_0_0);
+
+    // Submit application_1
+    Application application_1 =
+        new Application("user_1", "b2", resourceManager);
+    application_1.submit(); // app + app attempt event sent to scheduler
+
+    application_1.addNodeManager(host_0, 1234, nm_0);
+    application_1.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_1_0 = Resources.createResource(1 * GB, 1);
+    application_1.addResourceRequestSpec(priority_1, capability_1_0);
+
+    Resource capability_1_1 = Resources.createResource(2 * GB, 1);
+    application_1.addResourceRequestSpec(priority_0, capability_1_1);
+
+    Task task_1_0 =
+        new Task(application_1, priority_1, new String[] { host_0, host_1 });
+    application_1.addTask(task_1_0);
+
+    // Send resource requests to the scheduler
+    application_0.schedule(); // allocate
+    application_1.schedule(); // allocate
+
+    // b2 can only run 1 app at a time
+    scheduler.moveApplication(application_0.getApplicationId(), "b2");
+
+    nodeUpdate(nm_0);
+
+    nodeUpdate(nm_1);
+
+    // Get allocations from the scheduler
+    application_0.schedule(); // task_0_0
+    checkApplicationResourceUsage(0 * GB, application_0);
+
+    application_1.schedule(); // task_1_0
+    checkApplicationResourceUsage(1 * GB, application_1);
+
+    // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is
+    // not scheduled
+    checkNodeResourceUsage(1 * GB, nm_0);
+    checkNodeResourceUsage(0 * GB, nm_1);
+
+    // lets move application_0 to a queue where it can run
+    scheduler.moveApplication(application_0.getApplicationId(), "a2");
+    application_0.schedule();
+
+    nodeUpdate(nm_1);
+
+    // Get allocations from the scheduler
+    application_0.schedule(); // task_0_0
+    checkApplicationResourceUsage(3 * GB, application_0);
+
+    checkNodeResourceUsage(1 * GB, nm_0);
+    checkNodeResourceUsage(3 * GB, nm_1);
+
+  }
+
+  @Test(expected = YarnException.class)
+  public void testMoveAppViolateQueueState() throws Exception {
+
+    resourceManager = new ResourceManager();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    StringBuilder qState = new StringBuilder();
+    qState.append(CapacitySchedulerConfiguration.PREFIX).append(B)
+        .append(CapacitySchedulerConfiguration.DOT)
+        .append(CapacitySchedulerConfiguration.STATE);
+    csConf.set(qState.toString(), QueueState.STOPPED.name());
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    resourceManager.init(conf);
+    resourceManager.getRMContext().getContainerTokenSecretManager()
+        .rollMasterKey();
+    resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
+    ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
+    mockContext = mock(RMContext.class);
+    when(mockContext.getConfigurationProvider()).thenReturn(
+        new LocalConfigurationProvider());
+
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    // Register node1
+    String host_0 = "host_0";
+    NodeManager nm_0 =
+        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(6 * GB, 1));
+
+    // ResourceRequest priorities
+    Priority priority_0 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(0);
+    Priority priority_1 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(1);
+
+    // Submit application_0
+    Application application_0 =
+        new Application("user_0", "a1", resourceManager);
+    application_0.submit(); // app + app attempt event sent to scheduler
+
+    application_0.addNodeManager(host_0, 1234, nm_0);
+
+    Resource capability_0_0 = Resources.createResource(3 * GB, 1);
+    application_0.addResourceRequestSpec(priority_1, capability_0_0);
+
+    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_1);
+
+    Task task_0_0 =
+        new Task(application_0, priority_1, new String[] { host_0 });
+    application_0.addTask(task_0_0);
+
+    // Send resource requests to the scheduler
+    application_0.schedule(); // allocate
+
+    // task_0_0 allocated
+    nodeUpdate(nm_0);
+
+    // Get allocations from the scheduler
+    application_0.schedule(); // task_0_0
+    checkApplicationResourceUsage(3 * GB, application_0);
+
+    checkNodeResourceUsage(3 * GB, nm_0);
+    // b2 queue contains 3GB consumption app,
+    // add another 3GB will hit max capacity limit on queue b
+    scheduler.moveApplication(application_0.getApplicationId(), "b1");
+
+  }
+
+  @Test
+  public void testMoveAppQueueMetricsCheck() throws Exception {
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    // Register node1
+    String host_0 = "host_0";
+    NodeManager nm_0 =
+        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(5 * GB, 1));
+
+    // Register node2
+    String host_1 = "host_1";
+    NodeManager nm_1 =
+        registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(5 * GB, 1));
+
+    // ResourceRequest priorities
+    Priority priority_0 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(0);
+    Priority priority_1 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(1);
+
+    // Submit application_0
+    Application application_0 =
+        new Application("user_0", "a1", resourceManager);
+    application_0.submit(); // app + app attempt event sent to scheduler
+
+    application_0.addNodeManager(host_0, 1234, nm_0);
+    application_0.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_0_0 = Resources.createResource(3 * GB, 1);
+    application_0.addResourceRequestSpec(priority_1, capability_0_0);
+
+    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_1);
+
+    Task task_0_0 =
+        new Task(application_0, priority_1, new String[] { host_0, host_1 });
+    application_0.addTask(task_0_0);
+
+    // Submit application_1
+    Application application_1 =
+        new Application("user_1", "b2", resourceManager);
+    application_1.submit(); // app + app attempt event sent to scheduler
+
+    application_1.addNodeManager(host_0, 1234, nm_0);
+    application_1.addNodeManager(host_1, 1234, nm_1);
+
+    Resource capability_1_0 = Resources.createResource(1 * GB, 1);
+    application_1.addResourceRequestSpec(priority_1, capability_1_0);
+
+    Resource capability_1_1 = Resources.createResource(2 * GB, 1);
+    application_1.addResourceRequestSpec(priority_0, capability_1_1);
+
+    Task task_1_0 =
+        new Task(application_1, priority_1, new String[] { host_0, host_1 });
+    application_1.addTask(task_1_0);
+
+    // Send resource requests to the scheduler
+    application_0.schedule(); // allocate
+    application_1.schedule(); // allocate
+
+    nodeUpdate(nm_0);
+
+    nodeUpdate(nm_1);
+
+    CapacityScheduler cs =
+        (CapacityScheduler) resourceManager.getResourceScheduler();
+    CSQueue origRootQ = cs.getRootQueue();
+    CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ);
+    int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues());
+    int origNumAppsRoot = origRootQ.getNumApplications();
+
+    scheduler.moveApplication(application_0.getApplicationId(), "a2");
+
+    CSQueue newRootQ = cs.getRootQueue();
+    int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues());
+    int newNumAppsRoot = newRootQ.getNumApplications();
+    CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ);
+    CapacitySchedulerLeafQueueInfo origOldA1 =
+        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues());
+    CapacitySchedulerLeafQueueInfo origNewA1 =
+        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues());
+    CapacitySchedulerLeafQueueInfo targetOldA2 =
+        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues());
+    CapacitySchedulerLeafQueueInfo targetNewA2 =
+        (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues());
+    // originally submitted here
+    assertEquals(1, origOldA1.getNumApplications());
+    assertEquals(1, origNumAppsA);
+    assertEquals(2, origNumAppsRoot);
+    // after the move
+    assertEquals(0, origNewA1.getNumApplications());
+    assertEquals(1, newNumAppsA);
+    assertEquals(2, newNumAppsRoot);
+    // original consumption on a1
+    assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemory());
+    assertEquals(1, origOldA1.getResourcesUsed().getvCores());
+    assertEquals(0, origNewA1.getResourcesUsed().getMemory()); // after the move
+    assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move
+    // app moved here with live containers
+    assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemory());
+    assertEquals(1, targetNewA2.getResourcesUsed().getvCores());
+    // it was empty before the move
+    assertEquals(0, targetOldA2.getNumApplications());
+    assertEquals(0, targetOldA2.getResourcesUsed().getMemory());
+    assertEquals(0, targetOldA2.getResourcesUsed().getvCores());
+    // after the app moved here
+    assertEquals(1, targetNewA2.getNumApplications());
+    // 1 container on original queue before move
+    assertEquals(1, origOldA1.getNumContainers());
+    // after the move the resource released
+    assertEquals(0, origNewA1.getNumContainers());
+    // and moved to the new queue
+    assertEquals(1, targetNewA2.getNumContainers());
+    // which originally didn't have any
+    assertEquals(0, targetOldA2.getNumContainers());
+    // 1 user with 3GB
+    assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0)
+        .getResourcesUsed().getMemory());
+    // 1 user with 1 core
+    assertEquals(1, origOldA1.getUsers().getUsersList().get(0)
+        .getResourcesUsed().getvCores());
+    // user ha no more running app in the orig queue
+    assertEquals(0, origNewA1.getUsers().getUsersList().size());
+    // 1 user with 3GB
+    assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0)
+        .getResourcesUsed().getMemory());
+    // 1 user with 1 core
+    assertEquals(1, targetNewA2.getUsers().getUsersList().get(0)
+        .getResourcesUsed().getvCores());
+
+    // Get allocations from the scheduler
+    application_0.schedule(); // task_0_0
+    checkApplicationResourceUsage(3 * GB, application_0);
+
+    application_1.schedule(); // task_1_0
+    checkApplicationResourceUsage(1 * GB, application_1);
+
+    // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is
+    // not scheduled
+    checkNodeResourceUsage(4 * GB, nm_0);
+    checkNodeResourceUsage(0 * GB, nm_1);
+
+  }
+
+  private int getNumAppsInQueue(String name, List<CSQueue> queues) {
+    for (CSQueue queue : queues) {
+      if (queue.getQueueName().equals(name)) {
+        return queue.getNumApplications();
+      }
+    }
+    return -1;
+  }
+
+  private CapacitySchedulerQueueInfo getQueueInfo(String name,
+      CapacitySchedulerQueueInfoList info) {
+    if (info != null) {
+      for (CapacitySchedulerQueueInfo queueInfo : info.getQueueInfoList()) {
+        if (queueInfo.getQueueName().equals(name)) {
+          return queueInfo;
+        } else {
+          CapacitySchedulerQueueInfo result =
+              getQueueInfo(name, queueInfo.getQueues());
+          if (result == null) {
+            continue;
+          }
+          return result;
+        }
+      }
+    }
+    return null;
+  }
+
+  @Test
+  public void testMoveAllApps() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+    String queue =
+        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("a1"));
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    // now move the app
+    scheduler.moveAllApps("a1", "b1");
+
+    // check postconditions
+    Thread.sleep(1000);
+    appsInB1 = scheduler.getAppsInQueue("b1");
+    assertEquals(1, appsInB1.size());
+    queue =
+        scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("b1"));
+
+    appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.contains(appAttemptId));
+    assertEquals(1, appsInB.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertTrue(appsInA1.isEmpty());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.isEmpty());
+
+    rm.stop();
+  }
+
+  @Test
+  public void testMoveAllAppsInvalidDestination() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    // now move the app
+    try {
+      scheduler.moveAllApps("a1", "DOES_NOT_EXIST");
+      Assert.fail();
+    } catch (YarnException e) {
+      // expected
+    }
+
+    // check postconditions, app should still be in a1
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    rm.stop();
+  }
+
+  @Test
+  public void testMoveAllAppsInvalidSource() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    // now move the app
+    try {
+      scheduler.moveAllApps("DOES_NOT_EXIST", "b1");
+      Assert.fail();
+    } catch (YarnException e) {
+      // expected
+    }
+
+    // check postconditions, app should still be in a1
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    rm.stop();
+  }
+
+  @Test
+  public void testKillAllAppsInQueue() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+    String queue =
+        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertTrue(queue.equals("a1"));
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    // now kill the app
+    scheduler.killAllAppsInQueue("a1");
+
+    // check postconditions
+    rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.isEmpty());
+
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertTrue(appsInA1.isEmpty());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.isEmpty());
+
+    rm.stop();
+  }
+
+  @Test
+  public void testKillAllAppsInvalidSource() throws Exception {
+    MockRM rm = setUpMove();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    // now kill the app
+    try {
+      scheduler.killAllAppsInQueue("DOES_NOT_EXIST");
+      Assert.fail();
+    } catch (YarnException e) {
+      // expected
+    }
+
+    // check postconditions, app should still be in a1
+    appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(1, appsInA.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(1, appsInRoot.size());
+
+    rm.stop();
+  }
 
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java Tue Aug 19 23:49:39 2014
@@ -89,7 +89,7 @@ public class TestChildQueueOrder {
         Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).thenReturn(
         Resources.createResource(16*GB, 32));
-    when(csContext.getClusterResources()).
+    when(csContext.getClusterResource()).
     thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
     when(csContext.getApplicationComparator()).
     thenReturn(CapacityScheduler.applicationComparator);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java Tue Aug 19 23:49:39 2014
@@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.server.re
 import java.util.ArrayList;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -31,6 +29,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
@@ -48,6 +47,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -209,10 +209,11 @@ public class TestContainerAllocation {
 
         @Override
         public Token createContainerToken(ContainerId containerId,
-            NodeId nodeId, String appSubmitter, Resource capability) {
+            NodeId nodeId, String appSubmitter, Resource capability,
+            Priority priority, long createTime) {
           numRetries++;
           return super.createContainerToken(containerId, nodeId, appSubmitter,
-            capability);
+            capability, priority, createTime);
         }
       };
     }