You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/05/02 08:03:59 UTC
svn commit: r1098495 - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/...
Author: sharad
Date: Mon May 2 06:03:58 2011
New Revision: 1098495
URL: http://svn.apache.org/viewvc?rev=1098495&view=rev
Log:
MAPREDUCE-2462. Write job conf along with JobHistory, other minor improvements. Contributed by Siddharth Seth.
Added:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/JobHistoryUtils.java
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon May 2 06:03:58 2011
@@ -3,6 +3,8 @@ Hadoop MapReduce Change Log
Trunk (unreleased changes)
MAPREDUCE-279
+ MAPREDUCE-2462. Write job conf along with JobHistory, other minor improvements.
+ (Siddharth Seth via sharad)
Fix to send finish application event only when the application is finished (mahadev)
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Mon May 2 06:03:58 2011
@@ -32,16 +32,17 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -75,36 +76,35 @@ public class JobHistoryEventHandler exte
private static final Map<JobId, MetaInfo> fileMap =
Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
- static final FsPermission HISTORY_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0750); // rwxr-x---
-
- public static final FsPermission HISTORY_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0740); // rwxr-----
-
public JobHistoryEventHandler(AppContext context, int startCount) {
super("JobHistoryEventHandler");
this.context = context;
this.startCount = startCount;
}
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.yarn.service.AbstractService#init(org.apache.hadoop.conf.Configuration)
+ * Initializes the FileContext and Path objects for the log and done directories.
+ * Creates these directories if they do not already exist.
+ */
@Override
public void init(Configuration conf) {
- String defaultLogDir = conf.get(
- YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/staging";
- String logDir = conf.get(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
- defaultLogDir);
- String defaultDoneDir = conf.get(
- YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
- String doneDirPrefix =
- conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
- defaultDoneDir);
+
+ String logDir = JobHistoryUtils.getConfiguredHistoryLogDirPrefix(conf);
+ String doneDirPrefix = JobHistoryUtils.getConfiguredHistoryDoneDirPrefix(conf);
+
try {
doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
new Path(doneDirPrefix));
doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
if (!doneDirFc.util().exists(doneDirPrefixPath)) {
- doneDirFc.mkdir(doneDirPrefixPath,
- new FsPermission(HISTORY_DIR_PERMISSION), true);
+ try {
+ doneDirFc.mkdir(doneDirPrefixPath, new FsPermission(
+ JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
+ } catch (FileAlreadyExistsException e) {
+ LOG.info("JobHistory Done Directory: [" + doneDirPrefixPath
+ + "] already exists.");
+ }
}
} catch (IOException e) {
LOG.info("error creating done directory on dfs " + e);
@@ -115,12 +115,16 @@ public class JobHistoryEventHandler exte
new Path(logDir));
logDirFc = FileContext.getFileContext(logDirPath.toUri(), conf);
if (!logDirFc.util().exists(logDirPath)) {
- logDirFc.mkdir(logDirPath,
- new FsPermission(HISTORY_DIR_PERMISSION), true);
+ try {
+ logDirFc.mkdir(logDirPath, new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION),
+ true);
+ } catch (FileAlreadyExistsException e) {
+ LOG.info("JobHistory Log Directory: [" + doneDirPrefixPath
+ + "] already exists.");
+ }
}
} catch (IOException ioe) {
- LOG.info("Mkdirs failed to create " +
- logDirPath.toString());
+ LOG.info("Mkdirs failed to create " + logDirPath.toString());
throw new YarnException(ioe);
}
super.init(conf);
@@ -182,23 +186,28 @@ public class JobHistoryEventHandler exte
/**
* Create an event writer for the Job represented by the jobID.
+ * Writes out the job configuration to the log directory.
* This should be the first call to history for a job
- * @param jobId
+ *
+ * @param jobId the jobId.
* @throws IOException
*/
protected void setupEventWriter(JobId jobId)
throws IOException {
if (logDirPath == null) {
+ LOG.info("Log Directory is null, returning");
throw new IOException("Missing Log Directory for History");
}
MetaInfo oldFi = fileMap.get(jobId);
+ Configuration conf = getConfig();
long submitTime = (oldFi == null ? context.getClock().getTime() : oldFi.submitTime);
- Path logFile = getJobHistoryFile(logDirPath, jobId);
// String user = conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
- String user = getConfig().get(MRJobConfig.USER_NAME);
+
+ Path logFile = JobHistoryUtils.getJobHistoryFile(logDirPath, jobId, startCount);
+ String user = conf.get(MRJobConfig.USER_NAME);
if (user == null) {
throw new IOException("User is null while setting up jobhistory eventwriter" );
}
@@ -207,16 +216,36 @@ public class JobHistoryEventHandler exte
if (writer == null) {
try {
- FSDataOutputStream out = logDirFc.create(logFile, EnumSet
- .of(CreateFlag.CREATE, CreateFlag.OVERWRITE));
+ FSDataOutputStream out = logDirFc.create(logFile,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE));
+ //TODO Permissions for the history file?
writer = new EventWriter(out);
} catch (IOException ioe) {
- LOG.info("Could not create log file for job " + jobName);
+ LOG.info("Could not create log file: [" + logFile + "] + for job " + "[" + jobName + "]");
throw ioe;
}
}
- /*TODO Storing the job conf on the log dir if required*/
- MetaInfo fi = new MetaInfo(logFile, writer, submitTime, user, jobName);
+
+ //This could be done at the end as well in moveToDone
+ Path logDirConfPath = null;
+ if (conf != null) {
+ logDirConfPath = getConfFile(logDirPath, jobId);
+ LOG.info("XXX: Attempting to write config to: " + logDirConfPath);
+ FSDataOutputStream jobFileOut = null;
+ try {
+ if (logDirConfPath != null) {
+ jobFileOut = logDirFc.create(logDirConfPath,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE));
+ conf.writeXml(jobFileOut);
+ jobFileOut.close();
+ }
+ } catch (IOException e) {
+ LOG.info("Failed to close the job configuration file "
+ + StringUtils.stringifyException(e));
+ }
+ }
+
+ MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer, submitTime, user, jobName);
fileMap.put(jobId, fi);
}
@@ -228,6 +257,7 @@ public class JobHistoryEventHandler exte
if (mi != null) {
mi.closeWriter();
}
+
} catch (IOException e) {
LOG.info("Error closing writer for JobID: " + id);
throw e;
@@ -260,19 +290,11 @@ public class JobHistoryEventHandler exte
mi.writeEvent(historyEvent);
LOG.info("In HistoryEventHandler " + event.getHistoryEvent().getEventType());
} catch (IOException e) {
- LOG.error("in handler ioException " + e);
+ LOG.error("Error writing History Event " + e);
throw new YarnException(e);
}
// check for done
if (event.getHistoryEvent().getEventType().equals(EventType.JOB_FINISHED)) {
- JobFinishedEvent jfe = (JobFinishedEvent) event.getHistoryEvent();
- String statusstoredir = doneDirPrefixPath + "/status/" + mi.user + "/" + mi.jobName;
- try {
- writeStatus(statusstoredir, jfe);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- throw new YarnException(e);
- }
try {
closeEventWriter(event.getJobID());
} catch (IOException e) {
@@ -283,27 +305,45 @@ public class JobHistoryEventHandler exte
protected void closeEventWriter(JobId jobId) throws IOException {
final MetaInfo mi = fileMap.get(jobId);
+
try {
- Path logFile = mi.getHistoryFile();
+ if (mi != null) {
+ mi.closeWriter();
+ }
+
+ if (mi == null || mi.getHistoryFile() == null) {
+ LOG.info("No file for job-history with " + jobId + " found in cache!");
+ }
+ if (mi.getConfFile() == null) {
+ LOG.info("No file for jobconf with " + jobId + " found in cache!");
+ }
+
+
//TODO fix - add indexed structure
//
- String doneDir = doneDirPrefixPath + "/" + mi.user + "/";
+ String doneDir = JobHistoryUtils.getCurrentDoneDir(doneDirPrefixPath.toString());
Path doneDirPath =
doneDirFc.makeQualified(new Path(doneDir));
if (!pathExists(doneDirFc, doneDirPath)) {
- doneDirFc.mkdir(doneDirPath, new FsPermission(HISTORY_DIR_PERMISSION),
- true);
+ try {
+ doneDirFc.mkdir(doneDirPath, new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
+ } catch (FileAlreadyExistsException e) {
+ LOG.info("Done directory: [" + doneDirPath + "] already exists.");
}
- // Path localFile = new Path(fromLocalFile);
- Path qualifiedLogFile =
- logDirFc.makeQualified(logFile);
- Path qualifiedDoneFile =
- doneDirFc.makeQualified(new Path(doneDirPath, mi.jobName));
- if (mi != null) {
- mi.closeWriter();
}
+ Path logFile = mi.getHistoryFile();
+ Path qualifiedLogFile = logDirFc.makeQualified(logFile);
+ Path qualifiedDoneFile = doneDirFc.makeQualified(new Path(doneDirPath,
+ getDoneJobHistoryFileName(jobId)));
moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
+
+ Path confFile = mi.getConfFile();
+ Path qualifiedConfFile = logDirFc.makeQualified(confFile);
+ Path qualifiedConfDoneFile = doneDirFc.makeQualified(new Path(doneDirPath, getDoneConfFileName(jobId)));
+ moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
+
logDirFc.delete(qualifiedLogFile, true);
+ logDirFc.delete(qualifiedConfFile, true);
} catch (IOException e) {
LOG.info("Error closing writer for JobID: " + jobId);
throw e;
@@ -312,14 +352,16 @@ public class JobHistoryEventHandler exte
private static class MetaInfo {
private Path historyFile;
+ private Path confFile;
private EventWriter writer;
long submitTime;
String user;
String jobName;
- MetaInfo(Path historyFile, EventWriter writer, long submitTime,
+ MetaInfo(Path historyFile, Path conf, EventWriter writer, long submitTime,
String user, String jobName) {
this.historyFile = historyFile;
+ this.confFile = conf;
this.writer = writer;
this.submitTime = submitTime;
this.user = user;
@@ -328,6 +370,8 @@ public class JobHistoryEventHandler exte
Path getHistoryFile() { return historyFile; }
+ Path getConfFile() {return confFile; }
+
synchronized void closeWriter() throws IOException {
if (writer != null) {
writer.close();
@@ -343,18 +387,30 @@ public class JobHistoryEventHandler exte
}
}
- /**
- * Get the job history file path
- */
- private Path getJobHistoryFile(Path dir, JobId jobId) {
- return new Path(dir, TypeConverter.fromYarn(jobId).toString() + "_" +
- startCount);
+ //TODO Move some of these functions into a utility class.
+
+ private String getDoneJobHistoryFileName(JobId jobId) {
+ return TypeConverter.fromYarn(jobId).toString() + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
}
-/*
- *
- */
+ private String getDoneConfFileName(JobId jobId) {
+ return TypeConverter.fromYarn(jobId).toString() + JobHistoryUtils.CONF_FILE_NAME_SUFFIX;
+ }
+
+ private Path getConfFile(Path logDir, JobId jobId) {
+ Path jobFilePath = null;
+ if (logDir != null) {
+ jobFilePath = new Path(logDir, TypeConverter.fromYarn(jobId).toString()
+ + "_" + startCount + JobHistoryUtils.CONF_FILE_NAME_SUFFIX);
+ }
+ return jobFilePath;
+ }
+
+
+ //TODO This could be done by the jobHistory server - move files to a temporary location
+ // which is scanned by the JH server - to move them to the final location.
+ // Currently JHEventHandler is moving files to the final location.
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
//check if path exists, in case of retries it may not exist
if (logDirFc.util().exists(fromPath)) {
@@ -370,7 +426,7 @@ public class JobHistoryEventHandler exte
else
LOG.info("copy failed");
doneDirFc.setPermission(toPath,
- new FsPermission(HISTORY_FILE_PERMISSION));
+ new FsPermission(JobHistoryUtils.HISTORY_FILE_PERMISSION));
}
}
@@ -382,7 +438,7 @@ public class JobHistoryEventHandler exte
try {
Path statusstorepath = doneDirFc.makeQualified(new Path(statusstoredir));
doneDirFc.mkdir(statusstorepath,
- new FsPermission(HISTORY_DIR_PERMISSION), true);
+ new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
Path toPath = new Path(statusstoredir, "jobstats");
FSDataOutputStream out = doneDirFc.create(toPath, EnumSet
.of(CreateFlag.CREATE, CreateFlag.OVERWRITE));
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon May 2 06:03:58 2011
@@ -680,7 +680,6 @@ public class JobImpl implements org.apac
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId
attemptID = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
- //TODO_get.set
attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
attemptID.getTaskId().setJobId(job.jobId);
attemptID.getTaskId().setTaskType(TaskType.MAP);//TODO:fix task type ??
@@ -694,11 +693,12 @@ public class JobImpl implements org.apac
job.committer = outputFormat.getOutputCommitter(taskContext);
//log to job history
+ //TODO_JH_Validate the values being sent here (along with defaults). Ideally for all JH events.
JobSubmittedEvent jse =
new JobSubmittedEvent(job.oldJobId,
job.conf.get(MRJobConfig.JOB_NAME, "test"),
job.conf.get(MRJobConfig.USER_NAME,"mapred"), job.startTime,
- "test", constructJobACLs(job.conf),
+ job.remoteJobConfFile.toString(), constructJobACLs(job.conf),
job.conf.get(MRJobConfig.QUEUE_NAME,"test"));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Mon May 2 06:03:58 2011
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -150,19 +151,21 @@ public class RecoveryService extends Com
private void parse() throws IOException {
// TODO: parse history file based on startCount
String jobName = TypeConverter.fromYarn(appID).toString();
- String defaultStagingDir = getConfig().get(
- YARNApplicationConstants.APPS_STAGING_DIR_KEY)
- + "/history/staging";
- String jobhistoryDir = getConfig().get(
- YarnMRJobConfig.HISTORY_STAGING_DIR_KEY, defaultStagingDir);
+// String defaultStagingDir = getConfig().get(
+// YARNApplicationConstants.APPS_STAGING_DIR_KEY)
+// + "/history/staging";
+
+// String jobhistoryDir = getConfig().get(
+// YarnMRJobConfig.HISTORY_STAGING_DIR_KEY, defaultStagingDir);
+ String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryLogDirPrefix(getConfig());
FSDataInputStream in = null;
Path historyFile = null;
Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
getConfig());
- historyFile = fc.makeQualified(new Path(histDirPath, jobName + "_" +
- (startCount -1))); //read the previous history file
+ historyFile = fc.makeQualified(JobHistoryUtils.getJobHistoryFile(
+ histDirPath, jobName, startCount - 1)); //read the previous history file
in = fc.open(historyFile);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java Mon May 2 06:03:58 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.Ya
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
import org.junit.Test;
public class TestJobHistoryParsing {
@@ -55,16 +56,18 @@ public class TestJobHistoryParsing {
app.waitForState(job, JobState.SUCCEEDED);
app.stop();
- String jobhistoryFileName = TypeConverter.fromYarn(jobId).toString();
+ String jobhistoryFileName = TypeConverter.fromYarn(jobId).toString() + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
String user =
conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
String jobhistoryDir = conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
- "file:///tmp/yarn/done/") + user;
+ "file:///tmp/yarn/done/");
String jobstatusDir = conf.get(STATUS_STORE_DIR_KEY,
- "file:///tmp/yarn/done/status/") + user + "/" +
- jobhistoryFileName;
+ "file:///tmp/yarn/done/status/") + jobhistoryFileName;
+
+ String currentJobHistoryDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
+
FSDataInputStream in = null;
- Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+ Path historyFilePath = new Path(currentJobHistoryDir, jobhistoryFileName);
LOG.info("JOBHISTORYDIRE IS " + historyFilePath);
try {
FileContext fc = FileContext.getFileContext(historyFilePath.toUri());
@@ -98,22 +101,22 @@ public class TestJobHistoryParsing {
Assert.assertTrue("total number of task attempts ",
taskAttemptCount == 1);
}
-
- // Test for checking jobstats for job status store
- Path statusFilePath = new Path(jobstatusDir, "jobstats");
- try {
- FileContext fc = FileContext.getFileContext(statusFilePath.toUri());
- in = fc.open(statusFilePath);
- } catch (IOException ioe) {
- LOG.info("Can not open status file "+ ioe);
- throw (new Exception("Can not open status File"));
- }
- parser = new JobHistoryParser(in);
- jobInfo = parser.parse();
- Assert.assertTrue("incorrect finishedMap in job stats file ",
- jobInfo.getFinishedMaps() == 2);
- Assert.assertTrue("incorrect finishedReduces in job stats file ",
- jobInfo.getFinishedReduces() == 1);
+//
+// // Test for checking jobstats for job status store
+// Path statusFilePath = new Path(jobstatusDir, "jobstats");
+// try {
+// FileContext fc = FileContext.getFileContext(statusFilePath.toUri());
+// in = fc.open(statusFilePath);
+// } catch (IOException ioe) {
+// LOG.info("Can not open status file "+ ioe);
+// throw (new Exception("Can not open status File"));
+// }
+// parser = new JobHistoryParser(in);
+// jobInfo = parser.parse();
+// Assert.assertTrue("incorrect finishedMap in job stats file ",
+// jobInfo.getFinishedMaps() == 2);
+// Assert.assertTrue("incorrect finishedReduces in job stats file ",
+// jobInfo.getFinishedReduces() == 1);
}
public static void main(String[] args) throws Exception {
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/JobHistoryUtils.java?rev=1098495&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/JobHistoryUtils.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/JobHistoryUtils.java Mon May 2 06:03:58 2011
@@ -0,0 +1,64 @@
+package org.apache.hadoop.mapreduce.v2.util;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+
+public class JobHistoryUtils {
+ public static final FsPermission HISTORY_DIR_PERMISSION =
+ FsPermission.createImmutable((short) 0750); // rwxr-x---
+
+ public static final FsPermission HISTORY_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0740); // rwxr-----
+
+ public static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
+
+ public static final String JOB_HISTORY_FILE_EXTENSION = ".jhist";
+
+ public static final int VERSION = 4;
+
+ public static final String LOG_VERSION_STRING = "version-" + VERSION;
+
+
+ public static String getCurrentDoneDir(String doneDirPrefix) {
+ return doneDirPrefix + File.separator + LOG_VERSION_STRING + File.separator;
+ }
+
+ public static String getConfiguredHistoryLogDirPrefix(Configuration conf) {
+ String defaultLogDir = conf.get(
+ YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/staging";
+ String logDir = conf.get(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
+ defaultLogDir);
+ return logDir;
+ }
+
+ public static String getConfiguredHistoryDoneDirPrefix(Configuration conf) {
+ String defaultDoneDir = conf.get(
+ YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
+ String doneDirPrefix =
+ conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
+ defaultDoneDir);
+ return doneDirPrefix;
+ }
+
+ /**
+ * Get the job history file path
+ */
+ public static Path getJobHistoryFile(Path dir, JobId jobId, int attempt) {
+ return getJobHistoryFile(dir, TypeConverter.fromYarn(jobId).toString(), attempt);
+ }
+
+ /**
+ * Get the job history file path
+ */
+ public static Path getJobHistoryFile(Path dir, String jobId, int attempt) {
+ return new Path(dir, jobId + "_" +
+ attempt + JOB_HISTORY_FILE_EXTENSION);
+ }
+}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Mon May 2 06:03:58 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -44,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -68,8 +70,11 @@ public class CompletedJob implements org
private TaskAttemptCompletionEvent[] completionEvents;
private JobInfo jobInfo;
-
public CompletedJob(Configuration conf, JobId jobId) throws IOException {
+ this(conf, jobId, true);
+ }
+
+ public CompletedJob(Configuration conf, JobId jobId, boolean loadTasks) throws IOException {
this.conf = conf;
this.jobId = jobId;
//TODO fix
@@ -95,7 +100,7 @@ public class CompletedJob implements org
*/
//TODO: load the data lazily. for now load the full data upfront
- loadFullHistoryData();
+ loadFullHistoryData(loadTasks);
counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
diagnostics.add(jobInfo.getErrorInfo());
@@ -153,7 +158,7 @@ public class CompletedJob implements org
}
//History data is leisurely loaded when task level data is requested
- private synchronized void loadFullHistoryData() {
+ private synchronized void loadFullHistoryData(boolean loadTasks) {
if (jobInfo != null) {
return; //data already loaded
}
@@ -162,29 +167,32 @@ public class CompletedJob implements org
LOG.error("user null is not allowed");
}
String jobName = TypeConverter.fromYarn(jobId).toString();
- String defaultDoneDir = conf.get(
- YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
- String jobhistoryDir =
- conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY, defaultDoneDir)
- + "/" + user;
+
+ String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryDoneDirPrefix(conf);
+
+
+ String currentJobHistoryDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
+
FSDataInputStream in = null;
Path historyFile = null;
try {
Path doneDirPath = FileContext.getFileContext(conf).makeQualified(
- new Path(jobhistoryDir));
+ new Path(currentJobHistoryDir));
FileContext fc =
FileContext.getFileContext(doneDirPath.toUri(),conf);
+ //TODO_JH_There could be multiple instances
+ //TODO_JH_FileName
historyFile =
- fc.makeQualified(new Path(doneDirPath, jobName));
+ fc.makeQualified(new Path(doneDirPath, jobName + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
in = fc.open(historyFile);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();
- LOG.info("jobInfo loaded");
} catch (IOException e) {
throw new YarnException("Could not load history file " + historyFile,
e);
}
+ if (loadTasks) {
// populate the tasks
for (Map.Entry<org.apache.hadoop.mapreduce.TaskID, TaskInfo> entry : jobInfo
.getAllTasks().entrySet()) {
@@ -198,9 +206,11 @@ public class CompletedJob implements org
reduceTasks.put(task.getID(), task);
}
}
+ }
// TODO: populate the TaskAttemptCompletionEvent
completionEvents = new TaskAttemptCompletionEvent[0];
+ LOG.info("TaskInfo loaded");
}
@Override
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Mon May 2 06:03:58 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -67,8 +68,13 @@ public class CompletedTaskAttempt implem
@Override
public ContainerId getAssignedContainerID() {
- // TODO Auto-generated method stub
- return null;
+ //TODO ContainerId needs to be part of some historyEvent to be able to render the log directory.
+ ContainerId containerId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerId.class);
+ containerId.setId(-1);
+ containerId.setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
+ containerId.getAppId().setId(-1);
+ containerId.getAppId().setClusterTimestamp(-1);
+ return containerId;
}
@Override
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1098495&r1=1098494&r2=1098495&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Mon May 2 06:03:58 2011
@@ -19,10 +19,8 @@
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,22 +32,17 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
-import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;
+import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
+import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.Clock;
/*
* Loads and manages the Job history cache.
@@ -109,13 +102,13 @@ public class JobHistory implements Histo
public Map<JobId, Job> getAllJobs() {
//currently there is 1 to 1 mapping between app and job id
Map<JobId, Job> jobs = new HashMap<JobId, Job>();
- String defaultDoneDir = conf.get(
- YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
- String jobhistoryDir =
- conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY, defaultDoneDir);
+ String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryDoneDirPrefix(conf);
+
+ String currentJobHistoryDoneDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
+
try {
Path done = FileContext.getFileContext(conf).makeQualified(
- new Path(jobhistoryDir));
+ new Path(currentJobHistoryDoneDir));
FileContext doneDirFc = FileContext.getFileContext(done.toUri(), conf);
RemoteIterator<LocatedFileStatus> historyFiles = doneDirFc.util()
.listFiles(done, true);
@@ -124,12 +117,16 @@ public class JobHistory implements Histo
while (historyFiles.hasNext()) {
f = historyFiles.next();
if (f.isDirectory()) continue;
- String jobName = f.getPath().getName();
+ if (!f.getPath().getName().endsWith(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION)) continue;
+ //TODO_JH_Change to parse the name properly
+ String fileName = f.getPath().getName();
+ String jobName = fileName.substring(0, fileName.indexOf(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
+ LOG.info("Processing job: " + jobName);
org.apache.hadoop.mapreduce.JobID oldJobID = JobID.forName(jobName);
JobId jobID = TypeConverter.toYarn(oldJobID);
- Job job = new CompletedJob(conf, jobID);
+ Job job = new CompletedJob(conf, jobID, false);
jobs.put(jobID, job);
- completedJobCache.put(jobID, job);
+ // completedJobCache.put(jobID, job);
}
}
} catch (IOException ie) {
@@ -151,6 +148,7 @@ public class JobHistory implements Histo
return userName;
}
+
@Override
public Clock getClock() {
return null;