You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/02/13 03:11:37 UTC
svn commit: r1445456 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/...
Author: jlowe
Date: Wed Feb 13 02:11:36 2013
New Revision: 1445456
URL: http://svn.apache.org/r1445456
Log:
MAPREDUCE-4992. AM hangs in RecoveryService when recovering tasks with speculative attempts. Contributed by Robert Parker
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1445456&r1=1445455&r2=1445456&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Feb 13 02:11:36 2013
@@ -712,6 +712,9 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-4458. Warn if java.library.path is used for AM or Task
(Robert Parker via jeagles)
+ MAPREDUCE-4992. AM hangs in RecoveryService when recovering tasks with
+ speculative attempts (Robert Parker via jlowe)
+
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1445456&r1=1445455&r2=1445456&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Wed Feb 13 02:11:36 2013
@@ -21,9 +21,12 @@ package org.apache.hadoop.mapreduce.v2.a
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +38,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -108,7 +112,7 @@ public class RecoveryService extends Com
private JobInfo jobInfo = null;
private final Map<TaskId, TaskInfo> completedTasks =
new HashMap<TaskId, TaskInfo>();
-
+
private final List<TaskEvent> pendingTaskScheduleEvents =
new ArrayList<TaskEvent>();
@@ -193,6 +197,14 @@ public class RecoveryService extends Com
.getAllTasks();
for (TaskInfo taskInfo : taskInfos.values()) {
if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+ Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
+ taskInfo.getAllTaskAttempts().entrySet().iterator();
+ while (taskAttemptIterator.hasNext()) {
+ Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
+ if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
+ taskAttemptIterator.remove();
+ }
+ }
completedTasks
.put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
LOG.info("Read from history task "
@@ -215,6 +227,7 @@ public class RecoveryService extends Com
JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
Path histDirPath =
FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
+ LOG.info("Trying file " + histDirPath.toString());
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
// read the previous history file
historyFile =
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1445456&r1=1445455&r2=1445456&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Wed Feb 13 02:11:36 2013
@@ -50,11 +50,15 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
@@ -734,12 +738,173 @@ public class TestRecovery {
app.verifyCompleted();
validateOutput();
}
-
+
+ /**
+ * AM with 2 maps and 1 reduce. For 1st map, a speculative attempt is launched
+ * and is running when the original attempt is signalled done and succeeds.
+ * AM crashes after the first task finishes, then recovers completely and
+ * succeeds in the second generation.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSpeculative() throws Exception {
+
+ int runCount = 0;
+ long am1StartTimeEst = System.currentTimeMillis();
+ MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
+ Configuration conf = new Configuration();
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ long jobStartTime = job.getReport().getStartTime();
+ //all maps would be running
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask1 = it.next();
+ Task mapTask2 = it.next();
+ Task reduceTask = it.next();
+
+ // all maps must be running
+ app.waitForState(mapTask1, TaskState.RUNNING);
+ app.waitForState(mapTask2, TaskState.RUNNING);
+
+ // Launch a speculative attempt for the first map task
+ app.getContext().getEventHandler().handle(
+ new TaskEvent(mapTask1.getID(), TaskEventType.T_ADD_SPEC_ATTEMPT));
+ int timeOut = 0;
+ while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
+ Thread.sleep(1000);
+ LOG.info("Waiting for next attempt to start");
+ }
+ Iterator<TaskAttempt> t1it = mapTask1.getAttempts().values().iterator();
+ TaskAttempt task1Attempt1 = t1it.next();
+ TaskAttempt task1Attempt2 = t1it.next();
+ TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+
+ ContainerId t1a2contId = task1Attempt2.getAssignedContainerID();
+
+ LOG.info(t1a2contId.toString());
+ LOG.info(task1Attempt1.getID().toString());
+ LOG.info(task1Attempt2.getID().toString());
+
+ // Launch container for speculative attempt
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptContainerLaunchedEvent(task1Attempt2.getID(), runCount));
+
+ //before sending the TA_DONE event, make sure the attempts have come to
+ //RUNNING state
+ app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+ app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
+ app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+ // reduce must be in RUNNING state
+ Assert.assertEquals("Reduce Task state not correct",
+ TaskState.RUNNING, reduceTask.getReport().getTaskState());
+
+ //send the done signal to the map 1 attempt 1
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ task1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ app.waitForState(task1Attempt1, TaskAttemptState.SUCCEEDED);
+
+ //wait for first map task to complete
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+ long task1StartTime = mapTask1.getReport().getStartTime();
+ long task1FinishTime = mapTask1.getReport().getFinishTime();
+
+ //stop the app
+ app.stop();
+
+ //rerun
+ //in rerun the 1st map will be recovered from previous run
+ long am2StartTimeEst = System.currentTimeMillis();
+ app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ //all maps would be running
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ it = job.getTasks().values().iterator();
+ mapTask1 = it.next();
+ mapTask2 = it.next();
+ reduceTask = it.next();
+
+ // first map will be recovered, no need to send done
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ app.waitForState(mapTask2, TaskState.RUNNING);
+
+ task2Attempt = mapTask2.getAttempts().values().iterator().next();
+ //before sending the TA_DONE event, make sure the attempt has come to
+ //RUNNING state
+ app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+ //send the done signal to the 2nd map task
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ mapTask2.getAttempts().values().iterator().next().getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait to get it completed
+ app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+ //wait for reduce to be running before sending done
+ app.waitForState(reduceTask, TaskState.RUNNING);
+
+ //send the done signal to the reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ reduceTask.getAttempts().values().iterator().next().getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+ Assert.assertEquals("Job Start time not correct",
+ jobStartTime, job.getReport().getStartTime());
+ Assert.assertEquals("Task Start time not correct",
+ task1StartTime, mapTask1.getReport().getStartTime());
+ Assert.assertEquals("Task Finish time not correct",
+ task1FinishTime, mapTask1.getReport().getFinishTime());
+ Assert.assertEquals(2, job.getAMInfos().size());
+ int attemptNum = 1;
+ // Verify AMInfo
+ for (AMInfo amInfo : job.getAMInfos()) {
+ Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
+ .getAttemptId());
+ Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
+ .getApplicationAttemptId());
+ Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+ Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+ Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+ }
+ long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
+ long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
+ Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
+ && am1StartTimeReal <= am2StartTimeEst);
+ Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
+ && am2StartTimeReal <= System.currentTimeMillis());
+
+ }
+
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attempt.getID()));
-
+
TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat
.getRecordWriter(tContext);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1445456&r1=1445455&r2=1445456&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Wed Feb 13 02:11:36 2013
@@ -246,6 +246,7 @@ public class JobHistoryParser implements
attemptInfo.state = StringInterner.weakIntern(event.getState());
attemptInfo.counters = event.getCounters();
attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
+ info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
}
private void handleReduceAttemptFinishedEvent
@@ -262,6 +263,7 @@ public class JobHistoryParser implements
attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
attemptInfo.port = event.getPort();
attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
+ info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
}
private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
@@ -276,6 +278,7 @@ public class JobHistoryParser implements
attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
attemptInfo.port = event.getPort();
attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
+ info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
}
private void handleTaskAttemptFailedEvent(
@@ -306,6 +309,7 @@ public class JobHistoryParser implements
taskInfo.successfulAttemptId = null;
}
}
+ info.completedTaskAttemptsMap.put(event.getTaskAttemptId(), attemptInfo);
}
private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
@@ -443,6 +447,7 @@ public class JobHistoryParser implements
Map<JobACL, AccessControlList> jobACLs;
Map<TaskID, TaskInfo> tasksMap;
+ Map<TaskAttemptID, TaskAttemptInfo> completedTaskAttemptsMap;
List<AMInfo> amInfos;
AMInfo latestAmInfo;
boolean uberized;
@@ -456,6 +461,7 @@ public class JobHistoryParser implements
finishedMaps = finishedReduces = 0;
username = jobname = jobConfPath = jobQueueName = "";
tasksMap = new HashMap<TaskID, TaskInfo>();
+ completedTaskAttemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>();
jobACLs = new HashMap<JobACL, AccessControlList>();
priority = JobPriority.NORMAL;
}
@@ -530,6 +536,8 @@ public class JobHistoryParser implements
public Counters getReduceCounters() { return reduceCounters; }
/** @return the map of all tasks in this job */
public Map<TaskID, TaskInfo> getAllTasks() { return tasksMap; }
+ /** @return the map of all completed (finished or failed) task attempts in this job */
+ public Map<TaskAttemptID, TaskAttemptInfo> getAllCompletedTaskAttempts() { return completedTaskAttemptsMap; }
/** @return the priority of this job */
public String getPriority() { return priority.toString(); }
public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }