You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2012/11/02 00:07:56 UTC
svn commit: r1404825 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...
Author: jlowe
Date: Thu Nov 1 23:07:55 2012
New Revision: 1404825
URL: http://svn.apache.org/viewvc?rev=1404825&view=rev
Log:
svn merge -c 1404817 FIXES: MAPREDUCE-4729. job history UI not showing all job attempts. Contributed by Vinod Kumar Vavilapalli
Added:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java
- copied unchanged from r1404817, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1404825&r1=1404824&r2=1404825&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Nov 1 23:07:55 2012
@@ -65,6 +65,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4746. The MR Application Master does not have a config to set
environment variables (Rob Parker via bobby)
+ MAPREDUCE-4729. job history UI not showing all job attempts. (Vinod
+ Kumar Vavilapalli via jlowe)
+
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1404825&r1=1404824&r2=1404825&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Nov 1 23:07:55 2012
@@ -23,14 +23,17 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,6 +48,9 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -89,6 +95,7 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
@@ -826,16 +833,21 @@ public class MRAppMaster extends Composi
@Override
public void start() {
+ amInfos = new LinkedList<AMInfo>();
+
// Pull completedTasks etc from recovery
if (inRecovery) {
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
amInfos = recoveryServ.getAMInfos();
+ } else {
+ // Get the amInfos anyway, irrespective of whether recovery is enabled or
+ // not, if this is not the first AM generation.
+ if (appAttemptID.getAttemptId() != 1) {
+ amInfos.addAll(readJustAMInfos());
+ }
}
- // / Create the AMInfo for the current AppMaster
- if (amInfos == null) {
- amInfos = new LinkedList<AMInfo>();
- }
+ // Create an AMInfo for the current AM generation.
AMInfo amInfo =
MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
nmPort, nmHttpPort);
@@ -893,6 +905,51 @@ public class MRAppMaster extends Composi
startJobs();
}
+ private List<AMInfo> readJustAMInfos() {
+ List<AMInfo> amInfos = new ArrayList<AMInfo>();
+ FSDataInputStream inputStream = null;
+ try {
+ inputStream =
+ RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
+ appAttemptID);
+ EventReader jobHistoryEventReader = new EventReader(inputStream);
+
+ // All AMInfos are contiguous. Track when the first AMStartedEvent
+ // appears.
+ boolean amStartedEventsBegan = false;
+
+ HistoryEvent event;
+ while ((event = jobHistoryEventReader.getNextEvent()) != null) {
+ if (event.getEventType() == EventType.AM_STARTED) {
+ if (!amStartedEventsBegan) {
+ // First AMStartedEvent.
+ amStartedEventsBegan = true;
+ }
+ AMStartedEvent amStartedEvent = (AMStartedEvent) event;
+ amInfos.add(MRBuilderUtils.newAMInfo(
+ amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
+ amStartedEvent.getContainerId(),
+ StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
+ amStartedEvent.getNodeManagerPort(),
+ amStartedEvent.getNodeManagerHttpPort()));
+ } else if (amStartedEventsBegan) {
+ // This means AMStartedEvents began and this event is a
+ // non-AMStarted event.
+ // No need to continue reading all the other events.
+ break;
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Could not parse the old history file. "
+ + "Will not have old AMinfos ", e);
+ } finally {
+ if (inputStream != null) {
+ IOUtils.closeQuietly(inputStream);
+ }
+ }
+ return amInfos;
+ }
+
/**
* This can be overridden to instantiate multiple jobs and create a
* workflow.
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1404825&r1=1404824&r2=1404825&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Thu Nov 1 23:07:55 2012
@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -178,26 +177,13 @@ public class RecoveryService extends Com
}
private void parse() throws IOException {
- // TODO: parse history file based on startCount
- String jobName =
- TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
- String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
- FSDataInputStream in = null;
- Path historyFile = null;
- Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
- new Path(jobhistoryDir));
- FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
- getConfig());
- //read the previous history file
- historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
- histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
- LOG.info("History file is at " + historyFile);
- in = fc.open(historyFile);
+ FSDataInputStream in =
+ getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();
Exception parseException = parser.getParseException();
if (parseException != null) {
- LOG.info("Got an error parsing job-history file " + historyFile +
+ LOG.info("Got an error parsing job-history file" +
", ignoring incomplete events.", parseException);
}
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
@@ -213,6 +199,28 @@ public class RecoveryService extends Com
LOG.info("Read completed tasks from history "
+ completedTasks.size());
}
+
+ public static FSDataInputStream getPreviousJobHistoryFileStream(
+ Configuration conf, ApplicationAttemptId applicationAttemptId)
+ throws IOException {
+ FSDataInputStream in = null;
+ Path historyFile = null;
+ String jobName =
+ TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
+ .toString();
+ String jobhistoryDir =
+ JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+ Path histDirPath =
+ FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
+ FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
+ // read the previous history file
+ historyFile =
+ fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
+ jobName, (applicationAttemptId.getAttemptId() - 1)));
+ LOG.info("History file is at " + historyFile);
+ in = fc.open(historyFile);
+ return in;
+ }
protected Dispatcher createRecoveryDispatcher() {
return new RecoveryDispatcher();