You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/04/11 20:18:40 UTC
svn commit: r1324902 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java...
Author: sseth
Date: Wed Apr 11 18:18:40 2012
New Revision: 1324902
URL: http://svn.apache.org/viewvc?rev=1324902&view=rev
Log:
MAPREDUCE-3932. Fix the TaskAttempt state machine to handle CONTAINER_LAUNCHED and CONTAINER_LAUNCH_FAILED events in additional states. (Contributed by Robert Joseph Evans)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1324902&r1=1324901&r2=1324902&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Apr 11 18:18:40 2012
@@ -313,6 +313,10 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4099 amendment. ApplicationMaster will remove staging directory
after the history service is stopped. (Jason Lowe via sseth)
+ MAPREDUCE-3932. Fix the TaskAttempt state machine to handle
+ CONTAINER_LAUNCHED and CONTAINER_LAUNCH_FAILED events in additional
+ states. (Robert Joseph Evans via sseth)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1324902&r1=1324901&r2=1324902&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Wed Apr 11 18:18:40 2012
@@ -347,6 +347,8 @@ public class JobImpl implements org.apac
JobEventType.JOB_DIAGNOSTIC_UPDATE,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.INTERNAL_ERROR))
+ .addTransition(JobState.ERROR, JobState.ERROR,
+ JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
// create the topology tables
.installTopology();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1324902&r1=1324901&r2=1324902&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Wed Apr 11 18:18:40 2012
@@ -316,7 +316,9 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_COMMIT_PENDING,
+ // Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_TIMED_OUT))
@@ -338,6 +340,7 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_TIMED_OUT))
@@ -359,7 +362,10 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
- TaskAttemptEventType.TA_FAILMSG))
+ TaskAttemptEventType.TA_FAILMSG,
+ // Container launch events can arrive late
+ TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
// Transitions from KILL_TASK_CLEANUP
.addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
@@ -377,7 +383,10 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
- TaskAttemptEventType.TA_FAILMSG))
+ TaskAttemptEventType.TA_FAILMSG,
+ // Container launch events can arrive late
+ TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
// Transitions from SUCCEEDED
.addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts
@@ -405,7 +414,9 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_ASSIGNED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
+ // Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG))
@@ -420,7 +431,9 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_ASSIGNED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
+ // Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG))
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1324902&r1=1324901&r2=1324902&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Wed Apr 11 18:18:40 2012
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -68,6 +69,9 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -81,9 +85,12 @@ import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -91,12 +98,16 @@ import org.apache.hadoop.yarn.util.Build
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"unchecked", "rawtypes"})
public class TestTaskAttempt{
-
- @SuppressWarnings("rawtypes")
@Test
public void testAttemptContainerRequest() throws Exception {
+ //WARNING: This test must run first. This is because there is an
+ // optimization where the credentials passed in are cached statically so
+ // they do not need to be recomputed when creating a new
+ // ContainerLaunchContext. If other tests run first, this code will cache
+ // their credentials and this test will fail when it looks for the
+ // credentials it inserted.
final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
final byte[] SECRET_KEY = ("secretkey").getBytes();
Map<ApplicationAccessType, String> acls =
@@ -125,7 +136,7 @@ public class TestTaskAttempt{
Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
("tokenid").getBytes(), ("tokenpw").getBytes(),
new Text("tokenkind"), new Text("tokenservice"));
-
+
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
mock(TaskSplitMetaInfo.class), jobConf, taListener,
@@ -134,7 +145,7 @@ public class TestTaskAttempt{
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
-
+
ContainerLaunchContext launchCtx =
TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
jobConf, jobToken, taImpl.createRemoteTask(),
@@ -185,7 +196,6 @@ public class TestTaskAttempt{
testMRAppHistory(app);
}
- @SuppressWarnings("rawtypes")
@Test
public void testSingleRackRequest() throws Exception {
TaskAttemptImpl.RequestContainerTransition rct =
@@ -213,11 +223,10 @@ public class TestTaskAttempt{
ContainerRequestEvent cre =
(ContainerRequestEvent) arg.getAllValues().get(1);
String[] requestedRacks = cre.getRacks();
- //Only a single occurance of /DefaultRack
+ //Only a single occurrence of /DefaultRack
assertEquals(1, requestedRacks.length);
}
- @SuppressWarnings("rawtypes")
@Test
public void testHostResolveAttempt() throws Exception {
TaskAttemptImpl.RequestContainerTransition rct =
@@ -316,14 +325,12 @@ public class TestTaskAttempt{
.getValue());
}
- @SuppressWarnings("rawtypes")
private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
Clock clock = new SystemClock();
return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
}
- @SuppressWarnings("rawtypes")
private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
@@ -394,4 +401,67 @@ public class TestTaskAttempt{
};
}
}
+
+ @Test
+ public void testLaunchFailedWhileKilling() throws Exception {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+ TaskAttemptImpl taImpl =
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ splits, jobConf, taListener,
+ mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+ new SystemClock(), null);
+
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+ container, mock(Map.class)));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_KILL));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+ assertFalse(eventHandler.internalError);
+ }
+
+ public static class MockEventHandler implements EventHandler {
+ public boolean internalError;
+
+ @Override
+ public void handle(Event event) {
+ if (event instanceof JobEvent) {
+ JobEvent je = ((JobEvent) event);
+ if (JobEventType.INTERNAL_ERROR == je.getType()) {
+ internalError = true;
+ }
+ }
+ }
+
+ };
}