You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text rendering).
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/04/13 15:46:46 UTC
svn commit: r1325767 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...
Author: bobby
Date: Fri Apr 13 13:46:45 2012
New Revision: 1325767
URL: http://svn.apache.org/viewvc?rev=1325767&view=rev
Log:
svn merge -c 1325765 from trunk. FIXES: MAPREDUCE-4128. AM Recovery expects all attempts of a completed task to also be completed. (Bikas Saha via bobby)
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1325767&r1=1325766&r2=1325767&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Apr 13 13:46:45 2012
@@ -231,6 +231,9 @@ Release 0.23.3 - UNRELEASED
text on the UI to N/A instead of a link to null. (Bhallamudi Venkata Siva
Kamesh via sseth)
+ MAPREDUCE-4128. AM Recovery expects all attempts of a completed task to
+ also be completed. (Bikas Saha via bobby)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1325767&r1=1325766&r2=1325767&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Apr 13 13:46:45 2012
@@ -656,6 +656,7 @@ public abstract class TaskImpl implement
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskState taskState) {
TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
+ TypeConverter.fromYarn(task.successfulAttempt),
task.getFinishTime(task.successfulAttempt),
TypeConverter.fromYarn(task.taskId.getTaskType()),
taskState.toString(),
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1325767&r1=1325766&r2=1325767&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Fri Apr 13 13:46:45 2012
@@ -93,7 +93,7 @@ public class TestJobHistoryEventHandler
// First completion event, but min-queue-size for batching flushes is 10
handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
- t.taskID, 0, TaskType.MAP, "", null)));
+ t.taskID, null, 0, TaskType.MAP, "", null)));
verify(mockWriter).flush();
} finally {
@@ -129,7 +129,7 @@ public class TestJobHistoryEventHandler
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
- t.taskID, 0, TaskType.MAP, "", null)));
+ t.taskID, null, 0, TaskType.MAP, "", null)));
}
handleNextNEvents(jheh, 9);
@@ -174,7 +174,7 @@ public class TestJobHistoryEventHandler
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
- t.taskID, 0, TaskType.MAP, "", null)));
+ t.taskID, null, 0, TaskType.MAP, "", null)));
}
handleNextNEvents(jheh, 9);
@@ -215,7 +215,7 @@ public class TestJobHistoryEventHandler
for (int i = 0 ; i < 100 ; i++) {
queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
- t.taskID, 0, TaskType.MAP, "", null)));
+ t.taskID, null, 0, TaskType.MAP, "", null)));
}
queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1325767&r1=1325766&r2=1325767&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Fri Apr 13 13:46:45 2012
@@ -25,6 +25,8 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
@@ -37,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
public class TestFetchFailure {
@@ -142,6 +145,107 @@ public class TestFetchFailure {
Assert.assertEquals("Event status not correct for reduce attempt1",
TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
}
+
+ /**
+ * This tests that if a map attempt was failed (say due to fetch failures),
+ * then it gets re-run. When the next map attempt is running, if the AM dies,
+ * then, on AM re-run, the AM does not incorrectly remember the first failed
+ * attempt. Currently recovery does not recover running tasks. Effectively,
+ * the AM re-runs the maps from scratch.
+ */
+ @Test
+ public void testFetchFailureWithRecovery() throws Exception {
+ int runCount = 0;
+ MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), true, ++runCount);
+ Configuration conf = new Configuration();
+ // map -> reduce -> fetch-failure -> map retry is incompatible with
+ // sequential, single-task-attempt approach in uber-AM, so disable:
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ //all maps would be running
+ Assert.assertEquals("Num tasks not correct",
+ 2, job.getTasks().size());
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask = it.next();
+ Task reduceTask = it.next();
+
+ //wait for Task state move to RUNNING
+ app.waitForState(mapTask, TaskState.RUNNING);
+ TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+ app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+ //send the done signal to the map attempt
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(mapAttempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ // wait for map success
+ app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+ TaskAttemptCompletionEvent[] events =
+ job.getTaskAttemptCompletionEvents(0, 100);
+ Assert.assertEquals("Num completion events not correct",
+ 1, events.length);
+ Assert.assertEquals("Event status not correct",
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
+
+ // wait for reduce to start running
+ app.waitForState(reduceTask, TaskState.RUNNING);
+ TaskAttempt reduceAttempt =
+ reduceTask.getAttempts().values().iterator().next();
+ app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+ //send 3 fetch failures from reduce to trigger map re execution
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+ //wait for map Task state move back to RUNNING
+ app.waitForState(mapTask, TaskState.RUNNING);
+
+ // Crash the app again.
+ app.stop();
+
+ //rerun
+ app =
+ new MRAppWithHistory(1, 1, false, this.getClass().getName(), false,
+ ++runCount);
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ //all maps would be running
+ Assert.assertEquals("Num tasks not correct",
+ 2, job.getTasks().size());
+ it = job.getTasks().values().iterator();
+ mapTask = it.next();
+ reduceTask = it.next();
+
+ // the map is not in a SUCCEEDED state after restart of AM
+ app.waitForState(mapTask, TaskState.RUNNING);
+ mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+ app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+ //send the done signal to the map attempt
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(mapAttempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ // wait for map success
+ app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+ reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+ //send done to reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(reduceAttempt.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ app.waitForState(job, JobState.SUCCEEDED);
+ events = job.getTaskAttemptCompletionEvents(0, 100);
+ Assert.assertEquals("Num completion events not correct", 2, events.length);
+ }
private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
TaskAttempt mapAttempt) {
@@ -150,4 +254,20 @@ public class TestFetchFailure {
reduceAttempt.getID(),
Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()})));
}
+
+ static class MRAppWithHistory extends MRApp {
+ public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
+ String testName, boolean cleanOnStart, int startCount) {
+ super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+ }
+
+ @Override
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+ AppContext context) {
+ JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
+ getStartCount());
+ return eventHandler;
+ }
+ }
+
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1325767&r1=1325766&r2=1325767&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Fri Apr 13 13:46:45 2012
@@ -230,7 +230,8 @@
{"name": "taskType", "type": "string"},
{"name": "finishTime", "type": "long"},
{"name": "status", "type": "string"},
- {"name": "counters", "type": "JhCounters"}
+ {"name": "counters", "type": "JhCounters"},
+ {"name": "successfulAttemptId", "type": "string"}
]
},
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1325767&r1=1325766&r2=1325767&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Fri Apr 13 13:46:45 2012
@@ -276,6 +276,17 @@ public class JobHistoryParser {
attemptInfo.shuffleFinishTime = event.getFinishTime();
attemptInfo.sortFinishTime = event.getFinishTime();
attemptInfo.mapFinishTime = event.getFinishTime();
+ if(TaskStatus.State.SUCCEEDED.toString().equals(taskInfo.status))
+ {
+ //this is a successful task
+ if(attemptInfo.getAttemptId().equals(taskInfo.getSuccessfulAttemptId()))
+ {
+ // the failed attempt is the one that made this task successful
+ // so its no longer successful
+ taskInfo.status = null;
+ // not resetting the other fields set in handleTaskFinishedEvent()
+ }
+ }
}
private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
@@ -299,6 +310,7 @@ public class JobHistoryParser {
taskInfo.counters = event.getCounters();
taskInfo.finishTime = event.getFinishTime();
taskInfo.status = TaskStatus.State.SUCCEEDED.toString();
+ taskInfo.successfulAttemptId = event.getSuccessfulTaskAttemptId();
}
private void handleTaskUpdatedEvent(TaskUpdatedEvent event) {
@@ -514,6 +526,7 @@ public class JobHistoryParser {
String status;
String error;
TaskAttemptID failedDueToAttemptId;
+ TaskAttemptID successfulAttemptId;
Map<TaskAttemptID, TaskAttemptInfo> attemptsMap;
public TaskInfo() {
@@ -554,6 +567,10 @@ public class JobHistoryParser {
public TaskAttemptID getFailedDueToAttemptId() {
return failedDueToAttemptId;
}
+ /** @return the attempt Id that caused this task to succeed */
+ public TaskAttemptID getSuccessfulAttemptId() {
+ return successfulAttemptId;
+ }
/** @return the error */
public String getError() { return error; }
/** @return the map of all attempts for this task */
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java?rev=1325767&r1=1325766&r2=1325767&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java Fri Apr 13 13:46:45 2012
@@ -22,6 +22,7 @@ import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
@@ -36,6 +37,7 @@ public class TaskFinishedEvent implement
private TaskFinished datum = null;
private TaskID taskid;
+ private TaskAttemptID successfulAttemptId;
private long finishTime;
private TaskType taskType;
private String status;
@@ -44,15 +46,17 @@ public class TaskFinishedEvent implement
/**
* Create an event to record the successful completion of a task
* @param id Task ID
+ * @param attemptId Task Attempt ID of the successful attempt for this task
* @param finishTime Finish time of the task
* @param taskType Type of the task
* @param status Status string
* @param counters Counters for the task
*/
- public TaskFinishedEvent(TaskID id, long finishTime,
+ public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
TaskType taskType,
String status, Counters counters) {
this.taskid = id;
+ this.successfulAttemptId = attemptId;
this.finishTime = finishTime;
this.taskType = taskType;
this.status = status;
@@ -65,6 +69,10 @@ public class TaskFinishedEvent implement
if (datum == null) {
datum = new TaskFinished();
datum.taskid = new Utf8(taskid.toString());
+ if(successfulAttemptId != null)
+ {
+ datum.successfulAttemptId = new Utf8(successfulAttemptId.toString());
+ }
datum.finishTime = finishTime;
datum.counters = EventWriter.toAvro(counters);
datum.taskType = new Utf8(taskType.name());
@@ -76,6 +84,10 @@ public class TaskFinishedEvent implement
public void setDatum(Object oDatum) {
this.datum = (TaskFinished)oDatum;
this.taskid = TaskID.forName(datum.taskid.toString());
+ if (datum.successfulAttemptId != null) {
+ this.successfulAttemptId = TaskAttemptID
+ .forName(datum.successfulAttemptId.toString());
+ }
this.finishTime = datum.finishTime;
this.taskType = TaskType.valueOf(datum.taskType.toString());
this.status = datum.status.toString();
@@ -84,6 +96,14 @@ public class TaskFinishedEvent implement
/** Get task id */
public TaskID getTaskId() { return TaskID.forName(taskid.toString()); }
+ /** Get successful task attempt id */
+ public TaskAttemptID getSuccessfulTaskAttemptId() {
+ if(successfulAttemptId != null)
+ {
+ return TaskAttemptID.forName(successfulAttemptId.toString());
+ }
+ return null;
+ }
/** Get the task finish time */
public long getFinishTime() { return finishTime; }
/** Get task counters */