You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/02/07 21:33:02 UTC
svn commit: r1565792 [4/5] - in
/hadoop/common/branches/branch-2.3/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/record...
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Fri Feb 7 20:33:01 2014
@@ -18,31 +18,49 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
/**
@@ -50,186 +68,238 @@ import org.junit.Test;
*
*/
public class TestAMRestart {
-
- @Test
- public void testAMRestartWithExistingContainers() throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-
- MockRM rm1 = new MockRM(conf);
- rm1.start();
- RMApp app1 =
- rm1.submitApp(200, "name", "user",
- new HashMap<ApplicationAccessType, String>(), false, "default", -1,
- null, "MAPREDUCE", false, true);
- MockNM nm1 =
- new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
- nm1.registerNode();
- MockNM nm2 =
- new MockNM("127.0.0.1:2351", 4089, rm1.getResourceTrackerService());
- nm2.registerNode();
-
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
- int NUM_CONTAINERS = 3;
- // allocate NUM_CONTAINERS containers
- am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
- new ArrayList<ContainerId>());
- nm1.nodeHeartbeat(true);
-
- // wait for containers to be allocated.
- List<Container> containers =
- am1.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers();
- while (containers.size() != NUM_CONTAINERS) {
- nm1.nodeHeartbeat(true);
- containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
- new ArrayList<ContainerId>()).getAllocatedContainers());
- Thread.sleep(200);
- }
-
- // launch the 2nd container, for testing running container transferred.
- nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
- ContainerId containerId2 =
- ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
- rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
-
- // launch the 3rd container, for testing container allocated by previous
- // attempt is completed by the next new attempt/
- nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING);
- ContainerId containerId3 =
- ContainerId.newInstance(am1.getApplicationAttemptId(), 3);
- rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
-
- // 4th container still in AQUIRED state. for testing Acquired container is
- // always killed.
- ContainerId containerId4 =
- ContainerId.newInstance(am1.getApplicationAttemptId(), 4);
- rm1.waitForState(nm1, containerId4, RMContainerState.ACQUIRED);
-
- // 5th container is in Allocated state. for testing allocated container is
- // always killed.
- am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
- nm1.nodeHeartbeat(true);
- ContainerId containerId5 =
- ContainerId.newInstance(am1.getApplicationAttemptId(), 5);
- rm1.waitForContainerAllocated(nm1, containerId5);
- rm1.waitForState(nm1, containerId5, RMContainerState.ALLOCATED);
-
- // 6th container is in Reserved state.
- am1.allocate("127.0.0.1", 6000, 1, new ArrayList<ContainerId>());
- ContainerId containerId6 =
- ContainerId.newInstance(am1.getApplicationAttemptId(), 6);
- nm1.nodeHeartbeat(true);
- SchedulerApplicationAttempt schedulerAttempt =
- ((CapacityScheduler) rm1.getResourceScheduler())
- .getCurrentAttemptForContainer(containerId6);
- while (schedulerAttempt.getReservedContainers().size() == 0) {
- System.out.println("Waiting for container " + containerId6
- + " to be reserved.");
- nm1.nodeHeartbeat(true);
- Thread.sleep(200);
- }
- // assert containerId6 is reserved.
- Assert.assertEquals(containerId6, schedulerAttempt.getReservedContainers()
- .get(0).getContainerId());
-
- // fail the AM by sending CONTAINER_FINISHED event without registering.
- nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
- am1.waitForState(RMAppAttemptState.FAILED);
-
- // wait for some time. previous AM's running containers should still remain
- // in scheduler even though am failed
- Thread.sleep(3000);
- rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
- // acquired/allocated containers are cleaned up.
- Assert.assertNull(rm1.getResourceScheduler().getRMContainer(containerId4));
- Assert.assertNull(rm1.getResourceScheduler().getRMContainer(containerId5));
-
- // wait for app to start a new attempt.
- rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
- // assert this is a new AM.
- ApplicationAttemptId newAttemptId =
- app1.getCurrentAppAttempt().getAppAttemptId();
- Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
-
- // launch the new AM
- RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
- nm1.nodeHeartbeat(true);
- MockAM am2 = rm1.sendAMLaunched(attempt2.getAppAttemptId());
- RegisterApplicationMasterResponse registerResponse =
- am2.registerAppAttempt();
-
- // Assert two containers are running: container2 and container3;
- Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempt()
- .size());
- boolean containerId2Exists = false, containerId3Exists = false;
- for (Container container : registerResponse
- .getContainersFromPreviousAttempt()) {
- if (container.getId().equals(containerId2)) {
- containerId2Exists = true;
- }
- if (container.getId().equals(containerId3)) {
- containerId3Exists = true;
- }
- }
- Assert.assertTrue(containerId2Exists && containerId3Exists);
- rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
-
- // complete container by sending the container complete event which has earlier
- // attempt's attemptId
- nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.COMPLETE);
- rm1.waitForState(nm1, containerId3, RMContainerState.COMPLETED);
-
- // Even though the completed container containerId3 event was sent to the
- // earlier failed attempt, new RMAppAttempt can also capture this container
- // info.
- // completed containerId4 is also transferred to the new attempt.
- RMAppAttempt newAttempt =
- app1.getRMAppAttempt(am2.getApplicationAttemptId());
- // 4 containers finished, acquired/allocated/reserved/completed.
- Assert.assertEquals(4, newAttempt.getJustFinishedContainers().size());
- boolean container3Exists = false, container4Exists = false, container5Exists =
- false, container6Exists = false;
- for(ContainerStatus status : newAttempt.getJustFinishedContainers()) {
- if(status.getContainerId().equals(containerId3)) {
- // containerId3 is the container ran by previous attempt but finished by the
- // new attempt.
- container3Exists = true;
- }
- if (status.getContainerId().equals(containerId4)) {
- // containerId4 is the Acquired Container killed by the previous attempt,
- // it's now inside new attempt's finished container list.
- container4Exists = true;
- }
- if (status.getContainerId().equals(containerId5)) {
- // containerId5 is the Allocated container killed by previous failed attempt.
- container5Exists = true;
- }
- if (status.getContainerId().equals(containerId6)) {
- // containerId6 is the reserved container killed by previous failed attempt.
- container6Exists = true;
- }
- }
- Assert.assertTrue(container3Exists && container4Exists && container5Exists
- && container6Exists);
-
- // New SchedulerApplicationAttempt also has the containers info.
- rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
-
- // record the scheduler attempt for testing.
- SchedulerApplicationAttempt schedulerNewAttempt =
- ((CapacityScheduler) rm1.getResourceScheduler())
- .getCurrentAttemptForContainer(containerId2);
- // finish this application
- MockRM.finishApplicationMaster(app1, rm1, nm1, am2);
-
- // the 2nd attempt released the 1st attempt's running container, when the
- // 2nd attempt finishes.
- Assert.assertFalse(schedulerNewAttempt.getLiveContainers().contains(
- containerId2));
- // all 4 normal containers finished.
- Assert.assertEquals(5, newAttempt.getJustFinishedContainers().size());
-
- rm1.stop();
- }
+// private static final Log LOG = LogFactory.getLog(TestAMRestart.class);
+// ApplicationsManagerImpl appImpl;
+// RMContext asmContext = new RMContextImpl(new MemStore());
+// ApplicationTokenSecretManager appTokenSecretManager =
+// new ApplicationTokenSecretManager();
+// DummyResourceScheduler scheduler;
+// private ClientRMService clientRMService;
+// int count = 0;
+// ApplicationId appID;
+// final int maxFailures = 3;
+// AtomicInteger launchNotify = new AtomicInteger();
+// AtomicInteger schedulerNotify = new AtomicInteger();
+// volatile boolean stop = false;
+// int schedulerAddApplication = 0;
+// int schedulerRemoveApplication = 0;
+// int launcherLaunchCalled = 0;
+// int launcherCleanupCalled = 0;
+// private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+//
+// private class ExtApplicationsManagerImpl extends ApplicationsManagerImpl {
+// public ExtApplicationsManagerImpl(
+// ApplicationTokenSecretManager applicationTokenSecretManager,
+// YarnScheduler scheduler, RMContext asmContext) {
+// super(applicationTokenSecretManager, scheduler, asmContext);
+// }
+//
+// @Override
+// public EventHandler<ASMEvent<AMLauncherEventType>> createNewApplicationMasterLauncher(
+// ApplicationTokenSecretManager tokenSecretManager) {
+// return new DummyAMLauncher();
+// }
+// }
+//
+// private class DummyAMLauncher implements EventHandler<ASMEvent<AMLauncherEventType>> {
+//
+// public DummyAMLauncher() {
+// asmContext.getDispatcher().register(AMLauncherEventType.class, this);
+// new Thread() {
+// public void run() {
+// while (!stop) {
+// LOG.info("DEBUG -- waiting for launch");
+// synchronized(launchNotify) {
+// while (launchNotify.get() == 0) {
+// try {
+// launchNotify.wait();
+// } catch (InterruptedException e) {
+// }
+// }
+// asmContext.getDispatcher().getEventHandler().handle(
+// new ApplicationEvent(
+// ApplicationEventType.LAUNCHED, appID));
+// launchNotify.addAndGet(-1);
+// }
+// }
+// }
+// }.start();
+// }
+//
+// @Override
+// public void handle(ASMEvent<AMLauncherEventType> event) {
+// switch (event.getType()) {
+// case CLEANUP:
+// launcherCleanupCalled++;
+// break;
+// case LAUNCH:
+// LOG.info("DEBUG -- launching");
+// launcherLaunchCalled++;
+// synchronized (launchNotify) {
+// launchNotify.addAndGet(1);
+// launchNotify.notify();
+// }
+// break;
+// default:
+// break;
+// }
+// }
+// }
+//
+// private class DummyResourceScheduler implements ResourceScheduler {
+//
+// @Override
+// public void removeNode(RMNode node) {
+// }
+//
+// @Override
+// public Allocation allocate(ApplicationId applicationId,
+// List<ResourceRequest> ask, List<Container> release) throws IOException {
+// Container container = recordFactory.newRecordInstance(Container.class);
+// container.setContainerToken(recordFactory.newRecordInstance(ContainerToken.class));
+// container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
+// container.setContainerManagerAddress("localhost");
+// container.setNodeHttpAddress("localhost:8042");
+// container.setId(recordFactory.newRecordInstance(ContainerId.class));
+// container.getId().setAppId(appID);
+// container.getId().setId(count);
+// count++;
+// return new Allocation(Arrays.asList(container), Resources.none());
+// }
+//
+// @Override
+// public void handle(ASMEvent<ApplicationTrackerEventType> event) {
+// switch (event.getType()) {
+// case ADD:
+// schedulerAddApplication++;
+// break;
+// case EXPIRE:
+// schedulerRemoveApplication++;
+// LOG.info("REMOVING app : " + schedulerRemoveApplication);
+// if (schedulerRemoveApplication == maxFailures) {
+// synchronized (schedulerNotify) {
+// schedulerNotify.addAndGet(1);
+// schedulerNotify.notify();
+// }
+// }
+// break;
+// default:
+// break;
+// }
+// }
+//
+// @Override
+// public QueueInfo getQueueInfo(String queueName,
+// boolean includeChildQueues,
+// boolean recursive) throws IOException {
+// return null;
+// }
+// @Override
+// public List<QueueUserACLInfo> getQueueUserAclInfo() {
+// return null;
+// }
+// @Override
+// public void addApplication(ApplicationId applicationId,
+// ApplicationMaster master, String user, String queue, Priority priority,
+// ApplicationStore store)
+// throws IOException {
+// }
+// @Override
+// public void addNode(RMNode nodeInfo) {
+// }
+// @Override
+// public void recover(RMState state) throws Exception {
+// }
+// @Override
+// public void reinitialize(Configuration conf,
+// ContainerTokenSecretManager secretManager, RMContext rmContext)
+// throws IOException {
+// }
+//
+// @Override
+// public void nodeUpdate(RMNode nodeInfo,
+// Map<String, List<Container>> containers) {
+// }
+//
+// @Override
+// public Resource getMaximumResourceCapability() {
+// // TODO Auto-generated method stub
+// return null;
+// }
+//
+// @Override
+// public Resource getMinimumResourceCapability() {
+// // TODO Auto-generated method stub
+// return null;
+// }
+// }
+//
+// @Before
+// public void setUp() {
+//
+// asmContext.getDispatcher().register(ApplicationEventType.class,
+// new ResourceManager.ApplicationEventDispatcher(asmContext));
+//
+// appID = recordFactory.newRecordInstance(ApplicationId.class);
+// appID.setClusterTimestamp(System.currentTimeMillis());
+// appID.setId(1);
+// Configuration conf = new Configuration();
+// scheduler = new DummyResourceScheduler();
+// asmContext.getDispatcher().init(conf);
+// asmContext.getDispatcher().start();
+// asmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
+// appImpl = new ExtApplicationsManagerImpl(appTokenSecretManager, scheduler, asmContext);
+//
+// conf.setLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 1000L);
+// conf.setInt(RMConfig.AM_MAX_RETRIES, maxFailures);
+// appImpl.init(conf);
+// appImpl.start();
+//
+// this.clientRMService = new ClientRMService(asmContext, appImpl
+// .getAmLivelinessMonitor(), appImpl.getClientToAMSecretManager(),
+// scheduler);
+// this.clientRMService.init(conf);
+// }
+//
+// @After
+// public void tearDown() {
+// }
+//
+// private void waitForFailed(AppAttempt application, ApplicationState
+// finalState) throws Exception {
+// int count = 0;
+// while(application.getState() != finalState && count < 10) {
+// Thread.sleep(500);
+// count++;
+// }
+// Assert.assertEquals(finalState, application.getState());
+// }
+//
+// @Test
+// public void testAMRestart() throws Exception {
+// ApplicationSubmissionContext subContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+// subContext.setApplicationId(appID);
+// subContext.setApplicationName("dummyApp");
+//// subContext.command = new ArrayList<String>();
+//// subContext.environment = new HashMap<String, String>();
+//// subContext.fsTokens = new ArrayList<String>();
+// subContext.setFsTokensTodo(ByteBuffer.wrap(new byte[0]));
+// SubmitApplicationRequest request = recordFactory
+// .newRecordInstance(SubmitApplicationRequest.class);
+// request.setApplicationSubmissionContext(subContext);
+// clientRMService.submitApplication(request);
+// AppAttempt application = asmContext.getApplications().get(appID);
+// synchronized (schedulerNotify) {
+// while(schedulerNotify.get() == 0) {
+// schedulerNotify.wait();
+// }
+// }
+// Assert.assertEquals(maxFailures, launcherCleanupCalled);
+// Assert.assertEquals(maxFailures, launcherLaunchCalled);
+// Assert.assertEquals(maxFailures, schedulerAddApplication);
+// Assert.assertEquals(maxFailures, schedulerRemoveApplication);
+// Assert.assertEquals(maxFailures, application.getFailedCount());
+// waitForFailed(application, ApplicationState.FAILED);
+// stop = true;
+// }
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Fri Feb 7 20:33:01 2014
@@ -463,7 +463,7 @@ public class TestRMAppTransitions {
LOG.info("--- START: testUnmanagedAppFailPath ---");
application = testCreateAppRunning(subContext);
RMAppEvent event = new RMAppFailedAttemptEvent(
- application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "", false);
+ application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "");
application.handle(event);
rmDispatcher.await();
RMAppAttempt appAttempt = application.getCurrentAppAttempt();
@@ -575,9 +575,7 @@ public class TestRMAppTransitions {
RMAppEventType.KILL);
application.handle(event);
rmDispatcher.await();
- sendAppUpdateSavedEvent(application);
- assertKilled(application);
- assertAppFinalStateSaved(application);
+ assertAppAndAttemptKilled(application);
}
@Test
@@ -590,9 +588,9 @@ public class TestRMAppTransitions {
for (int i=1; i < maxAppAttempts; i++) {
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_FAILED, "", false);
+ RMAppEventType.ATTEMPT_FAILED, "");
application.handle(event);
- assertAppState(RMAppState.ACCEPTED, application);
+ assertAppState(RMAppState.SUBMITTED, application);
event =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.APP_ACCEPTED);
@@ -606,7 +604,7 @@ public class TestRMAppTransitions {
String message = "Test fail";
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_FAILED, message, false);
+ RMAppEventType.ATTEMPT_FAILED, message);
application.handle(event);
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
@@ -623,16 +621,7 @@ public class TestRMAppTransitions {
RMAppEventType.KILL);
application.handle(event);
rmDispatcher.await();
-
- assertAppState(RMAppState.KILLING, application);
- RMAppEvent appAttemptKilled =
- new RMAppEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_KILLED);
- application.handle(appAttemptKilled);
- assertAppState(RMAppState.FINAL_SAVING, application);
- sendAppUpdateSavedEvent(application);
- assertKilled(application);
- assertAppFinalStateSaved(application);
+ assertAppAndAttemptKilled(application);
}
@Test
@@ -671,10 +660,10 @@ public class TestRMAppTransitions {
for (int i=1; i<maxAppAttempts; i++) {
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_FAILED, "", false);
+ RMAppEventType.ATTEMPT_FAILED, "");
application.handle(event);
rmDispatcher.await();
- assertAppState(RMAppState.ACCEPTED, application);
+ assertAppState(RMAppState.SUBMITTED, application);
appAttempt = application.getCurrentAppAttempt();
Assert.assertEquals(++expectedAttemptId,
appAttempt.getAppAttemptId().getAttemptId());
@@ -696,7 +685,7 @@ public class TestRMAppTransitions {
// after max application attempts
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_FAILED, "", false);
+ RMAppEventType.ATTEMPT_FAILED, "");
application.handle(event);
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
@@ -822,7 +811,7 @@ public class TestRMAppTransitions {
// KILLED => KILLED event RMAppEventType.ATTEMPT_FAILED
event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
- RMAppEventType.ATTEMPT_FAILED, "", false);
+ RMAppEventType.ATTEMPT_FAILED, "");
application.handle(event);
rmDispatcher.await();
assertTimesAtFinish(application);
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Fri Feb 7 20:33:01 2014
@@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -69,10 +68,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
@@ -80,6 +79,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -121,15 +121,14 @@ public class TestRMAppAttemptTransitions
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore store;
- private RMAppImpl application;
+ private RMApp application;
private RMAppAttempt applicationAttempt;
private Configuration conf = new Configuration();
private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf));
private ClientToAMTokenSecretManagerInRM clientToAMTokenManager =
spy(new ClientToAMTokenSecretManagerInRM());
- private boolean transferStateFromPreviousAttempt = false;
-
+
private final class TestApplicationAttemptEventDispatcher implements
EventHandler<RMAppAttemptEvent> {
@@ -152,11 +151,6 @@ public class TestRMAppAttemptTransitions
@Override
public void handle(RMAppEvent event) {
assertEquals(application.getApplicationId(), event.getApplicationId());
- if (event instanceof RMAppFailedAttemptEvent) {
- transferStateFromPreviousAttempt =
- ((RMAppFailedAttemptEvent) event)
- .getTransferStateFromPreviousAttempt();
- }
try {
application.handle(event);
} catch (Throwable t) {
@@ -261,10 +255,10 @@ public class TestRMAppAttemptTransitions
unmanagedAM = false;
- application = mock(RMAppImpl.class);
+ application = mock(RMApp.class);
applicationAttempt =
new RMAppAttemptImpl(applicationAttemptId, rmContext, scheduler,
- masterService, submissionContext, new Configuration(), false);
+ masterService, submissionContext, new Configuration(), user);
when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
when(application.getApplicationId()).thenReturn(applicationId);
@@ -378,7 +372,6 @@ public class TestRMAppAttemptTransitions
assertNull(applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyAttemptFinalStateSaved();
- assertFalse(transferStateFromPreviousAttempt);
}
/**
@@ -415,6 +408,9 @@ public class TestRMAppAttemptTransitions
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus());
+
+ // Check events
+ verify(application).handle(any(RMAppEvent.class));
}
/**
@@ -450,7 +446,7 @@ public class TestRMAppAttemptTransitions
assertEquals(0, applicationAttempt.getRanNodes().size());
// Check events
- verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class));
+ verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class));
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyAttemptFinalStateSaved();
}
@@ -533,7 +529,6 @@ public class TestRMAppAttemptTransitions
assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
- assertFalse(transferStateFromPreviousAttempt);
}
@@ -549,7 +544,7 @@ public class TestRMAppAttemptTransitions
applicationAttempt.handle(
new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(),
- RMAppAttemptEventType.ATTEMPT_ADDED));
+ RMAppAttemptEventType.APP_ACCEPTED));
if(unmanagedAM){
assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
@@ -663,7 +658,6 @@ public class TestRMAppAttemptTransitions
diagnostics));
testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1,
true);
- assertFalse(transferStateFromPreviousAttempt);
}
private void sendAttemptUpdateSavedEvent(RMAppAttempt applicationAttempt) {
@@ -692,21 +686,6 @@ public class TestRMAppAttemptTransitions
}
@Test
- public void testUnmanagedAMContainersCleanup() {
- unmanagedAM = true;
- when(submissionContext.getUnmanagedAM()).thenReturn(true);
- when(submissionContext.getKeepContainersAcrossApplicationAttempts())
- .thenReturn(true);
- // submit AM and check it goes to SUBMITTED state
- submitApplicationAttempt();
- // launch AM and verify attempt failed
- applicationAttempt.handle(new RMAppAttemptRegistrationEvent(
- applicationAttempt.getAppAttemptId(), "host", 8042, "oldtrackingurl"));
- sendAttemptUpdateSavedEvent(applicationAttempt);
- assertFalse(transferStateFromPreviousAttempt);
- }
-
- @Test
public void testNewToKilled() {
applicationAttempt.handle(
new RMAppAttemptEvent(
@@ -724,6 +703,16 @@ public class TestRMAppAttemptTransitions
RMAppAttemptEventType.RECOVER));
testAppAttemptRecoveredState();
}
+
+ @Test
+ public void testSubmittedToFailed() {
+ submitApplicationAttempt();
+ String message = "Rejected";
+ applicationAttempt.handle(
+ new RMAppAttemptRejectedEvent(
+ applicationAttempt.getAppAttemptId(), message));
+ testAppAttemptSubmittedToFailedState(message);
+ }
@Test
public void testSubmittedToKilled() {
@@ -1117,64 +1106,6 @@ public class TestRMAppAttemptTransitions
Assert.assertNull(token);
}
- @Test
- public void testFailedToFailed() {
- // create a failed attempt.
- when(submissionContext.getKeepContainersAcrossApplicationAttempts())
- .thenReturn(true);
- Container amContainer = allocateApplicationAttempt();
- launchApplicationAttempt(amContainer);
- runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
- ContainerStatus cs1 =
- ContainerStatus.newInstance(amContainer.getId(),
- ContainerState.COMPLETE, "some error", 123);
- ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
- applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
- appAttemptId, cs1));
- sendAttemptUpdateSavedEvent(applicationAttempt);
- assertEquals(RMAppAttemptState.FAILED,
- applicationAttempt.getAppAttemptState());
- // should not kill containers when attempt fails.
- assertTrue(transferStateFromPreviousAttempt);
-
- // failed attempt captured the container finished event.
- assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
- ContainerStatus cs2 =
- ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
- ContainerState.COMPLETE, "", 0);
- applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
- appAttemptId, cs2));
- assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
- assertEquals(cs2.getContainerId(), applicationAttempt
- .getJustFinishedContainers().get(0).getContainerId());
- }
-
-
- @Test
- public void testContainersCleanupForLastAttempt() {
- // create a failed attempt.
- applicationAttempt =
- new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), rmContext,
- scheduler, masterService, submissionContext, new Configuration(),
- true);
- when(submissionContext.getKeepContainersAcrossApplicationAttempts())
- .thenReturn(true);
- when(submissionContext.getMaxAppAttempts()).thenReturn(1);
- Container amContainer = allocateApplicationAttempt();
- launchApplicationAttempt(amContainer);
- runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
- ContainerStatus cs1 =
- ContainerStatus.newInstance(amContainer.getId(),
- ContainerState.COMPLETE, "some error", 123);
- ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
- applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
- appAttemptId, cs1));
- sendAttemptUpdateSavedEvent(applicationAttempt);
- assertEquals(RMAppAttemptState.FAILED,
- applicationAttempt.getAppAttemptState());
- assertFalse(transferStateFromPreviousAttempt);
- }
-
private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId);
if (UserGroupInformation.isSecurityEnabled()) {
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Fri Feb 7 20:33:01 2014
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -66,10 +66,8 @@ public class TestQueueMetrics {
MetricsSource queueSource= queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user);
- metrics.submitApp(user);
+ metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, queueName, user);
- checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
- metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
@@ -78,7 +76,7 @@ public class TestQueueMetrics {
// configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
- metrics.runAppAttempt(app.getApplicationId(), user);
+ metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
@@ -87,10 +85,7 @@ public class TestQueueMetrics {
metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
- metrics.finishAppAttempt(
- app.getApplicationId(), app.isPending(), app.getUser());
- checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
- metrics.finishApp(user, RMAppState.FINISHED);
+ metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
assertNull(userSource);
}
@@ -105,47 +100,39 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user);
- metrics.submitApp(user);
+ metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, queueName, user);
- checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
- metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
- metrics.runAppAttempt(app.getApplicationId(), user);
+ metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
- metrics.finishAppAttempt(
- app.getApplicationId(), app.isPending(), app.getUser());
- checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
+ metrics.finishApp(app, RMAppAttemptState.FAILED);
+ checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
// As the application has failed, framework retries the same application
// based on configuration
- metrics.submitAppAttempt(user);
+ metrics.submitApp(user, 2);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
- metrics.runAppAttempt(app.getApplicationId(), user);
+ metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
// Suppose say application has failed this time as well.
- metrics.finishAppAttempt(
- app.getApplicationId(), app.isPending(), app.getUser());
- checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
+ metrics.finishApp(app, RMAppAttemptState.FAILED);
+ checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
// As the application has failed, framework retries the same application
// based on configuration
- metrics.submitAppAttempt(user);
+ metrics.submitApp(user, 3);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
- metrics.runAppAttempt(app.getApplicationId(), user);
+ metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
- // Suppose say application has failed, and there's no more retries.
- metrics.finishAppAttempt(
- app.getApplicationId(), app.isPending(), app.getUser());
- checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
-
- metrics.finishApp(user, RMAppState.FAILED);
- checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
+ // Suppose say application has finished.
+ metrics.finishApp(app, RMAppAttemptState.FINISHED);
+ checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
assertNull(userSource);
}
@@ -159,13 +146,9 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user);
- metrics.submitApp(user);
+ metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, queueName, user);
- checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
- checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
-
- metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
@@ -177,7 +160,7 @@ public class TestQueueMetrics {
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
- metrics.runAppAttempt(app.getApplicationId(), user);
+ metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
@@ -189,11 +172,7 @@ public class TestQueueMetrics {
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
- metrics.finishAppAttempt(
- app.getApplicationId(), app.isPending(), app.getUser());
- checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
- checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
- metrics.finishApp(user, RMAppState.FINISHED);
+ metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
}
@@ -213,16 +192,10 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, leafQueueName);
AppSchedulingInfo app = mockApp(user);
- metrics.submitApp(user);
+ metrics.submitApp(user, 1);
MetricsSource userSource = userSource(ms, leafQueueName, user);
MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
- checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
- checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true);
- checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
- checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true);
-
- metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0, true);
checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
@@ -238,7 +211,7 @@ public class TestQueueMetrics {
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
- metrics.runAppAttempt(app.getApplicationId(), user);
+ metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
@@ -258,14 +231,7 @@ public class TestQueueMetrics {
checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
- metrics.finishAppAttempt(
- app.getApplicationId(), app.isPending(), app.getUser());
- checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
- checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true);
- checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
- checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true);
-
- metrics.finishApp(user, RMAppState.FINISHED);
+ metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0, true);
checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
@@ -342,7 +308,7 @@ public class TestQueueMetrics {
assertGauge("AppsPending", pending, rb);
assertGauge("AppsRunning", running, rb);
assertCounter("AppsCompleted", completed, rb);
- assertCounter("AppsFailed", failed, rb);
+ assertGauge("AppsFailed", failed, rb);
assertCounter("AppsKilled", killed, rb);
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java Fri Feb 7 20:33:01 2014
@@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -59,12 +58,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -383,24 +378,4 @@ public class TestSchedulerUtils {
ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1), "x");
Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus());
}
-
- public static <T> SchedulerApplication verifyAppAddedAndRemovedFromScheduler(
- final Map<ApplicationId, SchedulerApplication> applications,
- EventHandler<SchedulerEvent> handler, String queueName) throws Exception {
- ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
- AppAddedSchedulerEvent appAddedEvent =
- new AppAddedSchedulerEvent(appId, queueName, "user");
- handler.handle(appAddedEvent);
- SchedulerApplication app = applications.get(appId);
- // verify application is added.
- Assert.assertNotNull(app);
- Assert.assertEquals("user", app.getUser());
-
- AppRemovedSchedulerEvent appRemoveEvent =
- new AppRemovedSchedulerEvent(appId, RMAppState.FINISHED);
- handler.handle(appRemoveEvent);
- Assert.assertNull(applications.get(appId));
- return app;
- }
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Fri Feb 7 20:33:01 2014
@@ -304,7 +304,7 @@ public class TestApplicationLimits {
int APPLICATION_ID = 0;
// Submit first application
FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplicationAttempt(app_0, user_0);
+ queue.submitApplication(app_0, user_0, A);
assertEquals(1, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(1, queue.getNumActiveApplications(user_0));
@@ -312,7 +312,7 @@ public class TestApplicationLimits {
// Submit second application
FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplicationAttempt(app_1, user_0);
+ queue.submitApplication(app_1, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -320,14 +320,14 @@ public class TestApplicationLimits {
// Submit third application, should remain pending
FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplicationAttempt(app_2, user_0);
+ queue.submitApplication(app_2, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(1, queue.getNumPendingApplications(user_0));
// Finish one application, app_2 should be activated
- queue.finishApplicationAttempt(app_0, A);
+ queue.finishApplication(app_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -335,7 +335,7 @@ public class TestApplicationLimits {
// Submit another one for user_0
FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplicationAttempt(app_3, user_0);
+ queue.submitApplication(app_3, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -346,7 +346,7 @@ public class TestApplicationLimits {
// Submit first app for user_1
FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
- queue.submitApplicationAttempt(app_4, user_1);
+ queue.submitApplication(app_4, user_1, A);
assertEquals(3, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -356,7 +356,7 @@ public class TestApplicationLimits {
// Submit second app for user_1, should block due to queue-limit
FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1);
- queue.submitApplicationAttempt(app_5, user_1);
+ queue.submitApplication(app_5, user_1, A);
assertEquals(3, queue.getNumActiveApplications());
assertEquals(2, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -365,7 +365,7 @@ public class TestApplicationLimits {
assertEquals(1, queue.getNumPendingApplications(user_1));
// Now finish one app of user_1 so app_5 should be activated
- queue.finishApplicationAttempt(app_4, A);
+ queue.finishApplication(app_4, A);
assertEquals(3, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -385,7 +385,7 @@ public class TestApplicationLimits {
// Submit first application
FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplicationAttempt(app_0, user_0);
+ queue.submitApplication(app_0, user_0, A);
assertEquals(1, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(1, queue.getNumActiveApplications(user_0));
@@ -394,7 +394,7 @@ public class TestApplicationLimits {
// Submit second application
FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplicationAttempt(app_1, user_0);
+ queue.submitApplication(app_1, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -403,7 +403,7 @@ public class TestApplicationLimits {
// Submit third application, should remain pending
FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplicationAttempt(app_2, user_0);
+ queue.submitApplication(app_2, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -412,7 +412,7 @@ public class TestApplicationLimits {
// Submit fourth application, should remain pending
FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
- queue.submitApplicationAttempt(app_3, user_0);
+ queue.submitApplication(app_3, user_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(2, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -420,7 +420,7 @@ public class TestApplicationLimits {
assertTrue(queue.pendingApplications.contains(app_3));
// Kill 3rd pending application
- queue.finishApplicationAttempt(app_2, A);
+ queue.finishApplication(app_2, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -429,7 +429,7 @@ public class TestApplicationLimits {
assertFalse(queue.activeApplications.contains(app_2));
// Finish 1st application, app_3 should become active
- queue.finishApplicationAttempt(app_0, A);
+ queue.finishApplication(app_0, A);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
@@ -439,7 +439,7 @@ public class TestApplicationLimits {
assertFalse(queue.activeApplications.contains(app_0));
// Finish 2nd application
- queue.finishApplicationAttempt(app_1, A);
+ queue.finishApplication(app_1, A);
assertEquals(1, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(1, queue.getNumActiveApplications(user_0));
@@ -447,7 +447,7 @@ public class TestApplicationLimits {
assertFalse(queue.activeApplications.contains(app_1));
// Finish 4th application
- queue.finishApplicationAttempt(app_3, A);
+ queue.finishApplication(app_3, A);
assertEquals(0, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(0, queue.getNumActiveApplications(user_0));
@@ -507,7 +507,7 @@ public class TestApplicationLimits {
FiCaSchedulerApp app_0_0 =
spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue,
queue.getActiveUsersManager(), rmContext));
- queue.submitApplicationAttempt(app_0_0, user_0);
+ queue.submitApplication(app_0_0, user_0, A);
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
app_0_0_requests.add(
@@ -526,7 +526,7 @@ public class TestApplicationLimits {
FiCaSchedulerApp app_0_1 =
spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue,
queue.getActiveUsersManager(), rmContext));
- queue.submitApplicationAttempt(app_0_1, user_0);
+ queue.submitApplication(app_0_1, user_0, A);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
app_0_1_requests.add(
@@ -545,7 +545,7 @@ public class TestApplicationLimits {
FiCaSchedulerApp app_1_0 =
spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue,
queue.getActiveUsersManager(), rmContext));
- queue.submitApplicationAttempt(app_1_0, user_1);
+ queue.submitApplication(app_1_0, user_1, A);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
app_1_0_requests.add(
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Fri Feb 7 20:33:01 2014
@@ -64,10 +64,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -558,22 +555,19 @@ public class TestCapacityScheduler {
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
- SchedulerEvent addAppEvent =
- new AppAddedSchedulerEvent(appId, "default", "user");
- cs.handle(addAppEvent);
- SchedulerEvent addAttemptEvent =
- new AppAttemptAddedSchedulerEvent(appAttemptId, false);
- cs.handle(addAttemptEvent);
+ SchedulerEvent event =
+ new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user");
+ cs.handle(event);
// Verify the blacklist can be updated independent of requesting containers
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null);
- Assert.assertTrue(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
+ Assert.assertTrue(cs.getApplication(appAttemptId).isBlacklisted(host));
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host));
- Assert.assertFalse(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
+ Assert.assertFalse(cs.getApplication(appAttemptId).isBlacklisted(host));
rm.stop();
}
@@ -597,6 +591,66 @@ public class TestCapacityScheduler {
assertTrue(appComparator.compare(app1, app3) < 0);
assertTrue(appComparator.compare(app2, app3) < 0);
}
+
+ @Test
+ public void testConcurrentAccessOnApplications() throws Exception {
+ CapacityScheduler cs = new CapacityScheduler();
+ verifyConcurrentAccessOnApplications(
+ cs.applications, FiCaSchedulerApp.class, Queue.class);
+ }
+
+ public static <T extends SchedulerApplication, Q extends Queue>
+ void verifyConcurrentAccessOnApplications(
+ final Map<ApplicationAttemptId, T> applications, Class<T> appClazz,
+ final Class<Q> queueClazz)
+ throws Exception {
+ final int size = 10000;
+ final ApplicationId appId = ApplicationId.newInstance(0, 0);
+ final Constructor<T> ctor = appClazz.getDeclaredConstructor(
+ ApplicationAttemptId.class, String.class, queueClazz,
+ ActiveUsersManager.class, RMContext.class);
+
+ ApplicationAttemptId appAttemptId0
+ = ApplicationAttemptId.newInstance(appId, 0);
+ applications.put(appAttemptId0, ctor.newInstance(
+ appAttemptId0, null, mock(queueClazz), null, null));
+ assertNotNull(applications.get(appAttemptId0));
+
+ // Imitating the thread of scheduler that will add and remove apps
+ final AtomicBoolean finished = new AtomicBoolean(false);
+ final AtomicBoolean failed = new AtomicBoolean(false);
+ Thread t = new Thread() {
+
+ @Override
+ public void run() {
+ for (int i = 1; i <= size; ++i) {
+ ApplicationAttemptId appAttemptId
+ = ApplicationAttemptId.newInstance(appId, i);
+ try {
+ applications.put(appAttemptId, ctor.newInstance(
+ appAttemptId, null, mock(queueClazz), null, null));
+ } catch (Exception e) {
+ failed.set(true);
+ finished.set(true);
+ return;
+ }
+ }
+ for (int i = 1; i <= size; ++i) {
+ ApplicationAttemptId appAttemptId
+ = ApplicationAttemptId.newInstance(appId, i);
+ applications.remove(appAttemptId);
+ }
+ finished.set(true);
+ }
+ };
+ t.start();
+
+ // Imitating the thread of rmappattempt that will get the app
+ while (!finished.get()) {
+ assertNotNull(applications.get(appAttemptId0));
+ }
+ assertFalse(failed.get());
+ }
@Test
public void testGetAppsInQueue() throws Exception {
@@ -628,21 +682,4 @@ public class TestCapacityScheduler {
Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
}
- @Test
- public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
-
- AsyncDispatcher rmDispatcher = new AsyncDispatcher();
- CapacityScheduler cs = new CapacityScheduler();
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- setupQueueConfiguration(conf);
- cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null,
- null, null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM()));
-
- SchedulerApplication app =
- TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
- cs.getSchedulerApplications(), cs, "a1");
- Assert.assertEquals("a1", app.getQueue().getQueueName());
- }
- }
+}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Fri Feb 7 20:33:01 2014
@@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -64,10 +63,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -275,14 +271,14 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext);
- a.submitApplicationAttempt(app_0, user_0);
+ a.submitApplication(app_0, user_0, B);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext);
- a.submitApplicationAttempt(app_1, user_0); // same user
+ a.submitApplication(app_1, user_0, B); // same user
// Setup some nodes
@@ -324,14 +320,14 @@ public class TestLeafQueue {
.getMockApplicationAttemptId(0, 1);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_d, d, null,
rmContext);
- d.submitApplicationAttempt(app_0, user_d);
+ d.submitApplication(app_0, user_d, D);
// Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_d, d, null,
rmContext);
- d.submitApplicationAttempt(app_1, user_d); // same user
+ d.submitApplication(app_1, user_d, D); // same user
}
@@ -349,37 +345,30 @@ public class TestLeafQueue {
.getMockApplicationAttemptId(0, 1);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null,
rmContext);
- AppAddedSchedulerEvent addAppEvent =
- new AppAddedSchedulerEvent(appAttemptId_0.getApplicationId(),
- a.getQueueName(), user_0);
- cs.handle(addAppEvent);
- AppAttemptAddedSchedulerEvent addAttemptEvent =
- new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
- cs.handle(addAttemptEvent);
-
+ a.submitApplication(app_0, user_0, B);
+
+ when(cs.getApplication(appAttemptId_0)).thenReturn(app_0);
AppAttemptRemovedSchedulerEvent event = new AppAttemptRemovedSchedulerEvent(
- appAttemptId_0, RMAppAttemptState.FAILED, false);
+ appAttemptId_0, RMAppAttemptState.FAILED);
cs.handle(event);
assertEquals(0, a.getMetrics().getAppsPending());
- assertEquals(0, a.getMetrics().getAppsFailed());
+ assertEquals(1, a.getMetrics().getAppsFailed());
// Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null,
rmContext);
- a.submitApplicationAttempt(app_1, user_0); // same user
+ a.submitApplication(app_1, user_0, B); // same user
assertEquals(1, a.getMetrics().getAppsSubmitted());
assertEquals(1, a.getMetrics().getAppsPending());
+ when(cs.getApplication(appAttemptId_1)).thenReturn(app_0);
event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
- RMAppAttemptState.FINISHED, false);
+ RMAppAttemptState.FINISHED);
cs.handle(event);
- AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent(
- appAttemptId_0.getApplicationId(), RMAppState.FINISHED);
- cs.handle(rEvent);
assertEquals(1, a.getMetrics().getAppsSubmitted());
assertEquals(0, a.getMetrics().getAppsPending());
@@ -407,14 +396,14 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext);
- a.submitApplicationAttempt(app_0, user_0);
+ a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext);
- a.submitApplicationAttempt(app_1, user_0); // same user
+ a.submitApplication(app_1, user_0, A); // same user
// Setup some nodes
@@ -535,21 +524,21 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext);
- a.submitApplicationAttempt(app_0, user_0);
+ a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext);
- a.submitApplicationAttempt(app_1, user_0); // same user
+ a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext);
- a.submitApplicationAttempt(app_2, user_1);
+ a.submitApplication(app_2, user_1, A);
// Setup some nodes
String host_0 = "127.0.0.1";
@@ -629,21 +618,21 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext);
- a.submitApplicationAttempt(app_0, user_0);
+ a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext);
- a.submitApplicationAttempt(app_1, user_0); // same user
+ a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext);
- a.submitApplicationAttempt(app_2, user_1);
+ a.submitApplication(app_2, user_1, A);
// Setup some nodes
String host_0 = "127.0.0.1";
@@ -740,28 +729,28 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext);
- a.submitApplicationAttempt(app_0, user_0);
+ a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext);
- a.submitApplicationAttempt(app_1, user_0); // same user
+ a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext);
- a.submitApplicationAttempt(app_2, user_1);
+ a.submitApplication(app_2, user_1, A);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_2, a,
a.getActiveUsersManager(), rmContext);
- a.submitApplicationAttempt(app_3, user_2);
+ a.submitApplication(app_3, user_2, A);
// Setup some nodes
String host_0 = "127.0.0.1";
@@ -916,14 +905,14 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext);
- a.submitApplicationAttempt(app_0, user_0);
+ a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext);
- a.submitApplicationAttempt(app_1, user_1);
+ a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "127.0.0.1";
@@ -1018,14 +1007,14 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext);
- a.submitApplicationAttempt(app_0, user_0);
+ a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext);
- a.submitApplicationAttempt(app_1, user_1);
+ a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "127.0.0.1";
@@ -1122,14 +1111,14 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext);
- a.submitApplicationAttempt(app_0, user_0);
+ a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext);
- a.submitApplicationAttempt(app_1, user_1);
+ a.submitApplication(app_1, user_1, A);
// Setup some nodes
String host_0 = "127.0.0.1";
@@ -1243,7 +1232,7 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext));
- a.submitApplicationAttempt(app_0, user_0);
+ a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
String host_0 = "127.0.0.1";
@@ -1384,7 +1373,7 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext));
- a.submitApplicationAttempt(app_0, user_0);
+ a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
String host_0 = "127.0.0.1";
@@ -1515,7 +1504,7 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext));
- a.submitApplicationAttempt(app_0, user_0);
+ a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
String host_0_0 = "127.0.0.1";
@@ -1618,21 +1607,21 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_e, e,
mock(ActiveUsersManager.class), rmContext);
- e.submitApplicationAttempt(app_0, user_e);
+ e.submitApplication(app_0, user_e, E);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_e, e,
mock(ActiveUsersManager.class), rmContext);
- e.submitApplicationAttempt(app_1, user_e); // same user
+ e.submitApplication(app_1, user_e, E); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_e, e,
mock(ActiveUsersManager.class), rmContext);
- e.submitApplicationAttempt(app_2, user_e); // same user
+ e.submitApplication(app_2, user_e, E); // same user
// before reinitialization
assertEquals(2, e.activeApplications.size());
@@ -1696,21 +1685,21 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_e, e,
mock(ActiveUsersManager.class), rmContext);
- e.submitApplicationAttempt(app_0, user_e);
+ e.submitApplication(app_0, user_e, E);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_e, e,
mock(ActiveUsersManager.class), rmContext);
- e.submitApplicationAttempt(app_1, user_e); // same user
+ e.submitApplication(app_1, user_e, E); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_e, e,
mock(ActiveUsersManager.class), rmContext);
- e.submitApplicationAttempt(app_2, user_e); // same user
+ e.submitApplication(app_2, user_e, E); // same user
// before updating cluster resource
assertEquals(2, e.activeApplications.size());
@@ -1773,14 +1762,14 @@ public class TestLeafQueue {
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext));
- a.submitApplicationAttempt(app_0, user_0);
+ a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext));
- a.submitApplicationAttempt(app_1, user_0);
+ a.submitApplication(app_1, user_0, A);
// Setup some nodes and racks
String host_0_0 = "127.0.0.1";