You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/09/25 16:47:00 UTC
svn commit: r1175403 [2/2] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/
hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org...
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1175403&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Sun Sep 25 14:46:59 2011
@@ -0,0 +1,403 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRMAppAttemptTransitions {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestRMAppAttemptTransitions.class);
+
+ private static final String EMPTY_DIAGNOSTICS = "";
+
+ private RMContext rmContext;
+ private YarnScheduler scheduler;
+ private ApplicationMasterService masterService;
+ private ApplicationMasterLauncher applicationMasterLauncher;
+
+ private RMApp application;
+ private RMAppAttempt applicationAttempt;
+
+ private final class TestApplicationAttemptEventDispatcher implements
+ EventHandler<RMAppAttemptEvent> {
+
+ @Override
+ public void handle(RMAppAttemptEvent event) {
+ ApplicationAttemptId appID = event.getApplicationAttemptId();
+ assertEquals(applicationAttempt.getAppAttemptId(), appID);
+ try {
+ applicationAttempt.handle(event);
+ } catch (Throwable t) {
+ LOG.error("Error in handling event type " + event.getType()
+ + " for application " + appID, t);
+ }
+ }
+ }
+
+ // handle all the RM application events - same as in ResourceManager.java
+ private final class TestApplicationEventDispatcher implements
+ EventHandler<RMAppEvent> {
+ @Override
+ public void handle(RMAppEvent event) {
+ assertEquals(application.getApplicationId(), event.getApplicationId());
+ try {
+ application.handle(event);
+ } catch (Throwable t) {
+ LOG.error("Error in handling event type " + event.getType()
+ + " for application " + application.getApplicationId(), t);
+ }
+ }
+ }
+
+ private final class TestSchedulerEventDispatcher implements
+ EventHandler<SchedulerEvent> {
+ @Override
+ public void handle(SchedulerEvent event) {
+ scheduler.handle(event);
+ }
+ }
+
+ private final class TestAMLauncherEventDispatcher implements
+ EventHandler<AMLauncherEvent> {
+ @Override
+ public void handle(AMLauncherEvent event) {
+ applicationMasterLauncher.handle(event);
+ }
+ }
+
+ private static int appId = 1;
+
+ @Before
+ public void setUp() throws Exception {
+ InlineDispatcher rmDispatcher = new InlineDispatcher();
+
+ ContainerAllocationExpirer containerAllocationExpirer =
+ mock(ContainerAllocationExpirer.class);
+ AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
+ rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
+ containerAllocationExpirer, amLivelinessMonitor);
+
+ scheduler = mock(YarnScheduler.class);
+ masterService = mock(ApplicationMasterService.class);
+ applicationMasterLauncher = mock(ApplicationMasterLauncher.class);
+
+ rmDispatcher.register(RMAppAttemptEventType.class,
+ new TestApplicationAttemptEventDispatcher());
+
+ rmDispatcher.register(RMAppEventType.class,
+ new TestApplicationEventDispatcher());
+
+ rmDispatcher.register(SchedulerEventType.class,
+ new TestSchedulerEventDispatcher());
+
+ rmDispatcher.register(AMLauncherEventType.class,
+ new TestAMLauncherEventDispatcher());
+
+ rmDispatcher.init(new Configuration());
+ rmDispatcher.start();
+
+
+ ApplicationId applicationId = MockApps.newAppID(appId++);
+ ApplicationAttemptId applicationAttemptId =
+ MockApps.newAppAttemptID(applicationId, 0);
+
+ final String user = MockApps.newUserName();
+ final String queue = MockApps.newQueue();
+ ApplicationSubmissionContext submissionContext =
+ mock(ApplicationSubmissionContext.class);
+ when(submissionContext.getUser()).thenReturn(user);
+ when(submissionContext.getQueue()).thenReturn(queue);
+ ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
+ Resource resource = mock(Resource.class);
+ when(amContainerSpec.getResource()).thenReturn(resource);
+ when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
+
+ application = mock(RMApp.class);
+ applicationAttempt =
+ new RMAppAttemptImpl(applicationAttemptId, null, rmContext, scheduler,
+ masterService, submissionContext);
+ when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
+ when(application.getApplicationId()).thenReturn(applicationId);
+
+ testAppAttemptNewState();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ ((AsyncDispatcher)this.rmContext.getDispatcher()).stop();
+ }
+
+
+ /**
+ * {@link RMAppAttemptState#NEW}
+ */
+ private void testAppAttemptNewState() {
+ assertEquals(RMAppAttemptState.NEW,
+ applicationAttempt.getAppAttemptState());
+ assertEquals(0, applicationAttempt.getDiagnostics().length());
+ assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
+ assertNull(applicationAttempt.getMasterContainer());
+ assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
+ assertEquals(0, applicationAttempt.getRanNodes().size());
+ }
+
+ /**
+ * {@link RMAppAttemptState#SUBMITTED}
+ */
+ private void testAppAttemptSubmittedState() {
+ assertEquals(RMAppAttemptState.SUBMITTED,
+ applicationAttempt.getAppAttemptState());
+ assertEquals(0, applicationAttempt.getDiagnostics().length());
+ assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
+ assertNull(applicationAttempt.getMasterContainer());
+ assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
+ assertEquals(0, applicationAttempt.getRanNodes().size());
+
+ // Check events
+ verify(masterService).
+ registerAppAttempt(applicationAttempt.getAppAttemptId());
+ verify(scheduler).handle(any(AppAddedSchedulerEvent.class));
+ }
+
+ /**
+ * {@link RMAppAttemptState#SUBMITTED} -> {@link RMAppAttemptState#FAILED}
+ */
+ private void testAppAttemptSubmittedToFailedState(String diagnostics) {
+ assertEquals(RMAppAttemptState.FAILED,
+ applicationAttempt.getAppAttemptState());
+ assertEquals(diagnostics, applicationAttempt.getDiagnostics());
+ assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
+ assertNull(applicationAttempt.getMasterContainer());
+ assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
+ assertEquals(0, applicationAttempt.getRanNodes().size());
+
+ // Check events
+ verify(application).handle(any(RMAppRejectedEvent.class));
+ }
+
+ /**
+ * {@link RMAppAttemptState#KILLED}
+ */
+ private void testAppAttemptKilledState(Container amContainer,
+ String diagnostics) {
+ assertEquals(RMAppAttemptState.KILLED,
+ applicationAttempt.getAppAttemptState());
+ assertEquals(diagnostics, applicationAttempt.getDiagnostics());
+ assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
+ assertEquals(amContainer, applicationAttempt.getMasterContainer());
+ assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
+ assertEquals(0, applicationAttempt.getRanNodes().size());
+ }
+
+ /**
+ * {@link RMAppAttemptState#SCHEDULED}
+ */
+ private void testAppAttemptScheduledState() {
+ assertEquals(RMAppAttemptState.SCHEDULED,
+ applicationAttempt.getAppAttemptState());
+ assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
+ assertNull(applicationAttempt.getMasterContainer());
+ assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
+ assertEquals(0, applicationAttempt.getRanNodes().size());
+
+ // Check events
+ verify(application).handle(any(RMAppEvent.class));
+ verify(scheduler).
+ allocate(any(ApplicationAttemptId.class),
+ any(List.class), any(List.class));
+ }
+
+ /**
+ * {@link RMAppAttemptState#ALLOCATED}
+ */
+ private void testAppAttemptAllocatedState(Container amContainer) {
+ assertEquals(RMAppAttemptState.ALLOCATED,
+ applicationAttempt.getAppAttemptState());
+ assertEquals(amContainer, applicationAttempt.getMasterContainer());
+
+ // Check events
+ verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
+ verify(scheduler, times(2)).
+ allocate(
+ any(ApplicationAttemptId.class), any(List.class), any(List.class));
+ }
+
+ /**
+ * {@link RMAppAttemptState#FAILED}
+ */
+ private void testAppAttemptFailedState(Container container,
+ String diagnostics) {
+ assertEquals(RMAppAttemptState.FAILED,
+ applicationAttempt.getAppAttemptState());
+ assertEquals(diagnostics, applicationAttempt.getDiagnostics());
+ assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
+ assertEquals(container, applicationAttempt.getMasterContainer());
+ assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
+ assertEquals(0, applicationAttempt.getRanNodes().size());
+
+ // Check events
+ verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class));
+ }
+
+ private void submitApplicationAttempt() {
+ ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
+ applicationAttempt.handle(
+ new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+ testAppAttemptSubmittedState();
+ }
+
+ private void scheduleApplicationAttempt() {
+ submitApplicationAttempt();
+ applicationAttempt.handle(
+ new RMAppAttemptEvent(
+ applicationAttempt.getAppAttemptId(),
+ RMAppAttemptEventType.APP_ACCEPTED));
+ testAppAttemptScheduledState();
+ }
+
+ private Container allocateApplicationAttempt() {
+ scheduleApplicationAttempt();
+
+ // Mock the allocation of AM container
+ Container container = mock(Container.class);
+ Allocation allocation = mock(Allocation.class);
+ when(allocation.getContainers()).
+ thenReturn(Collections.singletonList(container));
+ when(
+ scheduler.allocate(
+ any(ApplicationAttemptId.class),
+ any(List.class),
+ any(List.class))).
+ thenReturn(allocation);
+
+ applicationAttempt.handle(
+ new RMAppAttemptContainerAllocatedEvent(
+ applicationAttempt.getAppAttemptId(),
+ container));
+
+ testAppAttemptAllocatedState(container);
+
+ return container;
+ }
+
+ @Test
+ public void testNewToKilled() {
+ applicationAttempt.handle(
+ new RMAppAttemptEvent(
+ applicationAttempt.getAppAttemptId(),
+ RMAppAttemptEventType.KILL));
+ testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
+ }
+
+ @Test
+ public void testSubmittedToFailed() {
+ submitApplicationAttempt();
+ String message = "Rejected";
+ applicationAttempt.handle(
+ new RMAppAttemptRejectedEvent(
+ applicationAttempt.getAppAttemptId(), message));
+ testAppAttemptSubmittedToFailedState(message);
+ }
+
+ @Test
+ public void testSubmittedToKilled() {
+ submitApplicationAttempt();
+ applicationAttempt.handle(
+ new RMAppAttemptEvent(
+ applicationAttempt.getAppAttemptId(),
+ RMAppAttemptEventType.KILL));
+ testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
+ }
+
+ @Test
+ public void testScheduledToKilled() {
+ scheduleApplicationAttempt();
+ applicationAttempt.handle(
+ new RMAppAttemptEvent(
+ applicationAttempt.getAppAttemptId(),
+ RMAppAttemptEventType.KILL));
+ testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
+ }
+
+ @Test
+ public void testAllocatedToKilled() {
+ Container amContainer = allocateApplicationAttempt();
+ applicationAttempt.handle(
+ new RMAppAttemptEvent(
+ applicationAttempt.getAppAttemptId(),
+ RMAppAttemptEventType.KILL));
+ testAppAttemptKilledState(amContainer, EMPTY_DIAGNOSTICS);
+ }
+
+ @Test
+ public void testAllocatedToFailed() {
+ Container amContainer = allocateApplicationAttempt();
+ String diagnostics = "Launch Failed";
+ applicationAttempt.handle(
+ new RMAppAttemptLaunchFailedEvent(
+ applicationAttempt.getAppAttemptId(),
+ diagnostics));
+ testAppAttemptFailedState(amContainer, diagnostics);
+ }
+
+}