You are viewing a plain-text version of this message; the canonical (linked) version is available in the Apache mailing-list archive.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2012/12/22 02:46:06 UTC
svn commit: r1425224 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
Author: jlowe
Date: Sat Dec 22 01:46:05 2012
New Revision: 1425224
URL: http://svn.apache.org/viewvc?rev=1425224&view=rev
Log:
svn merge -c 1425223 FIXES: MAPREDUCE-4890. Invalid TaskImpl state transitions when task fails while speculating. Contributed by Jason Lowe
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1425224&r1=1425223&r2=1425224&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Sat Dec 22 01:46:05 2012
@@ -484,6 +484,9 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4793. Problem with adding resources when using both -files and
-file to hadoop streaming (jlowe)
+ MAPREDUCE-4890. Invalid TaskImpl state transitions when task fails while
+ speculating (jlowe)
+
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1425224&r1=1425223&r2=1425224&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Sat Dec 22 01:46:05 2012
@@ -231,7 +231,12 @@ public abstract class TaskImpl implement
// Transitions from FAILED state
.addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
EnumSet.of(TaskEventType.T_KILL,
- TaskEventType.T_ADD_SPEC_ATTEMPT))
+ TaskEventType.T_ADD_SPEC_ATTEMPT,
+ TaskEventType.T_ATTEMPT_COMMIT_PENDING,
+ TaskEventType.T_ATTEMPT_FAILED,
+ TaskEventType.T_ATTEMPT_KILLED,
+ TaskEventType.T_ATTEMPT_LAUNCHED,
+ TaskEventType.T_ATTEMPT_SUCCEEDED))
// Transitions from KILLED state
.addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
@@ -941,6 +946,13 @@ public abstract class TaskImpl implement
task.handleTaskAttemptCompletion(
taskAttemptId,
TaskAttemptCompletionEventStatus.TIPFAILED);
+
+ // issue kill to all non finished attempts
+ for (TaskAttempt taskAttempt : task.attempts.values()) {
+ task.killUnfinishedAttempt
+ (taskAttempt, "Task has failed. Killing attempt!");
+ }
+ task.inProgressAttempts.clear();
if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1425224&r1=1425223&r2=1425224&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Sat Dec 22 01:46:05 2012
@@ -602,4 +602,73 @@ public class TestTaskImpl {
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
}
+
+ @Test
+ public void testFailedTransitions() {
+ mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+ remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
+ credentials, clock,
+ completedTasksFromPreviousRun, startCount,
+ metrics, appContext, TaskType.MAP) {
+ @Override
+ protected int getMaxAttempts() {
+ return 1;
+ }
+ };
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+
+ // add three more speculative attempts
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ assertEquals(4, taskAttempts.size());
+
+ // have the first attempt fail, verify task failed due to no retries
+ MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
+ taskAttempt.setState(TaskAttemptState.FAILED);
+ mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+
+ // verify task can no longer be killed
+ mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+
+ // verify speculative doesn't launch new tasks
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ATTEMPT_LAUNCHED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ assertEquals(4, taskAttempts.size());
+
+ // verify attempt events from active tasks don't knock task out of FAILED
+ taskAttempt = taskAttempts.get(1);
+ taskAttempt.setState(TaskAttemptState.COMMIT_PENDING);
+ mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+ TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ taskAttempt.setState(TaskAttemptState.FAILED);
+ mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ taskAttempt = taskAttempts.get(2);
+ taskAttempt.setState(TaskAttemptState.SUCCEEDED);
+ mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ taskAttempt = taskAttempts.get(3);
+ taskAttempt.setState(TaskAttemptState.KILLED);
+ mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+ TaskEventType.T_ATTEMPT_KILLED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ }
}