You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2012/02/14 23:55:01 UTC
svn commit: r1244256 - in
/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-...
Author: mahadev
Date: Tue Feb 14 22:55:00 2012
New Revision: 1244256
URL: http://svn.apache.org/viewvc?rev=1244256&view=rev
Log:
MAPREDUCE-3858. Task attempt failure during commit results in task never completing. (Tom White via mahadev) - Merging r1244254 from trunk.
Modified:
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt?rev=1244256&r1=1244255&r2=1244256&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt Tue Feb 14 22:55:00 2012
@@ -747,6 +747,9 @@ Release 0.23.1 - 2012-02-08
MAPREDUCE-3802. Added test to validate that AM can crash multiple times and
still can recover successfully after MAPREDUCE-3846. (vinodkv)
+ MAPREDUCE-3858. Task attempt failure during commit results in task never completing.
+ (Tom White via mahadev)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1244256&r1=1244255&r2=1244256&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Feb 14 22:55:00 2012
@@ -832,6 +832,9 @@ public abstract class TaskImpl implement
public TaskState transition(TaskImpl task, TaskEvent event) {
task.failedAttempts++;
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+ if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
+ task.commitAttempt = null;
+ }
TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
if (attempt.getAssignedContainerMgrAddress() != null) {
//container was assigned
@@ -877,6 +880,7 @@ public abstract class TaskImpl implement
protected void unSucceed(TaskImpl task) {
++task.numberUncompletedAttempts;
+ task.commitAttempt = null;
task.successfulAttempt = null;
}
}
Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1244256&r1=1244255&r2=1244256&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Tue Feb 14 22:55:00 2012
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -261,6 +263,12 @@ public class TestTaskImpl {
assertTaskRunningState();
}
+ private void commitTaskAttempt(TaskAttemptId attemptId) {
+ mockTask.handle(new TaskTAttemptEvent(attemptId,
+ TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+ assertTaskRunningState();
+ }
+
private MockTaskAttemptImpl getLastAttempt() {
return taskAttempts.get(taskAttempts.size()-1);
}
@@ -279,32 +287,45 @@ public class TestTaskImpl {
assertTaskRunningState();
}
+ private void failRunningTaskAttempt(TaskAttemptId attemptId) {
+ mockTask.handle(new TaskTAttemptEvent(attemptId,
+ TaskEventType.T_ATTEMPT_FAILED));
+ assertTaskRunningState();
+ }
+
/**
* {@link TaskState#NEW}
*/
private void assertTaskNewState() {
- assertEquals(mockTask.getState(), TaskState.NEW);
+ assertEquals(TaskState.NEW, mockTask.getState());
}
/**
* {@link TaskState#SCHEDULED}
*/
private void assertTaskScheduledState() {
- assertEquals(mockTask.getState(), TaskState.SCHEDULED);
+ assertEquals(TaskState.SCHEDULED, mockTask.getState());
}
/**
* {@link TaskState#RUNNING}
*/
private void assertTaskRunningState() {
- assertEquals(mockTask.getState(), TaskState.RUNNING);
+ assertEquals(TaskState.RUNNING, mockTask.getState());
}
/**
* {@link TaskState#KILL_WAIT}
*/
private void assertTaskKillWaitState() {
- assertEquals(mockTask.getState(), TaskState.KILL_WAIT);
+ assertEquals(TaskState.KILL_WAIT, mockTask.getState());
+ }
+
+ /**
+ * {@link TaskState#SUCCEEDED}
+ */
+ private void assertTaskSucceededState() {
+ assertEquals(TaskState.SUCCEEDED, mockTask.getState());
}
@Test
@@ -409,5 +430,32 @@ public class TestTaskImpl {
assert(mockTask.getProgress() == progress);
}
+
+ @Test
+ public void testFailureDuringTaskAttemptCommit() {
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
+ commitTaskAttempt(getLastAttempt().getAttemptId());
+
+ // During the task attempt commit there is an exception which causes
+ // the attempt to fail
+ updateLastAttemptState(TaskAttemptState.FAILED);
+ failRunningTaskAttempt(getLastAttempt().getAttemptId());
+
+ assertEquals(2, taskAttempts.size());
+ updateLastAttemptState(TaskAttemptState.SUCCEEDED);
+ commitTaskAttempt(getLastAttempt().getAttemptId());
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+ assertFalse("First attempt should not commit",
+ mockTask.canCommit(taskAttempts.get(0).getAttemptId()));
+ assertTrue("Second attempt should commit",
+ mockTask.canCommit(getLastAttempt().getAttemptId()));
+
+ assertTaskSucceededState();
+ }
}