You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/02/07 21:33:02 UTC

svn commit: r1565792 [4/5] - in /hadoop/common/branches/branch-2.3/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/record...

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Fri Feb  7 20:33:01 2014
@@ -18,31 +18,49 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -50,186 +68,238 @@ import org.junit.Test;
  *
  */
 public class TestAMRestart {
-
-  @Test
-  public void testAMRestartWithExistingContainers() throws Exception {
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-
-    MockRM rm1 = new MockRM(conf);
-    rm1.start();
-    RMApp app1 =
-        rm1.submitApp(200, "name", "user",
-          new HashMap<ApplicationAccessType, String>(), false, "default", -1,
-          null, "MAPREDUCE", false, true);
-    MockNM nm1 =
-        new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
-    nm1.registerNode();
-    MockNM nm2 =
-        new MockNM("127.0.0.1:2351", 4089, rm1.getResourceTrackerService());
-    nm2.registerNode();
-
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-    int NUM_CONTAINERS = 3;
-    // allocate NUM_CONTAINERS containers
-    am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
-      new ArrayList<ContainerId>());
-    nm1.nodeHeartbeat(true);
-
-    // wait for containers to be allocated.
-    List<Container> containers =
-        am1.allocate(new ArrayList<ResourceRequest>(),
-          new ArrayList<ContainerId>()).getAllocatedContainers();
-    while (containers.size() != NUM_CONTAINERS) {
-      nm1.nodeHeartbeat(true);
-      containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
-        new ArrayList<ContainerId>()).getAllocatedContainers());
-      Thread.sleep(200);
-    }
-
-    // launch the 2nd container, for testing running container transferred.
-    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
-    ContainerId containerId2 =
-        ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
-    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
-
-    // launch the 3rd container, for testing container allocated by previous
-    // attempt is completed by the next new attempt/
-    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING);
-    ContainerId containerId3 =
-        ContainerId.newInstance(am1.getApplicationAttemptId(), 3);
-    rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
-
-    // 4th container still in AQUIRED state. for testing Acquired container is
-    // always killed.
-    ContainerId containerId4 =
-        ContainerId.newInstance(am1.getApplicationAttemptId(), 4);
-    rm1.waitForState(nm1, containerId4, RMContainerState.ACQUIRED);
-
-    // 5th container is in Allocated state. for testing allocated container is
-    // always killed.
-    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
-    nm1.nodeHeartbeat(true);
-    ContainerId containerId5 =
-        ContainerId.newInstance(am1.getApplicationAttemptId(), 5);
-    rm1.waitForContainerAllocated(nm1, containerId5);
-    rm1.waitForState(nm1, containerId5, RMContainerState.ALLOCATED);
-
-    // 6th container is in Reserved state.
-    am1.allocate("127.0.0.1", 6000, 1, new ArrayList<ContainerId>());
-    ContainerId containerId6 =
-        ContainerId.newInstance(am1.getApplicationAttemptId(), 6);
-    nm1.nodeHeartbeat(true);
-    SchedulerApplicationAttempt schedulerAttempt =
-        ((CapacityScheduler) rm1.getResourceScheduler())
-          .getCurrentAttemptForContainer(containerId6);
-    while (schedulerAttempt.getReservedContainers().size() == 0) {
-      System.out.println("Waiting for container " + containerId6
-          + " to be reserved.");
-      nm1.nodeHeartbeat(true);
-      Thread.sleep(200);
-    }
-    // assert containerId6 is reserved.
-    Assert.assertEquals(containerId6, schedulerAttempt.getReservedContainers()
-      .get(0).getContainerId());
-
-    // fail the AM by sending CONTAINER_FINISHED event without registering.
-    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
-    am1.waitForState(RMAppAttemptState.FAILED);
-
-    // wait for some time. previous AM's running containers should still remain
-    // in scheduler even though am failed
-    Thread.sleep(3000);
-    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
-    // acquired/allocated containers are cleaned up.
-    Assert.assertNull(rm1.getResourceScheduler().getRMContainer(containerId4));
-    Assert.assertNull(rm1.getResourceScheduler().getRMContainer(containerId5));
-
-    // wait for app to start a new attempt.
-    rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
-    // assert this is a new AM.
-    ApplicationAttemptId newAttemptId =
-        app1.getCurrentAppAttempt().getAppAttemptId();
-    Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
-
-    // launch the new AM
-    RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
-    nm1.nodeHeartbeat(true);
-    MockAM am2 = rm1.sendAMLaunched(attempt2.getAppAttemptId());
-    RegisterApplicationMasterResponse registerResponse =
-        am2.registerAppAttempt();
-
-    // Assert two containers are running: container2 and container3;
-    Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempt()
-      .size());
-    boolean containerId2Exists = false, containerId3Exists = false;
-    for (Container container : registerResponse
-      .getContainersFromPreviousAttempt()) {
-      if (container.getId().equals(containerId2)) {
-        containerId2Exists = true;
-      }
-      if (container.getId().equals(containerId3)) {
-        containerId3Exists = true;
-      }
-    }
-    Assert.assertTrue(containerId2Exists && containerId3Exists);
-    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
-
-    // complete container by sending the container complete event which has earlier
-    // attempt's attemptId
-    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.COMPLETE);
-    rm1.waitForState(nm1, containerId3, RMContainerState.COMPLETED);
-
-    // Even though the completed container containerId3 event was sent to the
-    // earlier failed attempt, new RMAppAttempt can also capture this container
-    // info.
-    // completed containerId4 is also transferred to the new attempt.
-    RMAppAttempt newAttempt =
-        app1.getRMAppAttempt(am2.getApplicationAttemptId());
-    // 4 containers finished, acquired/allocated/reserved/completed.
-    Assert.assertEquals(4, newAttempt.getJustFinishedContainers().size());
-    boolean container3Exists = false, container4Exists = false, container5Exists =
-        false, container6Exists = false;
-    for(ContainerStatus status :  newAttempt.getJustFinishedContainers()) {
-      if(status.getContainerId().equals(containerId3)) {
-        // containerId3 is the container ran by previous attempt but finished by the
-        // new attempt.
-        container3Exists = true;
-      }
-      if (status.getContainerId().equals(containerId4)) {
-        // containerId4 is the Acquired Container killed by the previous attempt,
-        // it's now inside new attempt's finished container list.
-        container4Exists = true;
-      }
-      if (status.getContainerId().equals(containerId5)) {
-        // containerId5 is the Allocated container killed by previous failed attempt.
-        container5Exists = true;
-      }
-      if (status.getContainerId().equals(containerId6)) {
-        // containerId6 is the reserved container killed by previous failed attempt.
-        container6Exists = true;
-      }
-    }
-    Assert.assertTrue(container3Exists && container4Exists && container5Exists
-        && container6Exists);
-
-    // New SchedulerApplicationAttempt also has the containers info.
-    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
-
-    // record the scheduler attempt for testing.
-    SchedulerApplicationAttempt schedulerNewAttempt =
-        ((CapacityScheduler) rm1.getResourceScheduler())
-          .getCurrentAttemptForContainer(containerId2);
-    // finish this application
-    MockRM.finishApplicationMaster(app1, rm1, nm1, am2);
-
-    // the 2nd attempt released the 1st attempt's running container, when the
-    // 2nd attempt finishes.
-    Assert.assertFalse(schedulerNewAttempt.getLiveContainers().contains(
-      containerId2));
-    // all 4 normal containers finished.
-    Assert.assertEquals(5, newAttempt.getJustFinishedContainers().size());
-
-    rm1.stop();
-  }
+//  private static final Log LOG = LogFactory.getLog(TestAMRestart.class);
+//  ApplicationsManagerImpl appImpl;
+//  RMContext asmContext = new RMContextImpl(new MemStore());
+//  ApplicationTokenSecretManager appTokenSecretManager =
+//    new ApplicationTokenSecretManager();
+//  DummyResourceScheduler scheduler;
+//  private ClientRMService clientRMService;
+//  int count = 0;
+//  ApplicationId appID;
+//  final int maxFailures = 3;
+//  AtomicInteger launchNotify = new AtomicInteger();
+//  AtomicInteger schedulerNotify = new AtomicInteger();
+//  volatile boolean stop = false;
+//  int schedulerAddApplication = 0;
+//  int schedulerRemoveApplication = 0;
+//  int launcherLaunchCalled = 0;
+//  int launcherCleanupCalled = 0;
+//  private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+//
+//  private class ExtApplicationsManagerImpl extends ApplicationsManagerImpl {
+//    public ExtApplicationsManagerImpl(
+//        ApplicationTokenSecretManager applicationTokenSecretManager,
+//        YarnScheduler scheduler, RMContext asmContext) {
+//      super(applicationTokenSecretManager, scheduler, asmContext);
+//    }
+//
+//    @Override
+//    public EventHandler<ASMEvent<AMLauncherEventType>> createNewApplicationMasterLauncher(
+//        ApplicationTokenSecretManager tokenSecretManager) {
+//      return new DummyAMLauncher();
+//    }
+//  }
+//
+//  private class DummyAMLauncher implements EventHandler<ASMEvent<AMLauncherEventType>> {
+//
+//    public DummyAMLauncher() {
+//      asmContext.getDispatcher().register(AMLauncherEventType.class, this);
+//      new Thread() {
+//        public void run() {
+//          while (!stop) {
+//            LOG.info("DEBUG -- waiting for launch");
+//            synchronized(launchNotify) {
+//              while (launchNotify.get() == 0) {
+//                try {
+//                  launchNotify.wait();
+//                } catch (InterruptedException e) {
+//                }
+//              }
+//              asmContext.getDispatcher().getEventHandler().handle(
+//                  new ApplicationEvent(
+//                      ApplicationEventType.LAUNCHED, appID));
+//              launchNotify.addAndGet(-1);
+//            }
+//          }
+//        }
+//      }.start();
+//    }
+//
+//    @Override
+//    public void handle(ASMEvent<AMLauncherEventType> event) {
+//      switch (event.getType()) {
+//      case CLEANUP:
+//        launcherCleanupCalled++;
+//        break;
+//      case LAUNCH:
+//        LOG.info("DEBUG -- launching");
+//        launcherLaunchCalled++;
+//        synchronized (launchNotify) {
+//          launchNotify.addAndGet(1);
+//          launchNotify.notify();
+//        }
+//        break;
+//      default:
+//        break;
+//      }
+//    }
+//  }
+//
+//  private class DummyResourceScheduler implements ResourceScheduler {
+//
+//    @Override
+//    public void removeNode(RMNode node) {
+//    }
+//
+//    @Override
+//    public Allocation allocate(ApplicationId applicationId,
+//        List<ResourceRequest> ask, List<Container> release) throws IOException {
+//      Container container = recordFactory.newRecordInstance(Container.class);
+//      container.setContainerToken(recordFactory.newRecordInstance(ContainerToken.class));
+//      container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
+//      container.setContainerManagerAddress("localhost");
+//      container.setNodeHttpAddress("localhost:8042");
+//      container.setId(recordFactory.newRecordInstance(ContainerId.class));
+//      container.getId().setAppId(appID);
+//      container.getId().setId(count);
+//      count++;
+//      return new Allocation(Arrays.asList(container), Resources.none());
+//    }
+//
+//    @Override
+//    public void handle(ASMEvent<ApplicationTrackerEventType> event) {
+//      switch (event.getType()) {
+//      case ADD:
+//        schedulerAddApplication++;
+//        break;
+//      case EXPIRE:
+//        schedulerRemoveApplication++;
+//        LOG.info("REMOVING app : " + schedulerRemoveApplication);
+//        if (schedulerRemoveApplication == maxFailures) {
+//          synchronized (schedulerNotify) {
+//            schedulerNotify.addAndGet(1);
+//            schedulerNotify.notify();
+//          }
+//        }
+//        break;
+//      default:
+//        break;
+//      }
+//    }
+//
+//    @Override
+//    public QueueInfo getQueueInfo(String queueName,
+//        boolean includeChildQueues,
+//        boolean recursive) throws IOException {
+//      return null;
+//    }
+//    @Override
+//    public List<QueueUserACLInfo> getQueueUserAclInfo() {
+//      return null;
+//    }
+//    @Override
+//    public void addApplication(ApplicationId applicationId,
+//        ApplicationMaster master, String user, String queue, Priority priority,
+//        ApplicationStore store)
+//        throws IOException {
+//    }
+//    @Override
+//    public void addNode(RMNode nodeInfo) {
+//    }
+//    @Override
+//    public void recover(RMState state) throws Exception {
+//    }
+//    @Override
+//    public void reinitialize(Configuration conf,
+//        ContainerTokenSecretManager secretManager, RMContext rmContext)
+//        throws IOException {
+//    }
+//
+//    @Override
+//    public void nodeUpdate(RMNode nodeInfo,
+//        Map<String, List<Container>> containers) {
+//    }
+//
+//    @Override
+//    public Resource getMaximumResourceCapability() {
+//      // TODO Auto-generated method stub
+//      return null;
+//    }
+//
+//    @Override
+//    public Resource getMinimumResourceCapability() {
+//      // TODO Auto-generated method stub
+//      return null;
+//    }
+//  }
+//
+//  @Before
+//  public void setUp() {
+//
+//    asmContext.getDispatcher().register(ApplicationEventType.class,
+//        new ResourceManager.ApplicationEventDispatcher(asmContext));
+//
+//    appID = recordFactory.newRecordInstance(ApplicationId.class);
+//    appID.setClusterTimestamp(System.currentTimeMillis());
+//    appID.setId(1);
+//    Configuration conf = new Configuration();
+//    scheduler = new DummyResourceScheduler();
+//    asmContext.getDispatcher().init(conf);
+//    asmContext.getDispatcher().start();
+//    asmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
+//    appImpl = new ExtApplicationsManagerImpl(appTokenSecretManager, scheduler, asmContext);
+//
+//    conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 1000L);
+//    conf.setInt(RMConfig.AM_MAX_RETRIES, maxFailures);
+//    appImpl.init(conf);
+//    appImpl.start();
+//
+//    this.clientRMService = new ClientRMService(asmContext, appImpl
+//        .getAmLivelinessMonitor(), appImpl.getClientToAMSecretManager(),
+//        scheduler);
+//    this.clientRMService.init(conf);
+//  }
+//
+//  @After
+//  public void tearDown() {
+//  }
+//
+//  private void waitForFailed(AppAttempt application, ApplicationState
+//      finalState) throws Exception {
+//    int count = 0;
+//    while(application.getState() != finalState && count < 10) {
+//      Thread.sleep(500);
+//      count++;
+//    }
+//    Assert.assertEquals(finalState, application.getState());
+//  }
+//
+//  @Test
+//  public void testAMRestart() throws Exception {
+//    ApplicationSubmissionContext subContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+//    subContext.setApplicationId(appID);
+//    subContext.setApplicationName("dummyApp");
+////    subContext.command = new ArrayList<String>();
+////    subContext.environment = new HashMap<String, String>();
+////    subContext.fsTokens = new ArrayList<String>();
+//    subContext.setFsTokensTodo(ByteBuffer.wrap(new byte[0]));
+//    SubmitApplicationRequest request = recordFactory
+//        .newRecordInstance(SubmitApplicationRequest.class);
+//    request.setApplicationSubmissionContext(subContext);
+//    clientRMService.submitApplication(request);
+//    AppAttempt application = asmContext.getApplications().get(appID);
+//    synchronized (schedulerNotify) {
+//      while(schedulerNotify.get() == 0) {
+//        schedulerNotify.wait();
+//      }
+//    }
+//    Assert.assertEquals(maxFailures, launcherCleanupCalled);
+//    Assert.assertEquals(maxFailures, launcherLaunchCalled);
+//    Assert.assertEquals(maxFailures, schedulerAddApplication);
+//    Assert.assertEquals(maxFailures, schedulerRemoveApplication);
+//    Assert.assertEquals(maxFailures, application.getFailedCount());
+//    waitForFailed(application, ApplicationState.FAILED);
+//    stop = true;
+//  }
 }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Fri Feb  7 20:33:01 2014
