You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/02/02 00:33:59 UTC
svn commit: r1239402 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java...
Author: sseth
Date: Wed Feb 1 23:33:58 2012
New Revision: 1239402
URL: http://svn.apache.org/viewvc?rev=1239402&view=rev
Log:
MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files. (Contributed by Arun C Murthy)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1239402&r1=1239401&r2=1239402&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Feb 1 23:33:58 2012
@@ -648,6 +648,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3427. Fix streaming unit tests broken after mavenization.
(Hitesh Shah via acmurthy)
+ MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files.
+ (Arun C Murthy via sseth)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1239402&r1=1239401&r2=1239402&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Wed Feb 1 23:33:58 2012
@@ -191,6 +191,11 @@ public class RecoveryService extends Com
in = fc.open(historyFile);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();
+ Exception parseException = parser.getParseException();
+ if (parseException != null) {
+ LOG.info("Got an error parsing job-history file " + historyFile +
+ ", ignoring incomplete events.", parseException);
+ }
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
.getAllTasks();
for (TaskInfo taskInfo : taskInfos.values()) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1239402&r1=1239401&r2=1239402&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Wed Feb 1 23:33:58 2012
@@ -24,8 +24,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -52,9 +55,13 @@ import org.apache.hadoop.yarn.api.record
@InterfaceStability.Unstable
public class JobHistoryParser {
+ private static final Log LOG = LogFactory.getLog(JobHistoryParser.class);
+
private final FSDataInputStream in;
- JobInfo info = null;
+ private JobInfo info = null;
+ private IOException parseException = null;
+
/**
* Create a job history parser for the given history file using the
* given file system
@@ -91,30 +98,58 @@ public class JobHistoryParser {
* The first invocation will populate the object, subsequent calls
* will return the already parsed object.
* The input stream is closed on return
+ *
+ * This api ignores partial records and stops parsing on encountering one.
+ * {@link #getParseException()} can be used to fetch the exception, if any.
+ *
* @return The populated jobInfo object
* @throws IOException
+ * @see #getParseException()
*/
public synchronized JobInfo parse() throws IOException {
+ return parse(new EventReader(in));
+ }
+
+ /**
+ * Only used for unit tests.
+ */
+ @Private
+ public synchronized JobInfo parse(EventReader reader) throws IOException {
if (info != null) {
return info;
}
- EventReader reader = new EventReader(in);
+ info = new JobInfo();
+ int eventCtr = 0;
HistoryEvent event;
- info = new JobInfo();
try {
while ((event = reader.getNextEvent()) != null) {
handleEvent(event);
- }
+ ++eventCtr;
+ }
+ } catch (IOException ioe) {
+ LOG.info("Caught exception parsing history file after " + eventCtr +
+ " events", ioe);
+ parseException = ioe;
} finally {
in.close();
}
return info;
}
- private void handleEvent(HistoryEvent event) throws IOException {
+ /**
+ * Get the parse exception, if any.
+ *
+ * @return the parse exception, if any
+ * @see #parse()
+ */
+ public synchronized IOException getParseException() {
+ return parseException;
+ }
+
+ private void handleEvent(HistoryEvent event) {
EventType type = event.getEventType();
switch (type) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1239402&r1=1239401&r2=1239402&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Wed Feb 1 23:33:58 2012
@@ -249,8 +249,9 @@ public class CompletedJob implements org
}
if (historyFileAbsolute != null) {
+ JobHistoryParser parser = null;
try {
- JobHistoryParser parser =
+ parser =
new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
historyFileAbsolute);
jobInfo = parser.parse();
@@ -258,6 +259,12 @@ public class CompletedJob implements org
throw new YarnException("Could not load history file "
+ historyFileAbsolute, e);
}
+ IOException parseException = parser.getParseException();
+ if (parseException != null) {
+ throw new YarnException(
+ "Could not parse history file " + historyFileAbsolute,
+ parseException);
+ }
} else {
throw new IOException("History file not found");
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1239402&r1=1239401&r2=1239402&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Wed Feb 1 23:33:58 2012
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@@ -37,14 +38,18 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -61,6 +66,9 @@ import org.apache.hadoop.yarn.service.Se
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.RackResolver;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestJobHistoryParsing {
private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
@@ -76,6 +84,17 @@ public class TestJobHistoryParsing {
@Test
public void testHistoryParsing() throws Exception {
+ checkHistoryParsing(2, 1, 2);
+ }
+
+ @Test
+ public void testHistoryParsingWithParseErrors() throws Exception {
+ checkHistoryParsing(3, 0, 2);
+ }
+
+ private void checkHistoryParsing(final int numMaps, final int numReduces,
+ final int numSuccessfulMaps)
+ throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
long amStartTimeEst = System.currentTimeMillis();
@@ -83,8 +102,9 @@ public class TestJobHistoryParsing {
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
- MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(),
- true);
+ MRApp app =
+ new MRAppWithHistory(numMaps, numReduces, true,
+ this.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
@@ -117,8 +137,42 @@ public class TestJobHistoryParsing {
}
JobHistoryParser parser = new JobHistoryParser(in);
- JobInfo jobInfo = parser.parse();
-
+ final EventReader realReader = new EventReader(in);
+ EventReader reader = Mockito.mock(EventReader.class);
+ if (numMaps == numSuccessfulMaps) {
+ reader = realReader;
+ } else {
+ final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
+ Mockito.when(reader.getNextEvent()).thenAnswer(
+ new Answer<HistoryEvent>() {
+ public HistoryEvent answer(InvocationOnMock invocation)
+ throws IOException {
+ HistoryEvent event = realReader.getNextEvent();
+ if (event instanceof TaskFinishedEvent) {
+ numFinishedEvents.incrementAndGet();
+ }
+
+ if (numFinishedEvents.get() <= numSuccessfulMaps) {
+ return event;
+ } else {
+ throw new IOException("test");
+ }
+ }
+ }
+ );
+ }
+
+ JobInfo jobInfo = parser.parse(reader);
+
+ long numFinishedMaps =
+ computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
+
+ if (numFinishedMaps != numMaps) {
+ Exception parseException = parser.getParseException();
+ Assert.assertNotNull("Didn't get expected parse exception",
+ parseException);
+ }
+
Assert.assertEquals("Incorrect username ", System.getProperty("user.name"),
jobInfo.getUsername());
Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname());
@@ -126,14 +180,16 @@ public class TestJobHistoryParsing {
jobInfo.getJobQueueName());
Assert
.assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath());
- Assert.assertEquals("incorrect finishedMap ", 2, jobInfo.getFinishedMaps());
- Assert.assertEquals("incorrect finishedReduces ", 1,
+ Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps,
+ numFinishedMaps);
+ Assert.assertEquals("incorrect finishedReduces ", numReduces,
jobInfo.getFinishedReduces());
Assert.assertEquals("incorrect uberized ", job.isUber(),
jobInfo.getUberized());
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
int totalTasks = allTasks.size();
- Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks);
+ Assert.assertEquals("total number of tasks is incorrect ",
+ (numMaps+numReduces), totalTasks);
// Verify aminfo
Assert.assertEquals(1, jobInfo.getAMInfos().size());
@@ -172,55 +228,78 @@ public class TestJobHistoryParsing {
Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo);
Assert.assertEquals("Incorrect shuffle port for task attempt",
taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort());
- Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
- Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
-
- // Verify rack-name
- Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
- .getRackname(), RACK_NAME);
+ if (numMaps == numSuccessfulMaps) {
+ Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
+ Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
+
+ // Verify rack-name
+ Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
+ .getRackname(), RACK_NAME);
+ }
}
}
- String summaryFileName = JobHistoryUtils
- .getIntermediateSummaryFileName(jobId);
- Path summaryFile = new Path(jobhistoryDir, summaryFileName);
- String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
- Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
- Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
- Assert.assertNotNull(jobSummaryString);
-
- Map<String, String> jobSummaryElements = new HashMap<String, String>();
- StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
- while (strToken.hasMoreTokens()) {
- String keypair = strToken.nextToken();
- jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
+ if (numMaps == numSuccessfulMaps) {
- }
+ String summaryFileName = JobHistoryUtils
+ .getIntermediateSummaryFileName(jobId);
+ Path summaryFile = new Path(jobhistoryDir, summaryFileName);
+ String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
+ Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
+ Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
+ Assert.assertNotNull(jobSummaryString);
+
+ Map<String, String> jobSummaryElements = new HashMap<String, String>();
+ StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
+ while (strToken.hasMoreTokens()) {
+ String keypair = strToken.nextToken();
+ jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
- Assert.assertEquals("JobId does not match", jobId.toString(),
- jobSummaryElements.get("jobId"));
- Assert.assertTrue("submitTime should not be 0",
- Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
- Assert.assertTrue("launchTime should not be 0",
- Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
- Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
- Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
- Assert
- .assertTrue(
- "firstReduceTaskLaunchTime should not be 0",
- Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
- Assert.assertTrue("finishTime should not be 0",
- Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
- Assert.assertEquals("Mismatch in num map slots", 2,
- Integer.parseInt(jobSummaryElements.get("numMaps")));
- Assert.assertEquals("Mismatch in num reduce slots", 1,
- Integer.parseInt(jobSummaryElements.get("numReduces")));
- Assert.assertEquals("User does not match", System.getProperty("user.name"),
- jobSummaryElements.get("user"));
- Assert.assertEquals("Queue does not match", "default",
- jobSummaryElements.get("queue"));
- Assert.assertEquals("Status does not match", "SUCCEEDED",
- jobSummaryElements.get("status"));
+ }
+
+ Assert.assertEquals("JobId does not match", jobId.toString(),
+ jobSummaryElements.get("jobId"));
+ Assert.assertTrue("submitTime should not be 0",
+ Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
+ Assert.assertTrue("launchTime should not be 0",
+ Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
+ Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
+ Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
+ Assert
+ .assertTrue(
+ "firstReduceTaskLaunchTime should not be 0",
+ Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
+ Assert.assertTrue("finishTime should not be 0",
+ Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
+ Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
+ Integer.parseInt(jobSummaryElements.get("numMaps")));
+ Assert.assertEquals("Mismatch in num reduce slots", numReduces,
+ Integer.parseInt(jobSummaryElements.get("numReduces")));
+ Assert.assertEquals("User does not match", System.getProperty("user.name"),
+ jobSummaryElements.get("user"));
+ Assert.assertEquals("Queue does not match", "default",
+ jobSummaryElements.get("queue"));
+ Assert.assertEquals("Status does not match", "SUCCEEDED",
+ jobSummaryElements.get("status"));
+ }
+ }
+
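+ // Returns jobInfo's finished-map count when every map succeeded; otherwise counts all tasks (of any type) whose status is SUCCEEDED, similar to RecoveryService...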
+ private long computeFinishedMaps(JobInfo jobInfo,
+ int numMaps, int numSuccessfulMaps) {
+ if (numMaps == numSuccessfulMaps) {
+ return jobInfo.getFinishedMaps();
+ }
+
+ long numFinishedMaps = 0;
+ Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos =
+ jobInfo.getAllTasks();
+ for (TaskInfo taskInfo : taskInfos.values()) {
+ if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+ ++numFinishedMaps;
+ }
+ }
+ return numFinishedMaps;
}
@Test
@@ -264,6 +343,9 @@ public class TestJobHistoryParsing {
JobHistoryParser parser = new JobHistoryParser(in);
JobInfo jobInfo = parser.parse();
+ Exception parseException = parser.getParseException();
+ Assert.assertNull("Caught an unexpected exception " + parseException,
+ parseException);
int noOffailedAttempts = 0;
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
for (Task task : job.getTasks().values()) {