@@ -463,7 +463,7 @@ public class TestRMAppTransitions {
     LOG.info("--- START: testUnmanagedAppFailPath ---");
     application = testCreateAppRunning(subContext);
     RMAppEvent event = new RMAppFailedAttemptEvent(
-        application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "", false);
+        application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "");
     application.handle(event);
     rmDispatcher.await();
     RMAppAttempt appAttempt = application.getCurrentAppAttempt();
@@ -575,9 +575,7 @@ public class TestRMAppTransitions {
         RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
-    sendAppUpdateSavedEvent(application);
-    assertKilled(application);
-    assertAppFinalStateSaved(application);
+    assertAppAndAttemptKilled(application);
   }
 
   @Test
@@ -590,9 +588,9 @@ public class TestRMAppTransitions {
     for (int i=1; i < maxAppAttempts; i++) {
       RMAppEvent event = 
           new RMAppFailedAttemptEvent(application.getApplicationId(), 
-              RMAppEventType.ATTEMPT_FAILED, "", false);
+              RMAppEventType.ATTEMPT_FAILED, "");
       application.handle(event);
-      assertAppState(RMAppState.ACCEPTED, application);
+      assertAppState(RMAppState.SUBMITTED, application);
       event = 
           new RMAppEvent(application.getApplicationId(), 
               RMAppEventType.APP_ACCEPTED);
@@ -606,7 +604,7 @@ public class TestRMAppTransitions {
     String message = "Test fail";
     RMAppEvent event = 
         new RMAppFailedAttemptEvent(application.getApplicationId(), 
-            RMAppEventType.ATTEMPT_FAILED, message, false);
+            RMAppEventType.ATTEMPT_FAILED, message);
     application.handle(event);
     rmDispatcher.await();
     sendAppUpdateSavedEvent(application);
@@ -623,16 +621,7 @@ public class TestRMAppTransitions {
         RMAppEventType.KILL);
     application.handle(event);
     rmDispatcher.await();
-
-    assertAppState(RMAppState.KILLING, application);
-    RMAppEvent appAttemptKilled =
-        new RMAppEvent(application.getApplicationId(),
-          RMAppEventType.ATTEMPT_KILLED);
-    application.handle(appAttemptKilled);
-    assertAppState(RMAppState.FINAL_SAVING, application);
-    sendAppUpdateSavedEvent(application);
-    assertKilled(application);
-    assertAppFinalStateSaved(application);
+    assertAppAndAttemptKilled(application);
   }
 
   @Test
@@ -671,10 +660,10 @@ public class TestRMAppTransitions {
     for (int i=1; i<maxAppAttempts; i++) {
       RMAppEvent event = 
           new RMAppFailedAttemptEvent(application.getApplicationId(), 
-              RMAppEventType.ATTEMPT_FAILED, "", false);
+              RMAppEventType.ATTEMPT_FAILED, "");
       application.handle(event);
       rmDispatcher.await();
-      assertAppState(RMAppState.ACCEPTED, application);
+      assertAppState(RMAppState.SUBMITTED, application);
       appAttempt = application.getCurrentAppAttempt();
       Assert.assertEquals(++expectedAttemptId, 
           appAttempt.getAppAttemptId().getAttemptId());
@@ -696,7 +685,7 @@ public class TestRMAppTransitions {
     // after max application attempts
     RMAppEvent event = 
         new RMAppFailedAttemptEvent(application.getApplicationId(), 
-            RMAppEventType.ATTEMPT_FAILED, "", false);
+            RMAppEventType.ATTEMPT_FAILED, "");
     application.handle(event);
     rmDispatcher.await();
     sendAppUpdateSavedEvent(application);
@@ -822,7 +811,7 @@ public class TestRMAppTransitions {
     // KILLED => KILLED event RMAppEventType.ATTEMPT_FAILED
     event = 
         new RMAppFailedAttemptEvent(application.getApplicationId(), 
-            RMAppEventType.ATTEMPT_FAILED, "", false);
+            RMAppEventType.ATTEMPT_FAILED, "");
     application.handle(event);
     rmDispatcher.await();
     assertTimesAtFinish(application);

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Fri Feb  7 20:33:01 2014
@@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -69,10 +68,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
@@ -80,6 +79,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -121,15 +121,14 @@ public class TestRMAppAttemptTransitions
   private AMLivelinessMonitor amFinishingMonitor;
   private RMStateStore store;
 
-  private RMAppImpl application;
+  private RMApp application;
   private RMAppAttempt applicationAttempt;
 
   private Configuration conf = new Configuration();
   private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf));
   private ClientToAMTokenSecretManagerInRM clientToAMTokenManager =
       spy(new ClientToAMTokenSecretManagerInRM());
-  private boolean transferStateFromPreviousAttempt = false;
-
+  
   private final class TestApplicationAttemptEventDispatcher implements
       EventHandler<RMAppAttemptEvent> {
 
@@ -152,11 +151,6 @@ public class TestRMAppAttemptTransitions
     @Override
     public void handle(RMAppEvent event) {
       assertEquals(application.getApplicationId(), event.getApplicationId());
-      if (event instanceof RMAppFailedAttemptEvent) {
-        transferStateFromPreviousAttempt =
-            ((RMAppFailedAttemptEvent) event)
-              .getTransferStateFromPreviousAttempt();
-      }
       try {
         application.handle(event);
       } catch (Throwable t) {
@@ -261,10 +255,10 @@ public class TestRMAppAttemptTransitions
 
     unmanagedAM = false;
     
-    application = mock(RMAppImpl.class);
+    application = mock(RMApp.class);
     applicationAttempt =
         new RMAppAttemptImpl(applicationAttemptId, rmContext, scheduler,
-          masterService, submissionContext, new Configuration(), false);
+          masterService, submissionContext, new Configuration(), user);
     when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
     when(application.getApplicationId()).thenReturn(applicationId);
     
@@ -378,7 +372,6 @@ public class TestRMAppAttemptTransitions
     assertNull(applicationAttempt.getFinalApplicationStatus());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
     verifyAttemptFinalStateSaved();
-    assertFalse(transferStateFromPreviousAttempt);
   }
   
   /**
@@ -415,6 +408,9 @@ public class TestRMAppAttemptTransitions
     assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
     assertEquals(0, applicationAttempt.getRanNodes().size());
     assertNull(applicationAttempt.getFinalApplicationStatus());
+    
+    // Check events
+    verify(application).handle(any(RMAppEvent.class));
   }
 
   /**
@@ -450,7 +446,7 @@ public class TestRMAppAttemptTransitions
     assertEquals(0, applicationAttempt.getRanNodes().size());
     
     // Check events
-    verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class));
+    verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class));
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
     verifyAttemptFinalStateSaved();
   }
@@ -533,7 +529,6 @@ public class TestRMAppAttemptTransitions
     assertEquals(container, applicationAttempt.getMasterContainer());
     assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
     verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
-    assertFalse(transferStateFromPreviousAttempt);
   }
   
   
@@ -549,7 +544,7 @@ public class TestRMAppAttemptTransitions
     applicationAttempt.handle(
         new RMAppAttemptEvent(
             applicationAttempt.getAppAttemptId(), 
-            RMAppAttemptEventType.ATTEMPT_ADDED));
+            RMAppAttemptEventType.APP_ACCEPTED));
     
     if(unmanagedAM){
       assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, 
@@ -663,7 +658,6 @@ public class TestRMAppAttemptTransitions
         diagnostics));
     testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1,
         true);
-    assertFalse(transferStateFromPreviousAttempt);
   }
 
   private void sendAttemptUpdateSavedEvent(RMAppAttempt applicationAttempt) {
@@ -692,21 +686,6 @@ public class TestRMAppAttemptTransitions
   }
 
   @Test
-  public void testUnmanagedAMContainersCleanup() {
-    unmanagedAM = true;
-    when(submissionContext.getUnmanagedAM()).thenReturn(true);
-    when(submissionContext.getKeepContainersAcrossApplicationAttempts())
-      .thenReturn(true);
-    // submit AM and check it goes to SUBMITTED state
-    submitApplicationAttempt();
-    // launch AM and verify attempt failed
-    applicationAttempt.handle(new RMAppAttemptRegistrationEvent(
-      applicationAttempt.getAppAttemptId(), "host", 8042, "oldtrackingurl"));
-    sendAttemptUpdateSavedEvent(applicationAttempt);
-    assertFalse(transferStateFromPreviousAttempt);
-  }
-
-  @Test
   public void testNewToKilled() {
     applicationAttempt.handle(
         new RMAppAttemptEvent(
@@ -724,6 +703,16 @@ public class TestRMAppAttemptTransitions
             RMAppAttemptEventType.RECOVER));
     testAppAttemptRecoveredState();
   }
+  
+  @Test
+  public void testSubmittedToFailed() {
+    submitApplicationAttempt();
+    String message = "Rejected";
+    applicationAttempt.handle(
+        new RMAppAttemptRejectedEvent(
+            applicationAttempt.getAppAttemptId(), message));
+    testAppAttemptSubmittedToFailedState(message);
+  }
 
   @Test
   public void testSubmittedToKilled() {
@@ -1117,64 +1106,6 @@ public class TestRMAppAttemptTransitions
     Assert.assertNull(token);
   }
 
-  @Test
-  public void testFailedToFailed() {
-    // create a failed attempt.
-    when(submissionContext.getKeepContainersAcrossApplicationAttempts())
-      .thenReturn(true);
-    Container amContainer = allocateApplicationAttempt();
-    launchApplicationAttempt(amContainer);
-    runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
-    ContainerStatus cs1 =
-        ContainerStatus.newInstance(amContainer.getId(),
-          ContainerState.COMPLETE, "some error", 123);
-    ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
-    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-      appAttemptId, cs1));
-    sendAttemptUpdateSavedEvent(applicationAttempt);
-    assertEquals(RMAppAttemptState.FAILED,
-      applicationAttempt.getAppAttemptState());
-    // should not kill containers when attempt fails.
-    assertTrue(transferStateFromPreviousAttempt);
-
-    // failed attempt captured the container finished event.
-    assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
-    ContainerStatus cs2 =
-        ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
-          ContainerState.COMPLETE, "", 0);
-    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-      appAttemptId, cs2));
-    assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
-    assertEquals(cs2.getContainerId(), applicationAttempt
-      .getJustFinishedContainers().get(0).getContainerId());
-  }
-
-
-  @Test
-  public void testContainersCleanupForLastAttempt() {
-    // create a failed attempt.
-    applicationAttempt =
-        new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), rmContext,
-          scheduler, masterService, submissionContext, new Configuration(),
-          true);
-    when(submissionContext.getKeepContainersAcrossApplicationAttempts())
-      .thenReturn(true);
-    when(submissionContext.getMaxAppAttempts()).thenReturn(1);
-    Container amContainer = allocateApplicationAttempt();
-    launchApplicationAttempt(amContainer);
-    runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
-    ContainerStatus cs1 =
-        ContainerStatus.newInstance(amContainer.getId(),
-          ContainerState.COMPLETE, "some error", 123);
-    ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
-    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
-      appAttemptId, cs1));
-    sendAttemptUpdateSavedEvent(applicationAttempt);
-    assertEquals(RMAppAttemptState.FAILED,
-      applicationAttempt.getAppAttemptState());
-    assertFalse(transferStateFromPreviousAttempt);
-  }
-
   private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
     verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId);
     if (UserGroupInformation.isSecurityEnabled()) {

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Fri Feb  7 20:33:01 2014
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -66,10 +66,8 @@ public class TestQueueMetrics {
     MetricsSource queueSource= queueSource(ms, queueName);
     AppSchedulingInfo app = mockApp(user);
 
-    metrics.submitApp(user);
+    metrics.submitApp(user, 1);
     MetricsSource userSource = userSource(ms, queueName, user);
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-    metrics.submitAppAttempt(user);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
 
     metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
@@ -78,7 +76,7 @@ public class TestQueueMetrics {
     // configurable cluster/queue resources
     checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
 
-    metrics.runAppAttempt(app.getApplicationId(), user);
+    metrics.incrAppsRunning(app, user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
 
     metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
@@ -87,10 +85,7 @@ public class TestQueueMetrics {
     metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
     checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
 
-    metrics.finishAppAttempt(
-        app.getApplicationId(), app.isPending(), app.getUser());
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-    metrics.finishApp(user, RMAppState.FINISHED);
+    metrics.finishApp(app, RMAppAttemptState.FINISHED);
     checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
     assertNull(userSource);
   }
@@ -105,47 +100,39 @@ public class TestQueueMetrics {
     MetricsSource queueSource = queueSource(ms, queueName);
     AppSchedulingInfo app = mockApp(user);
 
-    metrics.submitApp(user);
+    metrics.submitApp(user, 1);
     MetricsSource userSource = userSource(ms, queueName, user);
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-    metrics.submitAppAttempt(user);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
 
-    metrics.runAppAttempt(app.getApplicationId(), user);
+    metrics.incrAppsRunning(app, user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
 
-    metrics.finishAppAttempt(
-        app.getApplicationId(), app.isPending(), app.getUser());
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
+    metrics.finishApp(app, RMAppAttemptState.FAILED);
+    checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
 
     // As the application has failed, framework retries the same application
     // based on configuration
-    metrics.submitAppAttempt(user);
+    metrics.submitApp(user, 2);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
 
-    metrics.runAppAttempt(app.getApplicationId(), user);
+    metrics.incrAppsRunning(app, user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
 
     // Suppose say application has failed this time as well.
-    metrics.finishAppAttempt(
-        app.getApplicationId(), app.isPending(), app.getUser());
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
+    metrics.finishApp(app, RMAppAttemptState.FAILED);
+    checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
 
     // As the application has failed, framework retries the same application
     // based on configuration
-    metrics.submitAppAttempt(user);
+    metrics.submitApp(user, 3);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
 
-    metrics.runAppAttempt(app.getApplicationId(), user);
+    metrics.incrAppsRunning(app, user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
 
-    // Suppose say application has failed, and there's no more retries.
-    metrics.finishAppAttempt(
-        app.getApplicationId(), app.isPending(), app.getUser());
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-
-    metrics.finishApp(user, RMAppState.FAILED);
-    checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
+    // Suppose say application has finished.
+    metrics.finishApp(app, RMAppAttemptState.FINISHED);
+    checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
 
     assertNull(userSource);
   }
@@ -159,13 +146,9 @@ public class TestQueueMetrics {
     MetricsSource queueSource = queueSource(ms, queueName);
     AppSchedulingInfo app = mockApp(user);
 
-    metrics.submitApp(user);
+    metrics.submitApp(user, 1);
     MetricsSource userSource = userSource(ms, queueName, user);
 
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
-
-    metrics.submitAppAttempt(user);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
     checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
 
@@ -177,7 +160,7 @@ public class TestQueueMetrics {
     checkResources(queueSource, 0, 0, 0, 0, 0,  100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
     checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
 
-    metrics.runAppAttempt(app.getApplicationId(), user);
+    metrics.incrAppsRunning(app, user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
     checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
 
@@ -189,11 +172,7 @@ public class TestQueueMetrics {
     checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
     checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
 
-    metrics.finishAppAttempt(
-        app.getApplicationId(), app.isPending(), app.getUser());
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
-    metrics.finishApp(user, RMAppState.FINISHED);
+    metrics.finishApp(app, RMAppAttemptState.FINISHED);
     checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
     checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
   }
@@ -213,16 +192,10 @@ public class TestQueueMetrics {
     MetricsSource queueSource = queueSource(ms, leafQueueName);
     AppSchedulingInfo app = mockApp(user);
 
-    metrics.submitApp(user);
+    metrics.submitApp(user, 1);
     MetricsSource userSource = userSource(ms, leafQueueName, user);
     MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
 
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true);
-
-    metrics.submitAppAttempt(user);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
     checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0, true);
     checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
@@ -238,7 +211,7 @@ public class TestQueueMetrics {
     checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
     checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
 
-    metrics.runAppAttempt(app.getApplicationId(), user);
+    metrics.incrAppsRunning(app, user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
     checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
 
@@ -258,14 +231,7 @@ public class TestQueueMetrics {
     checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
     checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
 
-    metrics.finishAppAttempt(
-        app.getApplicationId(), app.isPending(), app.getUser());
-    checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
-    checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true);
-
-    metrics.finishApp(user, RMAppState.FINISHED);
+    metrics.finishApp(app, RMAppAttemptState.FINISHED);
     checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
     checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0, true);
     checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
@@ -342,7 +308,7 @@ public class TestQueueMetrics {
     assertGauge("AppsPending", pending, rb);
     assertGauge("AppsRunning", running, rb);
     assertCounter("AppsCompleted", completed, rb);
-    assertCounter("AppsFailed", failed, rb);
+    assertGauge("AppsFailed", failed, rb);
     assertCounter("AppsKilled", killed, rb);
   }
 

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java Fri Feb  7 20:33:01 2014
@@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -59,12 +58,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -383,24 +378,4 @@ public class TestSchedulerUtils {
           ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1), "x");
     Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus());
   }
-
-  public static <T> SchedulerApplication verifyAppAddedAndRemovedFromScheduler(
-      final Map<ApplicationId, SchedulerApplication> applications,
-      EventHandler<SchedulerEvent> handler, String queueName) throws Exception {
-    ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    AppAddedSchedulerEvent appAddedEvent =
-        new AppAddedSchedulerEvent(appId, queueName, "user");
-    handler.handle(appAddedEvent);
-    SchedulerApplication app = applications.get(appId);
-    // verify application is added.
-    Assert.assertNotNull(app);
-    Assert.assertEquals("user", app.getUser());
-
-    AppRemovedSchedulerEvent appRemoveEvent =
-        new AppRemovedSchedulerEvent(appId, RMAppState.FINISHED);
-    handler.handle(appRemoveEvent);
-    Assert.assertNull(applications.get(appId));
-    return app;
-  }
 }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Fri Feb  7 20:33:01 2014
@@ -304,7 +304,7 @@ public class TestApplicationLimits {
     int APPLICATION_ID = 0;
     // Submit first application
     FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplicationAttempt(app_0, user_0);
+    queue.submitApplication(app_0, user_0, A);
     assertEquals(1, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumActiveApplications(user_0));
@@ -312,7 +312,7 @@ public class TestApplicationLimits {
 
     // Submit second application
     FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplicationAttempt(app_1, user_0);
+    queue.submitApplication(app_1, user_0, A);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -320,14 +320,14 @@ public class TestApplicationLimits {
     
     // Submit third application, should remain pending
     FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplicationAttempt(app_2, user_0);
+    queue.submitApplication(app_2, user_0, A);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
     
     // Finish one application, app_2 should be activated
-    queue.finishApplicationAttempt(app_0, A);
+    queue.finishApplication(app_0, A);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -335,7 +335,7 @@ public class TestApplicationLimits {
     
     // Submit another one for user_0
     FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplicationAttempt(app_3, user_0);
+    queue.submitApplication(app_3, user_0, A);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -346,7 +346,7 @@ public class TestApplicationLimits {
     
     // Submit first app for user_1
     FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
-    queue.submitApplicationAttempt(app_4, user_1);
+    queue.submitApplication(app_4, user_1, A);
     assertEquals(3, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -356,7 +356,7 @@ public class TestApplicationLimits {
 
     // Submit second app for user_1, should block due to queue-limit
     FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1);
-    queue.submitApplicationAttempt(app_5, user_1);
+    queue.submitApplication(app_5, user_1, A);
     assertEquals(3, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -365,7 +365,7 @@ public class TestApplicationLimits {
     assertEquals(1, queue.getNumPendingApplications(user_1));
 
     // Now finish one app of user_1 so app_5 should be activated
-    queue.finishApplicationAttempt(app_4, A);
+    queue.finishApplication(app_4, A);
     assertEquals(3, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -385,7 +385,7 @@ public class TestApplicationLimits {
 
     // Submit first application
     FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplicationAttempt(app_0, user_0);
+    queue.submitApplication(app_0, user_0, A);
     assertEquals(1, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumActiveApplications(user_0));
@@ -394,7 +394,7 @@ public class TestApplicationLimits {
 
     // Submit second application
     FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplicationAttempt(app_1, user_0);
+    queue.submitApplication(app_1, user_0, A);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -403,7 +403,7 @@ public class TestApplicationLimits {
 
     // Submit third application, should remain pending
     FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplicationAttempt(app_2, user_0);
+    queue.submitApplication(app_2, user_0, A);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -412,7 +412,7 @@ public class TestApplicationLimits {
 
     // Submit fourth application, should remain pending
     FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
-    queue.submitApplicationAttempt(app_3, user_0);
+    queue.submitApplication(app_3, user_0, A);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(2, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -420,7 +420,7 @@ public class TestApplicationLimits {
     assertTrue(queue.pendingApplications.contains(app_3));
 
     // Kill 3rd pending application
-    queue.finishApplicationAttempt(app_2, A);
+    queue.finishApplication(app_2, A);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -429,7 +429,7 @@ public class TestApplicationLimits {
     assertFalse(queue.activeApplications.contains(app_2));
 
     // Finish 1st application, app_3 should become active
-    queue.finishApplicationAttempt(app_0, A);
+    queue.finishApplication(app_0, A);
     assertEquals(2, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -439,7 +439,7 @@ public class TestApplicationLimits {
     assertFalse(queue.activeApplications.contains(app_0));
 
     // Finish 2nd application
-    queue.finishApplicationAttempt(app_1, A);
+    queue.finishApplication(app_1, A);
     assertEquals(1, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumActiveApplications(user_0));
@@ -447,7 +447,7 @@ public class TestApplicationLimits {
     assertFalse(queue.activeApplications.contains(app_1));
 
     // Finish 4th application
-    queue.finishApplicationAttempt(app_3, A);
+    queue.finishApplication(app_3, A);
     assertEquals(0, queue.getNumActiveApplications());
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(0, queue.getNumActiveApplications(user_0));
@@ -507,7 +507,7 @@ public class TestApplicationLimits {
     FiCaSchedulerApp app_0_0 = 
         spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue, 
             queue.getActiveUsersManager(), rmContext));
-    queue.submitApplicationAttempt(app_0_0, user_0);
+    queue.submitApplication(app_0_0, user_0, A);
 
     List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
     app_0_0_requests.add(
@@ -526,7 +526,7 @@ public class TestApplicationLimits {
     FiCaSchedulerApp app_0_1 = 
         spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue, 
             queue.getActiveUsersManager(), rmContext));
-    queue.submitApplicationAttempt(app_0_1, user_0);
+    queue.submitApplication(app_0_1, user_0, A);
     
     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
     app_0_1_requests.add(
@@ -545,7 +545,7 @@ public class TestApplicationLimits {
     FiCaSchedulerApp app_1_0 = 
         spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue, 
             queue.getActiveUsersManager(), rmContext));
-    queue.submitApplicationAttempt(app_1_0, user_1);
+    queue.submitApplication(app_1_0, user_1, A);
 
     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
     app_1_0_requests.add(

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Fri Feb  7 20:33:01 2014
@@ -64,10 +64,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -558,22 +555,19 @@ public class TestCapacityScheduler {
     ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
     ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
         appId, 1);
-    SchedulerEvent addAppEvent =
-        new AppAddedSchedulerEvent(appId, "default", "user");
-    cs.handle(addAppEvent);
-    SchedulerEvent addAttemptEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
-    cs.handle(addAttemptEvent);
+    SchedulerEvent event =
+        new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user");
+    cs.handle(event);
 
     // Verify the blacklist can be updated independent of requesting containers
     cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(),
         Collections.singletonList(host), null);
-    Assert.assertTrue(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
+    Assert.assertTrue(cs.getApplication(appAttemptId).isBlacklisted(host));
     cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(), null,
         Collections.singletonList(host));
-    Assert.assertFalse(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
+    Assert.assertFalse(cs.getApplication(appAttemptId).isBlacklisted(host));
     rm.stop();
   }
 
@@ -597,6 +591,66 @@ public class TestCapacityScheduler {
       assertTrue(appComparator.compare(app1, app3) < 0);
       assertTrue(appComparator.compare(app2, app3) < 0);
     }
+
+    @Test
+    public void testConcurrentAccessOnApplications() throws Exception {
+      CapacityScheduler cs = new CapacityScheduler();
+      verifyConcurrentAccessOnApplications(
+          cs.applications, FiCaSchedulerApp.class, Queue.class);
+    }
+
+    public static <T extends SchedulerApplication, Q extends Queue>
+        void verifyConcurrentAccessOnApplications(
+            final Map<ApplicationAttemptId, T> applications, Class<T> appClazz,
+            final Class<Q> queueClazz)
+                throws Exception {
+      final int size = 10000;
+      final ApplicationId appId = ApplicationId.newInstance(0, 0);
+      final Constructor<T> ctor = appClazz.getDeclaredConstructor(
+          ApplicationAttemptId.class, String.class, queueClazz,
+          ActiveUsersManager.class, RMContext.class);
+
+      ApplicationAttemptId appAttemptId0
+          = ApplicationAttemptId.newInstance(appId, 0);
+      applications.put(appAttemptId0, ctor.newInstance(
+              appAttemptId0, null, mock(queueClazz), null, null));
+      assertNotNull(applications.get(appAttemptId0));
+
+      // Imitating the thread of scheduler that will add and remove apps
+      final AtomicBoolean finished = new AtomicBoolean(false);
+      final AtomicBoolean failed = new AtomicBoolean(false);
+      Thread t = new Thread() {
+
+        @Override
+        public void run() {
+          for (int i = 1; i <= size; ++i) {
+            ApplicationAttemptId appAttemptId
+                = ApplicationAttemptId.newInstance(appId, i);
+            try {
+              applications.put(appAttemptId, ctor.newInstance(
+                  appAttemptId, null, mock(queueClazz), null, null));
+            } catch (Exception e) {
+              failed.set(true);
+              finished.set(true);
+              return;
+            }
+          }
+          for (int i = 1; i <= size; ++i) {
+            ApplicationAttemptId appAttemptId
+                = ApplicationAttemptId.newInstance(appId, i);
+            applications.remove(appAttemptId);
+          }
+          finished.set(true);
+        }
+      };
+      t.start();
+
+      // Imitating the thread of rmappattempt that will get the app
+      while (!finished.get()) {
+        assertNotNull(applications.get(appAttemptId0));
+      }
+      assertFalse(failed.get());
+    }
     
     @Test
     public void testGetAppsInQueue() throws Exception {
@@ -628,21 +682,4 @@ public class TestCapacityScheduler {
       Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
     }
 
-  @Test
-  public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
-
-    AsyncDispatcher rmDispatcher = new AsyncDispatcher();
-    CapacityScheduler cs = new CapacityScheduler();
-    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(conf);
-    cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null,
-      null, null, new RMContainerTokenSecretManager(conf),
-      new NMTokenSecretManagerInRM(conf),
-      new ClientToAMTokenSecretManagerInRM()));
-
-    SchedulerApplication app =
-        TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
-          cs.getSchedulerApplications(), cs, "a1");
-    Assert.assertEquals("a1", app.getQueue().getQueueName());
-  }
- }
+}

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Fri Feb  7 20:33:01 2014
@@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -64,10 +63,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -275,14 +271,14 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 = 
         new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
             mock(ActiveUsersManager.class), rmContext);
-    a.submitApplicationAttempt(app_0, user_0);
+    a.submitApplication(app_0, user_0, B);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
             mock(ActiveUsersManager.class), rmContext);
-    a.submitApplicationAttempt(app_1, user_0);  // same user
+    a.submitApplication(app_1, user_0, B);  // same user
 
     
     // Setup some nodes
@@ -324,14 +320,14 @@ public class TestLeafQueue {
         .getMockApplicationAttemptId(0, 1);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_d, d, null,
         rmContext);
-    d.submitApplicationAttempt(app_0, user_d);
+    d.submitApplication(app_0, user_d, D);
 
     // Attempt the same application again
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(0, 2);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_d, d, null,
         rmContext);
-    d.submitApplicationAttempt(app_1, user_d); // same user
+    d.submitApplication(app_1, user_d, D); // same user
   }
 
 
@@ -349,37 +345,30 @@ public class TestLeafQueue {
         .getMockApplicationAttemptId(0, 1);
     FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null,
         rmContext);
-    AppAddedSchedulerEvent addAppEvent =
-        new AppAddedSchedulerEvent(appAttemptId_0.getApplicationId(),
-          a.getQueueName(), user_0);
-    cs.handle(addAppEvent);
-    AppAttemptAddedSchedulerEvent addAttemptEvent = 
-        new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
-    cs.handle(addAttemptEvent);
-
+    a.submitApplication(app_0, user_0, B);
+    
+    when(cs.getApplication(appAttemptId_0)).thenReturn(app_0);
     AppAttemptRemovedSchedulerEvent event = new AppAttemptRemovedSchedulerEvent(
-        appAttemptId_0, RMAppAttemptState.FAILED, false);
+        appAttemptId_0, RMAppAttemptState.FAILED);
     cs.handle(event);
     
     assertEquals(0, a.getMetrics().getAppsPending());
-    assertEquals(0, a.getMetrics().getAppsFailed());
+    assertEquals(1, a.getMetrics().getAppsFailed());
 
     // Attempt the same application again
     final ApplicationAttemptId appAttemptId_1 = TestUtils
         .getMockApplicationAttemptId(0, 2);
     FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null,
         rmContext);
-    a.submitApplicationAttempt(app_1, user_0); // same user
+    a.submitApplication(app_1, user_0, B); // same user
 
     assertEquals(1, a.getMetrics().getAppsSubmitted());
     assertEquals(1, a.getMetrics().getAppsPending());
     
+    when(cs.getApplication(appAttemptId_1)).thenReturn(app_0);
     event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
-        RMAppAttemptState.FINISHED, false);
+        RMAppAttemptState.FINISHED);
     cs.handle(event);
-    AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent(
-        appAttemptId_0.getApplicationId(), RMAppState.FINISHED);
-    cs.handle(rEvent);
     
     assertEquals(1, a.getMetrics().getAppsSubmitted());
     assertEquals(0, a.getMetrics().getAppsPending());
@@ -407,14 +396,14 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 = 
         new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
             mock(ActiveUsersManager.class), rmContext);
-    a.submitApplicationAttempt(app_0, user_0);
+    a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
             mock(ActiveUsersManager.class), rmContext);
-    a.submitApplicationAttempt(app_1, user_0);  // same user
+    a.submitApplication(app_1, user_0, A);  // same user
 
     
     // Setup some nodes
@@ -535,21 +524,21 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 = 
         new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
             a.getActiveUsersManager(), rmContext);
-    a.submitApplicationAttempt(app_0, user_0);
+    a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
             a.getActiveUsersManager(), rmContext);
-    a.submitApplicationAttempt(app_1, user_0);  // same user
+    a.submitApplication(app_1, user_0, A);  // same user
 
     final ApplicationAttemptId appAttemptId_2 = 
         TestUtils.getMockApplicationAttemptId(2, 0); 
     FiCaSchedulerApp app_2 = 
         new FiCaSchedulerApp(appAttemptId_2, user_1, a, 
             a.getActiveUsersManager(), rmContext);
-    a.submitApplicationAttempt(app_2, user_1);
+    a.submitApplication(app_2, user_1, A);
 
     // Setup some nodes
     String host_0 = "127.0.0.1";
@@ -629,21 +618,21 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 = 
         new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
             a.getActiveUsersManager(), rmContext);
-    a.submitApplicationAttempt(app_0, user_0);
+    a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
             a.getActiveUsersManager(), rmContext);
-    a.submitApplicationAttempt(app_1, user_0);  // same user
+    a.submitApplication(app_1, user_0, A);  // same user
 
     final ApplicationAttemptId appAttemptId_2 = 
         TestUtils.getMockApplicationAttemptId(2, 0); 
     FiCaSchedulerApp app_2 = 
         new FiCaSchedulerApp(appAttemptId_2, user_1, a, 
             a.getActiveUsersManager(), rmContext);
-    a.submitApplicationAttempt(app_2, user_1);
+    a.submitApplication(app_2, user_1, A);
 
     // Setup some nodes
     String host_0 = "127.0.0.1";
@@ -740,28 +729,28 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 = 
         new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
             a.getActiveUsersManager(), rmContext);
-    a.submitApplicationAttempt(app_0, user_0);
+    a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
             a.getActiveUsersManager(), rmContext);
-    a.submitApplicationAttempt(app_1, user_0);  // same user
+    a.submitApplication(app_1, user_0, A);  // same user
 
     final ApplicationAttemptId appAttemptId_2 = 
         TestUtils.getMockApplicationAttemptId(2, 0); 
     FiCaSchedulerApp app_2 = 
         new FiCaSchedulerApp(appAttemptId_2, user_1, a, 
             a.getActiveUsersManager(), rmContext);
-    a.submitApplicationAttempt(app_2, user_1);
+    a.submitApplication(app_2, user_1, A);
 
     final ApplicationAttemptId appAttemptId_3 = 
         TestUtils.getMockApplicationAttemptId(3, 0); 
     FiCaSchedulerApp app_3 = 
         new FiCaSchedulerApp(appAttemptId_3, user_2, a, 
             a.getActiveUsersManager(), rmContext);
-    a.submitApplicationAttempt(app_3, user_2);
+    a.submitApplication(app_3, user_2, A);
     
     // Setup some nodes
     String host_0 = "127.0.0.1";
@@ -916,14 +905,14 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 = 
         new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
             mock(ActiveUsersManager.class), rmContext);
-    a.submitApplicationAttempt(app_0, user_0);
+    a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         new FiCaSchedulerApp(appAttemptId_1, user_1, a, 
             mock(ActiveUsersManager.class), rmContext);
-    a.submitApplicationAttempt(app_1, user_1);  
+    a.submitApplication(app_1, user_1, A);  
 
     // Setup some nodes
     String host_0 = "127.0.0.1";
@@ -1018,14 +1007,14 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 =
         new FiCaSchedulerApp(appAttemptId_0, user_0, a,
             mock(ActiveUsersManager.class), rmContext);
-    a.submitApplicationAttempt(app_0, user_0);
+    a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 =
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 =
         new FiCaSchedulerApp(appAttemptId_1, user_1, a,
             mock(ActiveUsersManager.class), rmContext);
-    a.submitApplicationAttempt(app_1, user_1);
+    a.submitApplication(app_1, user_1, A);
 
     // Setup some nodes
     String host_0 = "127.0.0.1";
@@ -1122,14 +1111,14 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 = 
         new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
             mock(ActiveUsersManager.class), rmContext);
-    a.submitApplicationAttempt(app_0, user_0);
+    a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         new FiCaSchedulerApp(appAttemptId_1, user_1, a, 
             mock(ActiveUsersManager.class), rmContext);
-    a.submitApplicationAttempt(app_1, user_1);  
+    a.submitApplication(app_1, user_1, A);  
 
     // Setup some nodes
     String host_0 = "127.0.0.1";
@@ -1243,7 +1232,7 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 = 
         spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
             mock(ActiveUsersManager.class), rmContext));
-    a.submitApplicationAttempt(app_0, user_0);
+    a.submitApplication(app_0, user_0, A);
     
     // Setup some nodes and racks
     String host_0 = "127.0.0.1";
@@ -1384,7 +1373,7 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 = 
         spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
             mock(ActiveUsersManager.class), rmContext));
-    a.submitApplicationAttempt(app_0, user_0);
+    a.submitApplication(app_0, user_0, A);
     
     // Setup some nodes and racks
     String host_0 = "127.0.0.1";
@@ -1515,7 +1504,7 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 = 
         spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
             mock(ActiveUsersManager.class), rmContext));
-    a.submitApplicationAttempt(app_0, user_0);
+    a.submitApplication(app_0, user_0, A);
     
     // Setup some nodes and racks
     String host_0_0 = "127.0.0.1";
@@ -1618,21 +1607,21 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 =
         new FiCaSchedulerApp(appAttemptId_0, user_e, e,
             mock(ActiveUsersManager.class), rmContext);
-    e.submitApplicationAttempt(app_0, user_e);
+    e.submitApplication(app_0, user_e, E);
 
     final ApplicationAttemptId appAttemptId_1 =
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 =
         new FiCaSchedulerApp(appAttemptId_1, user_e, e,
             mock(ActiveUsersManager.class), rmContext);
-    e.submitApplicationAttempt(app_1, user_e);  // same user
+    e.submitApplication(app_1, user_e, E);  // same user
 
     final ApplicationAttemptId appAttemptId_2 =
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app_2 =
         new FiCaSchedulerApp(appAttemptId_2, user_e, e,
             mock(ActiveUsersManager.class), rmContext);
-    e.submitApplicationAttempt(app_2, user_e);  // same user
+    e.submitApplication(app_2, user_e, E);  // same user
 
     // before reinitialization
     assertEquals(2, e.activeApplications.size());
@@ -1696,21 +1685,21 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 =
         new FiCaSchedulerApp(appAttemptId_0, user_e, e,
             mock(ActiveUsersManager.class), rmContext);
-    e.submitApplicationAttempt(app_0, user_e);
+    e.submitApplication(app_0, user_e, E);
 
     final ApplicationAttemptId appAttemptId_1 =
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 =
         new FiCaSchedulerApp(appAttemptId_1, user_e, e,
             mock(ActiveUsersManager.class), rmContext);
-    e.submitApplicationAttempt(app_1, user_e);  // same user
+    e.submitApplication(app_1, user_e, E);  // same user
 
     final ApplicationAttemptId appAttemptId_2 =
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app_2 =
         new FiCaSchedulerApp(appAttemptId_2, user_e, e,
             mock(ActiveUsersManager.class), rmContext);
-    e.submitApplicationAttempt(app_2, user_e);  // same user
+    e.submitApplication(app_2, user_e, E);  // same user
 
     // before updating cluster resource
     assertEquals(2, e.activeApplications.size());
@@ -1773,14 +1762,14 @@ public class TestLeafQueue {
     FiCaSchedulerApp app_0 = 
         spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
             mock(ActiveUsersManager.class), rmContext));
-    a.submitApplicationAttempt(app_0, user_0);
+    a.submitApplication(app_0, user_0, A);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
             mock(ActiveUsersManager.class), rmContext));
-    a.submitApplicationAttempt(app_1, user_0);
+    a.submitApplication(app_1, user_0, A);
 
     // Setup some nodes and racks
     String host_0_0 = "127.0.0.1";