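You are viewing a plain-text version of this content. The canonical version is the original mailing-list archive page for this message (the hyperlink was lost in the plain-text conversion).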
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/05/10 11:44:28 UTC
svn commit: r1101385 [1/2] - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job...
Author: sharad
Date: Tue May 10 09:44:27 2011
New Revision: 1101385
URL: http://svn.apache.org/viewvc?rev=1101385&view=rev
Log:
MAPREDUCE-2478. Improve history server. Contributed by Siddharth Seth.
Added:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobIndexInfo.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
Removed:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/JobHistoryUtils.java
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ClientFactory.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YARNApplicationConstants.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Tue May 10 09:44:27 2011
@@ -3,6 +3,8 @@ Hadoop MapReduce Change Log
Trunk (unreleased changes)
MAPREDUCE-279
+ MAPREDUCE-2478. Improve history server. (Siddharth Seth via sharad)
+
Fix Null Pointer in TestUberAM. (sharad)
Fix refreshProxy in ClientServiceDelegate. (sharad)
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue May 10 09:44:27 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.jobh
import java.io.IOException;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -30,7 +29,6 @@ import java.util.concurrent.LinkedBlocki
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
@@ -41,7 +39,9 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
-import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -96,7 +96,7 @@ public class JobHistoryEventHandler exte
this.conf = conf;
String logDir = JobHistoryUtils.getConfiguredHistoryLogDirPrefix(conf);
- String doneDirPrefix = JobHistoryUtils.getConfiguredHistoryDoneDirPrefix(conf);
+ String doneDirPrefix = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
try {
doneDirPrefixPath = FileSystem.get(conf).makeQualified(
@@ -157,6 +157,7 @@ public class JobHistoryEventHandler exte
@Override
public void stop() {
+ LOG.info("Stopping JobHistoryEventHandler");
stopped = true;
//do not interrupt while event handling is in progress
synchronized(this) {
@@ -184,7 +185,7 @@ public class JobHistoryEventHandler exte
LOG.info("Exception while closing file " + e.getMessage());
}
}
-
+ LOG.info("Stopped JobHistoryEventHandler. super.stop()");
super.stop();
}
@@ -199,18 +200,18 @@ public class JobHistoryEventHandler exte
protected void setupEventWriter(JobId jobId)
throws IOException {
if (logDirPath == null) {
- LOG.info("Log Directory is null, returning");
+ LOG.error("Log Directory is null, returning");
throw new IOException("Missing Log Directory for History");
}
MetaInfo oldFi = fileMap.get(jobId);
Configuration conf = getConfig();
- long submitTime = (oldFi == null ? context.getClock().getTime() : oldFi.submitTime);
+ long submitTime = (oldFi == null ? context.getClock().getTime() : oldFi.getJobIndexInfo().getSubmitTime());
// String user = conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
- Path logFile = JobHistoryUtils.getJobHistoryFile(logDirPath, jobId, startCount);
+ Path logFile = JobHistoryUtils.getStagingJobHistoryFile(logDirPath, jobId, startCount);
String user = conf.get(MRJobConfig.USER_NAME);
if (user == null) {
throw new IOException("User is null while setting up jobhistory eventwriter" );
@@ -223,6 +224,7 @@ public class JobHistoryEventHandler exte
FSDataOutputStream out = logDirFS.create(logFile, true);
//TODO Permissions for the history file?
writer = new EventWriter(out);
+ LOG.info("Event Writer setup for JobId: " + jobId + ", File: " + logFile);
} catch (IOException ioe) {
LOG.info("Could not create log file: [" + logFile + "] + for job " + "[" + jobName + "]");
throw ioe;
@@ -232,8 +234,7 @@ public class JobHistoryEventHandler exte
//This could be done at the end as well in moveToDone
Path logDirConfPath = null;
if (conf != null) {
- logDirConfPath = getConfFile(logDirPath, jobId);
- LOG.info("XXX: Attempting to write config to: " + logDirConfPath);
+ logDirConfPath = JobHistoryUtils.getStagingConfFile(logDirPath, jobId, startCount);
FSDataOutputStream jobFileOut = null;
try {
if (logDirConfPath != null) {
@@ -247,7 +248,7 @@ public class JobHistoryEventHandler exte
}
}
- MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer, submitTime, user, jobName);
+ MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer, submitTime, user, jobName, jobId);
fileMap.put(jobId, fi);
}
@@ -261,7 +262,7 @@ public class JobHistoryEventHandler exte
}
} catch (IOException e) {
- LOG.info("Error closing writer for JobID: " + id);
+ LOG.error("Error closing writer for JobID: " + id);
throw e;
}
}
@@ -277,6 +278,7 @@ public class JobHistoryEventHandler exte
protected synchronized void handleEvent(JobHistoryEvent event) {
// check for first event from a job
+ //TODO Log a meta line with version information.
if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
try {
setupEventWriter(event.getJobID());
@@ -286,7 +288,6 @@ public class JobHistoryEventHandler exte
}
}
MetaInfo mi = fileMap.get(event.getJobID());
- EventWriter writer = fileMap.get(event.getJobID()).writer;
try {
HistoryEvent historyEvent = event.getHistoryEvent();
mi.writeEvent(historyEvent);
@@ -298,6 +299,10 @@ public class JobHistoryEventHandler exte
// check for done
if (event.getHistoryEvent().getEventType().equals(EventType.JOB_FINISHED)) {
try {
+ JobFinishedEvent jFinishedEvent = (JobFinishedEvent)event.getHistoryEvent();
+ mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime());
+ mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getFinishedMaps());
+ mi.getJobIndexInfo().setNumReduces(jFinishedEvent.getFinishedReduces());
closeEventWriter(event.getJobID());
} catch (IOException e) {
throw new YarnException(e);
@@ -308,46 +313,57 @@ public class JobHistoryEventHandler exte
protected void closeEventWriter(JobId jobId) throws IOException {
final MetaInfo mi = fileMap.get(jobId);
+ if (mi == null) {
+ throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
+ }
try {
- if (mi != null) {
mi.closeWriter();
+ } catch (IOException e) {
+ LOG.error("Error closing writer for JobID: " + jobId);
+ throw e;
}
- if (mi == null || mi.getHistoryFile() == null) {
- LOG.info("No file for job-history with " + jobId + " found in cache!");
+ if (mi.getHistoryFile() == null) {
+ LOG.warn("No file for job-history with " + jobId + " found in cache!");
}
if (mi.getConfFile() == null) {
- LOG.info("No file for jobconf with " + jobId + " found in cache!");
+ LOG.warn("No file for jobconf with " + jobId + " found in cache!");
}
-
- //TODO fix - add indexed structure
- //
- String doneDir = JobHistoryUtils.getCurrentDoneDir(doneDirPrefixPath.toString());
- Path doneDirPath =
- doneDirFS.makeQualified(new Path(doneDir));
- if (!pathExists(doneDirFS, doneDirPath)) {
+ String doneDir = JobHistoryUtils.getCurrentDoneDir(doneDirPrefixPath
+ .toString());
+ Path doneDirPath = doneDirFS.makeQualified(new Path(doneDir));
try {
- doneDirFS.mkdirs(doneDirPath, new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION));
- } catch (FileAlreadyExistsException e) {
- LOG.info("Done directory: [" + doneDirPath + "] already exists.");
- }
+ if (!pathExists(doneDirFS, doneDirPath)) {
+ doneDirFS.mkdirs(doneDirPath, new FsPermission(
+ JobHistoryUtils.HISTORY_DIR_PERMISSION));
}
+
+ if (mi.getHistoryFile() != null) {
Path logFile = mi.getHistoryFile();
Path qualifiedLogFile = logDirFS.makeQualified(logFile);
+ String doneJobHistoryFileName = FileNameIndexUtils.getDoneFileName(mi
+ .getJobIndexInfo());
Path qualifiedDoneFile = doneDirFS.makeQualified(new Path(doneDirPath,
- getDoneJobHistoryFileName(jobId)));
+ doneJobHistoryFileName));
moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
+ }
+ if (mi.getConfFile() != null) {
Path confFile = mi.getConfFile();
Path qualifiedConfFile = logDirFS.makeQualified(confFile);
- Path qualifiedConfDoneFile = doneDirFS.makeQualified(new Path(doneDirPath, getDoneConfFileName(jobId)));
+ String doneConfFileName = JobHistoryUtils
+ .getIntermediateConfFileName(jobId);
+ Path qualifiedConfDoneFile = doneDirFS.makeQualified(new Path(
+ doneDirPath, doneConfFileName));
moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
-
- logDirFS.delete(qualifiedLogFile, true);
- logDirFS.delete(qualifiedConfFile, true);
+ }
+ String doneFileName = JobHistoryUtils.getIntermediateDoneFileName(jobId);
+ Path doneFilePath = doneDirFS.makeQualified(new Path(doneDirPath,
+ doneFileName));
+ touchFile(doneFilePath);
} catch (IOException e) {
- LOG.info("Error closing writer for JobID: " + jobId);
+ LOG.error("Error closing writer for JobID: " + jobId);
throw e;
}
}
@@ -356,24 +372,22 @@ public class JobHistoryEventHandler exte
private Path historyFile;
private Path confFile;
private EventWriter writer;
- long submitTime;
- String user;
- String jobName;
+ JobIndexInfo jobIndexInfo;
MetaInfo(Path historyFile, Path conf, EventWriter writer, long submitTime,
- String user, String jobName) {
+ String user, String jobName, JobId jobId) {
this.historyFile = historyFile;
this.confFile = conf;
this.writer = writer;
- this.submitTime = submitTime;
- this.user = user;
- this.jobName = jobName;
+ this.jobIndexInfo = new JobIndexInfo(submitTime, -1, user, jobName, jobId, -1, -1);
}
Path getHistoryFile() { return historyFile; }
Path getConfFile() {return confFile; }
+ JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
+
synchronized void closeWriter() throws IOException {
if (writer != null) {
writer.close();
@@ -389,30 +403,6 @@ public class JobHistoryEventHandler exte
}
}
- //TODO Move some of these functions into a utility class.
-
-
- private String getDoneJobHistoryFileName(JobId jobId) {
- return TypeConverter.fromYarn(jobId).toString() + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
- }
-
- private String getDoneConfFileName(JobId jobId) {
- return TypeConverter.fromYarn(jobId).toString() + JobHistoryUtils.CONF_FILE_NAME_SUFFIX;
- }
-
- private Path getConfFile(Path logDir, JobId jobId) {
- Path jobFilePath = null;
- if (logDir != null) {
- jobFilePath = new Path(logDir, TypeConverter.fromYarn(jobId).toString()
- + "_" + startCount + JobHistoryUtils.CONF_FILE_NAME_SUFFIX);
- }
- return jobFilePath;
- }
-
-
- //TODO This could be done by the jobHistory server - move files to a temporary location
- // which is scanned by the JH server - to move them to the final location.
- // Currently JHEventHandler is moving files to the final location.
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
//check if path exists, in case of retries it may not exist
if (logDirFS.exists(fromPath)) {
@@ -430,7 +420,14 @@ public class JobHistoryEventHandler exte
LOG.info("copy failed");
doneDirFS.setPermission(toPath,
new FsPermission(JobHistoryUtils.HISTORY_FILE_PERMISSION));
+
+ logDirFS.delete(fromPath, false);
+ }
}
+
+ private void touchFile(Path path) throws IOException {
+ doneDirFS.createNewFile(path);
+ doneDirFS.setPermission(path, JobHistoryUtils.HISTORY_DIR_PERMISSION);
}
boolean pathExists(FileSystem fileSys, Path path) throws IOException {
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue May 10 09:44:27 2011
@@ -653,8 +653,12 @@ public class JobImpl implements org.apac
}
}
- private void finished() {
+ private void setFinishTime() {
finishTime = clock.getTime();
+ }
+
+ private void finished() {
+ if (finishTime == 0) setFinishTime();
eventHandler.handle(new JobFinishEvent(jobId));
}
@@ -1000,6 +1004,18 @@ public class JobImpl implements org.apac
numReduceTasks, //TODO finishedReduceTasks
finalState.toString());
eventHandler.handle(new JobHistoryEvent(jobId, unsuccessfulJobEvent));
+
+ JobFinishedEvent jfe =
+ new JobFinishedEvent(TypeConverter.fromYarn(jobId),
+ finishTime,
+ succeededMapTaskCount,
+ succeededReduceTaskCount, failedMapTaskCount,
+ failedReduceTaskCount,
+ TypeConverter.fromYarn(getCounters()), //TODO replace with MapCounter
+ TypeConverter.fromYarn(getCounters()), // TODO reduceCounters
+ TypeConverter.fromYarn(getCounters()));
+ eventHandler.handle(new JobHistoryEvent(jobId, jfe));
+ //TODO Does this require a JobFinishedEvent?
}
// Task-start has been moved out of InitTransition, so this arc simply
@@ -1125,6 +1141,7 @@ public class JobImpl implements org.apac
job.allowedMapFailuresPercent*job.numMapTasks ||
job.failedReduceTaskCount*100 >
job.allowedReduceFailuresPercent*job.numReduceTasks) {
+ job.setFinishTime();
JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(job.jobId),
job.finishTime,
@@ -1132,18 +1149,8 @@ public class JobImpl implements org.apac
job.failedReduceTaskCount, //TODO finishedReduceTasks
org.apache.hadoop.mapreduce.JobStatus.State.FAILED.toString()); //TODO correct state
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
+ //TODO This event not likely required - sent via abort().
-// Adding JobFinishedEvent to dump counters
- JobFinishedEvent jfe =
- new JobFinishedEvent(TypeConverter.fromYarn(job.jobId),
- job.finishTime,
- job.succeededMapTaskCount,
- job.succeededReduceTaskCount, job.failedMapTaskCount,
- job.failedReduceTaskCount,
- TypeConverter.fromYarn(job.getCounters()), //TODO replace with MapCounter
- TypeConverter.fromYarn(job.getCounters()), // TODO reduceCounters
- TypeConverter.fromYarn(job.getCounters()));
- job.eventHandler.handle(new JobHistoryEvent(job.jobId, jfe));
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
job.finished();
return JobState.FAILED;
@@ -1157,6 +1164,7 @@ public class JobImpl implements org.apac
LOG.warn("Could not do commit for Job", e);
}
// Log job-history
+ job.setFinishTime();
JobFinishedEvent jfe =
new JobFinishedEvent(TypeConverter.fromYarn(job.jobId),
job.finishTime,
@@ -1226,6 +1234,7 @@ public class JobImpl implements org.apac
@Override
protected JobState checkJobForCompletion(JobImpl job) {
if (job.completedTaskCount == job.tasks.size()) {
+ job.setFinishTime();
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
job.finished();
return JobState.KILLED;
@@ -1252,6 +1261,7 @@ public class JobImpl implements org.apac
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
+ //TODO JH Event?
job.finished();
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue May 10 09:44:27 2011
@@ -56,7 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
-import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -164,7 +164,7 @@ public class RecoveryService extends Com
new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
getConfig());
- historyFile = fc.makeQualified(JobHistoryUtils.getJobHistoryFile(
+ historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
histDirPath, jobName, startCount - 1)); //read the previous history file
in = fc.open(historyFile);
JobHistoryParser parser = new JobHistoryParser(in);
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue May 10 09:44:27 2011
@@ -18,18 +18,23 @@
package org.apache.hadoop.mapreduce.v2.app;
+import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.EnumSet;
import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -38,10 +43,6 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
-import org.apache.hadoop.mapreduce.v2.app.AppContext;
-import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
-import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -61,8 +62,10 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -76,9 +79,14 @@ import org.apache.hadoop.yarn.state.Stat
*/
public class MRApp extends MRAppMaster {
+ private static final Log LOG = LogFactory.getLog(MRApp.class);
+
int maps;
int reduces;
+ private File testWorkDir;
+ private Path testAbsPath;
+
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -93,12 +101,25 @@ public class MRApp extends MRAppMaster {
applicationId.setId(0);
}
- public MRApp(int maps, int reduces, boolean autoComplete) {
- this(maps, reduces, autoComplete, 1);
+ public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) {
+ this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
}
- public MRApp(int maps, int reduces, boolean autoComplete, int startCount) {
+ public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) {
super(applicationId, startCount);
+ this.testWorkDir = new File("target", testName);
+ testAbsPath = new Path(testWorkDir.getAbsolutePath());
+ LOG.info("PathUsed: " + testAbsPath);
+ if (cleanOnStart) {
+ testAbsPath = new Path(testWorkDir.getAbsolutePath());
+ try {
+ FileContext.getLocalFSFileContext().delete(testAbsPath, true);
+ } catch (Exception e) {
+ LOG.warn("COULD NOT CLEANUP: " + testAbsPath, e);
+ throw new YarnException("could not cleanup test dir", e);
+ }
+ }
+
this.maps = maps;
this.reduces = reduces;
this.autoComplete = autoComplete;
@@ -107,10 +128,8 @@ public class MRApp extends MRAppMaster {
public Job submit(Configuration conf) throws Exception {
String user = conf.get(MRJobConfig.USER_NAME, "mapred");
conf.set(MRJobConfig.USER_NAME, user);
- conf.set(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
- "file:///tmp/yarn/");
- conf.set(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
- "file:///tmp/yarn/done/");
+ conf.set(YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY, testAbsPath.toString());
+
init(conf);
start();
Job job = getContext().getAllJobs().values().iterator().next();
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Tue May 10 09:44:27 2011
@@ -76,7 +76,7 @@ public class MRAppBenchmark {
int maxConcurrentRunningTasks;
volatile int concurrentRunningTasks;
ThrottledMRApp(int maps, int reduces, int maxConcurrentRunningTasks) {
- super(maps, reduces, true);
+ super(maps, reduces, true, "ThrottledMRApp", true);
this.maxConcurrentRunningTasks = maxConcurrentRunningTasks;
}
@@ -156,7 +156,7 @@ public class MRAppBenchmark {
int reduces = 100;
System.out.println("Running benchmark with maps:"+maps +
" reduces:"+reduces);
- run(new MRApp(maps, reduces, true));
+ run(new MRApp(maps, reduces, true, this.getClass().getName(), true));
}
public void benchmark2() throws Exception {
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Tue May 10 09:44:27 2011
@@ -164,7 +164,7 @@ public class TestFail {
static class TimeOutTaskMRApp extends MRApp {
TimeOutTaskMRApp(int maps, int reduces) {
- super(maps, reduces, false);
+ super(maps, reduces, false, "TimeOutTaskMRApp", true);
}
@Override
protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
@@ -188,7 +188,7 @@ public class TestFail {
static class MockFirstFailingTaskMRApp extends MRApp {
MockFirstFailingTaskMRApp(int maps, int reduces) {
- super(maps, reduces, true);
+ super(maps, reduces, true, "MockFirstFailingTaskMRApp", true);
}
@Override
@@ -209,7 +209,7 @@ public class TestFail {
//First attempt is failed
static class MockFirstFailingAttemptMRApp extends MRApp {
MockFirstFailingAttemptMRApp(int maps, int reduces) {
- super(maps, reduces, true);
+ super(maps, reduces, true, "MockFirstFailingAttemptMRApp", true);
}
@Override
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Tue May 10 09:44:27 2011
@@ -43,7 +43,7 @@ public class TestFetchFailure {
@Test
public void testFetchFailure() throws Exception {
- MRApp app = new MRApp(1, 1, false);
+ MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true);
Configuration conf = new Configuration();
// map -> reduce -> fetch-failure -> map retry is incompatible with
// sequential, single-task-attempt approach in uber-AM, so disable:
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java Tue May 10 09:44:27 2011
@@ -190,7 +190,7 @@ public class TestKill {
static class BlockingMRApp extends MRApp {
private CountDownLatch latch;
BlockingMRApp(int maps, int reduces, CountDownLatch latch) {
- super(maps, reduces, true);
+ super(maps, reduces, true, "testKill", true);
this.latch = latch;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Tue May 10 09:44:27 2011
@@ -44,7 +44,7 @@ public class TestMRApp {
@Test
public void testMapReduce() throws Exception {
- MRApp app = new MRApp(2, 2, true);
+ MRApp app = new MRApp(2, 2, true, this.getClass().getName(), true);
Job job = app.submit(new Configuration());
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
@@ -52,7 +52,7 @@ public class TestMRApp {
@Test
public void testCommitPending() throws Exception {
- MRApp app = new MRApp(1, 0, false);
+ MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
Job job = app.submit(new Configuration());
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
@@ -82,7 +82,7 @@ public class TestMRApp {
@Test
public void testCompletedMapsForReduceSlowstart() throws Exception {
- MRApp app = new MRApp(2, 1, false);
+ MRApp app = new MRApp(2, 1, false, this.getClass().getName(), true);
Configuration conf = new Configuration();
//after half of the map completion, reduce will start
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
@@ -141,7 +141,7 @@ public class TestMRApp {
@Test
public void testJobError() throws Exception {
- MRApp app = new MRApp(1, 0, false);
+ MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
Job job = app.submit(new Configuration());
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java Tue May 10 09:44:27 2011
@@ -172,7 +172,7 @@ public class TestMRClientService {
class MRAppWithClientService extends MRApp {
MRClientService clientService = null;
MRAppWithClientService(int maps, int reduces, boolean autoComplete) {
- super(maps, reduces, autoComplete);
+ super(maps, reduces, autoComplete, "MRAppWithClientService", true);
}
@Override
protected ClientService createClientService(AppContext context) {
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue May 10 09:44:27 2011
@@ -44,7 +44,7 @@ public class TestRecovery {
@Test
public void testCrashed() throws Exception {
int runCount = 0;
- MRApp app = new MRApp(2, 1, false, ++runCount);
+ MRApp app = new MRApp(2, 1, false, this.getClass().getName(), true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
Job job = app.submit(conf);
@@ -127,7 +127,7 @@ public class TestRecovery {
//rerun
//in rerun the 1st map will be recovered from previous run
- app = new MRApp(2, 1, false, ++runCount);
+ app = new MRApp(2, 1, false, this.getClass().getName(), false, ++runCount);
conf = new Configuration();
conf.setBoolean(YarnMRJobConfig.RECOVERY_ENABLE, true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java Tue May 10 09:44:27 2011
@@ -42,13 +42,66 @@ public class YarnMRJobConfig {
/** host:port address to which to bind to **/
public static final String HS_BIND_ADDRESS = HS_PREFIX + "address";
+ /** Staging Dir for AppMaster **/
public static final String HISTORY_STAGING_DIR_KEY =
"yarn.historyfile.stagingDir";
+ /** Intermediate Done Dir for AppMaster **/
+ public static final String HISTORY_INTERMEDIATE_DONE_DIR_KEY =
+ "yarn.historyfile.intermediateDoneDir";
+
+ /** Done Dir for AppMaster **/
public static final String HISTORY_DONE_DIR_KEY =
"yarn.historyfile.doneDir";
+
+ /** Done Dir for history server. Keys below append to HS_PREFIX directly, like HS_BIND_ADDRESS. **/
+ public static final String HISTORY_SERVER_DONE_DIR_KEY =
+ HS_PREFIX + "historyfile.doneDir";
+
+ /**
+ * Size of the job list cache.
+ */
+ public static final String HISTORY_SERVER_JOBLIST_CACHE_SIZE_KEY =
+ HS_PREFIX + "joblist.cache.size";
+
+ /**
+ * Size of the loaded job cache.
+ */
+ public static final String HISTORY_SERVER_LOADED_JOB_CACHE_SIZE_KEY =
+ HS_PREFIX + "loadedjobs.cache.size";
+
+ /**
+ * Size of the date string cache. Affects the number of directories
+ * which will be scanned to find a job.
+ */
+ public static final String HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY =
+ HS_PREFIX + "datestring.cache.size";
+
+ /**
+ * The time interval in milliseconds for the history server
+ * to wake up and scan for files to be moved.
+ */
+ public static final String HISTORY_SERVER_MOVE_THREAD_INTERVAL =
+ HS_PREFIX + "move.thread.interval";
+
+ /**
+ * The number of threads used to move files.
+ */
+ public static final String HISTORY_SERVER_NUM_MOVE_THREADS =
+ HS_PREFIX + "move.threads.count";
+
+ // Equivalent to 0.20 mapreduce.jobhistory.debug.mode
+ public static final String HISTORY_DEBUG_MODE_KEY = HS_PREFIX + "debug.mode";
+
public static final String HISTORY_MAXAGE =
"yarn.historyfile.maxage";
+
+ /**
+ * Run interval for the History Cleaner thread.
+ */
+ public static final String HISTORY_CLEANER_RUN_INTERVAL =
+ HS_PREFIX + "cleaner.run.interval";
+
public static final String HS_WEBAPP_BIND_ADDRESS = HS_PREFIX +
"address.webapp";
public static final String DEFAULT_HS_WEBAPP_BIND_ADDRESS =
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java?rev=1101385&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java Tue May 10 09:44:27 2011
@@ -0,0 +1,249 @@
+package org.apache.hadoop.mapreduce.v2.jobhistory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+/**
+ * Builds and parses job history "done" file names. A done file name is the
+ * fields of a {@link JobIndexInfo} joined by DELIMITER, followed by
+ * JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION, and then URL-encoded.
+ */
+public class FileNameIndexUtils {
+
+  static final String UNDERSCORE_ESCAPE = "%5F";
+  static final int JOB_NAME_TRIM_LENGTH = 50;
+
+  // This has to be an underscore for now, until escapeUnderscores() is
+  // written in terms of DELIMITER.
+  static final String DELIMITER = "_";
+
+  // Position of each field within a DELIMITER-split done file name.
+  private static final int JOB_ID_INDEX = 0;
+  private static final int SUBMIT_TIME_INDEX = 1;
+  private static final int USER_INDEX = 2;
+  private static final int JOB_NAME_INDEX = 3;
+  private static final int FINISH_TIME_INDEX = 4;
+  private static final int NUM_MAPS_INDEX = 5;
+  private static final int NUM_REDUCES_INDEX = 6;
+  private static final int MAX_INDEX = NUM_REDUCES_INDEX;
+
+  /**
+   * Constructs the job history file name from the JobIndexInfo.
+   *
+   * Underscores inside each string field are escaped so that DELIMITER only
+   * separates fields. Null or empty user and job names are written as "NA",
+   * and the job name is trimmed to JOB_NAME_TRIM_LENGTH characters.
+   *
+   * @param indexInfo the index info.
+   * @return the done job history filename.
+   * @throws IOException if the name cannot be URL-encoded.
+   */
+  public static String getDoneFileName(JobIndexInfo indexInfo) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    //JobId
+    sb.append(escapeUnderscores(TypeConverter.fromYarn(indexInfo.getJobId()).toString()));
+    sb.append(DELIMITER);
+
+    //StartTime
+    sb.append(indexInfo.getSubmitTime());
+    sb.append(DELIMITER);
+
+    //UserName
+    sb.append(escapeUnderscores(getUserName(indexInfo)));
+    sb.append(DELIMITER);
+
+    //JobName
+    sb.append(escapeUnderscores(trimJobName(getJobName(indexInfo))));
+    sb.append(DELIMITER);
+
+    //FinishTime
+    sb.append(indexInfo.getFinishTime());
+    sb.append(DELIMITER);
+
+    //NumMaps
+    sb.append(indexInfo.getNumMaps());
+    sb.append(DELIMITER);
+
+    //NumReduces
+    sb.append(indexInfo.getNumReduces());
+
+    sb.append(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION);
+    return encodeJobHistoryFileName(sb.toString());
+  }
+
+  /**
+   * Parses the provided job history file name to construct a
+   * JobIndexInfo object which is returned.
+   *
+   * @param jhFileName the job history filename.
+   * @return a JobIndexInfo object built from the filename.
+   * @throws IOException if the name does not contain the history file
+   *         extension, does not split into MAX_INDEX + 1 fields, or a field
+   *         cannot be URL-decoded.
+   */
+  public static JobIndexInfo getIndexInfo(String jhFileName) throws IOException {
+    // getDoneFileName() appends the extension last, so search from the end:
+    // a (URL-encoded) job name can itself contain the extension text.
+    int extensionIndex =
+        jhFileName.lastIndexOf(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION);
+    if (extensionIndex < 0) {
+      throw new IOException("Failed to parse file: [" + jhFileName
+          + "]. Missing extension " + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION);
+    }
+    String fileName = jhFileName.substring(0, extensionIndex);
+    JobIndexInfo indexInfo = new JobIndexInfo();
+
+    String[] jobDetails = fileName.split(DELIMITER);
+    if (jobDetails.length != MAX_INDEX + 1) {
+      throw new IOException("Failed to parse file: [" + jhFileName + "]. Expected "
+          + (MAX_INDEX + 1) + " parts.");
+    }
+
+    JobID oldJobId = JobID.forName(decodeJobHistoryFileName(jobDetails[JOB_ID_INDEX]));
+    JobId jobId = TypeConverter.toYarn(oldJobId);
+    indexInfo.setJobId(jobId);
+    //TODO Catch NumberFormatException - Do not fail if there's only a few fields missing.
+    indexInfo.setSubmitTime(Long.parseLong(decodeJobHistoryFileName(jobDetails[SUBMIT_TIME_INDEX])));
+
+    indexInfo.setUser(decodeJobHistoryFileName(jobDetails[USER_INDEX]));
+
+    indexInfo.setJobName(decodeJobHistoryFileName(jobDetails[JOB_NAME_INDEX]));
+
+    indexInfo.setFinishTime(Long.parseLong(decodeJobHistoryFileName(jobDetails[FINISH_TIME_INDEX])));
+
+    indexInfo.setNumMaps(Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_MAPS_INDEX])));
+
+    indexInfo.setNumReduces(Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_REDUCES_INDEX])));
+
+    return indexInfo;
+  }
+
+  /**
+   * Helper function to encode the URL of the filename of the job-history
+   * log file. Existing UNDERSCORE_ESCAPE sequences are preserved as-is
+   * rather than having their '%' re-encoded.
+   *
+   * @param logFileName file name of the job-history file
+   * @return URL encoded filename
+   * @throws IOException if UTF-8 is not supported
+   */
+  public static String encodeJobHistoryFileName(String logFileName)
+      throws IOException {
+    String replacementUnderscoreEscape = null;
+
+    if (logFileName.contains(UNDERSCORE_ESCAPE)) {
+      // Swap the escape for a placeholder that URLEncoder leaves untouched,
+      // then swap it back after encoding.
+      replacementUnderscoreEscape = nonOccursString(logFileName);
+
+      logFileName = replaceStringInstances
+          (logFileName, UNDERSCORE_ESCAPE, replacementUnderscoreEscape);
+    }
+
+    String encodedFileName = null;
+    try {
+      encodedFileName = URLEncoder.encode(logFileName, "UTF-8");
+    } catch (UnsupportedEncodingException uee) {
+      IOException ioe = new IOException();
+      ioe.initCause(uee);
+      ioe.setStackTrace(uee.getStackTrace());
+      throw ioe;
+    }
+
+    if (replacementUnderscoreEscape != null) {
+      encodedFileName = replaceStringInstances
+          (encodedFileName, replacementUnderscoreEscape, UNDERSCORE_ESCAPE);
+    }
+
+    return encodedFileName;
+  }
+
+  /**
+   * Helper function to decode the URL of the filename of the job-history
+   * log file.
+   *
+   * @param logFileName file name of the job-history file
+   * @return URL decoded filename
+   * @throws IOException if UTF-8 is not supported
+   */
+  public static String decodeJobHistoryFileName(String logFileName)
+      throws IOException {
+    String decodedFileName = null;
+    try {
+      decodedFileName = URLDecoder.decode(logFileName, "UTF-8");
+    } catch (UnsupportedEncodingException uee) {
+      IOException ioe = new IOException();
+      ioe.initCause(uee);
+      ioe.setStackTrace(uee.getStackTrace());
+      throw ioe;
+    }
+    return decodedFileName;
+  }
+
+  /**
+   * Returns a placeholder "qNq", where N is the smallest non-negative integer
+   * such that "qN" does not occur in {@code logFileName}.
+   */
+  static String nonOccursString(String logFileName) {
+    int adHocIndex = 0;
+
+    String unfoundString = "q" + adHocIndex;
+
+    while (logFileName.contains(unfoundString)) {
+      unfoundString = "q" + ++adHocIndex;
+    }
+
+    return unfoundString + "q";
+  }
+
+  private static String getUserName(JobIndexInfo indexInfo) {
+    return getNonEmptyString(indexInfo.getUser());
+  }
+
+  private static String getJobName(JobIndexInfo indexInfo) {
+    return getNonEmptyString(indexInfo.getJobName());
+  }
+
+  //TODO Maybe handle default values for longs and integers here?
+
+  /** Returns {@code in}, or "NA" if it is null or empty. */
+  private static String getNonEmptyString(String in) {
+    if (in == null || in.length() == 0) {
+      in = "NA";
+    }
+    return in;
+  }
+
+  /** Replaces every "_" in {@code escapee} with UNDERSCORE_ESCAPE. */
+  private static String escapeUnderscores(String escapee) {
+    return replaceStringInstances(escapee, "_", UNDERSCORE_ESCAPE);
+  }
+
+  /**
+   * Replaces every literal occurrence of {@code old} with
+   * {@code replacement}, including one at the very start of the string
+   * (e.g. a user or job name that begins with an underscore).
+   */
+  private static String replaceStringInstances
+      (String logFileName, String old, String replacement) {
+    return logFileName.replace(old, replacement);
+  }
+
+  /**
+   * Trims the job-name to at most JOB_NAME_TRIM_LENGTH characters.
+   */
+  private static String trimJobName(String jobName) {
+    if (jobName.length() > JOB_NAME_TRIM_LENGTH) {
+      jobName = jobName.substring(0, JOB_NAME_TRIM_LENGTH);
+    }
+    return jobName;
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1101385&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Tue May 10 09:44:27 2011
@@ -0,0 +1,412 @@
+package org.apache.hadoop.mapreduce.v2.jobhistory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+
+/**
+ * Static helpers for naming, locating and listing job history files.
+ */
+public class JobHistoryUtils {
+
+  private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class);
+
+  public static final FsPermission HISTORY_DIR_PERMISSION =
+      FsPermission.createImmutable((short) 0750); // rwxr-x---
+
+  public static final FsPermission HISTORY_FILE_PERMISSION =
+      FsPermission.createImmutable((short) 0740); // rwxr-----
+
+  /**
+   * Suffix for configuration files.
+   */
+  public static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
+
+  /**
+   * Suffix for done files.
+   */
+  public static final String DONE_FILE_NAME_SUFFIX = ".done";
+
+  /**
+   * Job History File extension.
+   */
+  public static final String JOB_HISTORY_FILE_EXTENSION = ".jhist";
+
+  public static final int VERSION = 4;
+
+  public static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
+
+  public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + File.separator + "\\d{2}" + "\\" + File.separator + "\\d{2}";
+  public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX);
+  private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
+
+  /**
+   * Version substring to use while storing history files.
+   */
+  public static final String LOG_VERSION_STRING = "version-" + VERSION;
+
+  private static final PathFilter CONF_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().endsWith(CONF_FILE_NAME_SUFFIX);
+    }
+  };
+
+  private static final PathFilter JOB_HISTORY_FILE_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().endsWith(JOB_HISTORY_FILE_EXTENSION);
+    }
+  };
+
+  /**
+   * Gets a PathFilter which would match configuration files.
+   * @return a filter accepting names ending in CONF_FILE_NAME_SUFFIX.
+   */
+  public static PathFilter getConfFileFilter() {
+    return CONF_FILTER;
+  }
+
+  /**
+   * Gets a PathFilter which would match job history file names.
+   * @return a filter accepting names ending in JOB_HISTORY_FILE_EXTENSION.
+   */
+  public static PathFilter getHistoryFileFilter() {
+    return JOB_HISTORY_FILE_FILTER;
+  }
+
+  /**
+   * Returns the current done directory.
+   * @param doneDirPrefix the prefix for the done directory.
+   * @return A string representation of the done directory.
+   */
+  public static String getCurrentDoneDir(String doneDirPrefix) {
+    return doneDirPrefix + File.separator + LOG_VERSION_STRING + File.separator;
+  }
+
+  /**
+   * Gets the configured directory prefix for In Progress history files.
+   * @param conf the configuration to read.
+   * @return A string representation of the prefix.
+   */
+  public static String getConfiguredHistoryLogDirPrefix(Configuration conf) {
+    String defaultLogDir = conf.get(
+        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY) + "/history/staging";
+    String logDir = conf.get(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
+        defaultLogDir);
+    return logDir;
+  }
+
+  /**
+   * Gets the configured directory prefix for intermediate done history files.
+   * @param conf the configuration to read.
+   * @return A string representation of the prefix.
+   */
+  public static String getConfiguredHistoryIntermediateDoneDirPrefix(Configuration conf) {
+    String defaultDoneDir = conf.get(
+        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY) + "/history/done_intermediate";
+    String doneDirPrefix =
+        conf.get(YarnMRJobConfig.HISTORY_INTERMEDIATE_DONE_DIR_KEY,
+            defaultDoneDir);
+    return doneDirPrefix;
+  }
+
+  /**
+   * Gets the configured directory prefix for Done history files.
+   * @param conf the configuration to read.
+   * @return A string representation of the prefix.
+   */
+  public static String getConfiguredHistoryServerDoneDirPrefix(Configuration conf) {
+    String defaultDoneDir = conf.get(
+        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY) + "/history/done";
+    // NOTE(review): reads HISTORY_DONE_DIR_KEY rather than
+    // HISTORY_SERVER_DONE_DIR_KEY -- confirm which key is intended here.
+    String doneDirPrefix =
+        conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
+            defaultDoneDir);
+    return doneDirPrefix;
+  }
+
+  /**
+   * Get the job history file path for non Done history files.
+   */
+  public static Path getStagingJobHistoryFile(Path dir, JobId jobId, int attempt) {
+    return getStagingJobHistoryFile(dir, TypeConverter.fromYarn(jobId).toString(), attempt);
+  }
+
+  /**
+   * Get the job history file path for non Done history files.
+   */
+  public static Path getStagingJobHistoryFile(Path dir, String jobId, int attempt) {
+    return new Path(dir, jobId + "_" +
+        attempt + JOB_HISTORY_FILE_EXTENSION);
+  }
+
+  /**
+   * Get the done configuration file name for a job.
+   * @param jobId the jobId.
+   * @return the conf file name.
+   */
+  public static String getIntermediateConfFileName(JobId jobId) {
+    return TypeConverter.fromYarn(jobId).toString() + CONF_FILE_NAME_SUFFIX;
+  }
+
+  /**
+   * Get the intermediate done file name (job id + DONE_FILE_NAME_SUFFIX).
+   * @param jobId the jobId.
+   * @return the done file name.
+   */
+  public static String getIntermediateDoneFileName(JobId jobId) {
+    return TypeConverter.fromYarn(jobId).toString() + DONE_FILE_NAME_SUFFIX;
+  }
+
+  /**
+   * Gets the conf file path for jobs in progress.
+   *
+   * @param logDir the log directory prefix.
+   * @param jobId the jobId.
+   * @param attempt attempt number for this job.
+   * @return the conf file path, or null if logDir is null.
+   */
+  public static Path getStagingConfFile(Path logDir, JobId jobId, int attempt) {
+    Path jobFilePath = null;
+    if (logDir != null) {
+      jobFilePath = new Path(logDir, TypeConverter.fromYarn(jobId).toString()
+          + "_" + attempt + CONF_FILE_NAME_SUFFIX);
+    }
+    return jobFilePath;
+  }
+
+  /**
+   * Gets the serial number part of the path based on the jobId and serialNumber format.
+   * @param id the jobId.
+   * @param serialNumberFormat a String.format pattern applied to the serial number.
+   * @return the first SERIAL_NUMBER_DIRECTORY_DIGITS characters of the formatted number.
+   */
+  public static String serialNumberDirectoryComponent(JobId id, String serialNumberFormat) {
+    return String.format(serialNumberFormat,
+        Integer.valueOf(jobSerialNumber(id))).substring(0,
+            SERIAL_NUMBER_DIRECTORY_DIGITS);
+  }
+
+  /**
+   * Extracts the timestamp component from the path.
+   * @param path the path to search.
+   * @return the interned first match of TIMESTAMP_DIR_PATTERN, or null.
+   */
+  public static String getTimestampPartFromPath(String path) {
+    Matcher matcher = TIMESTAMP_DIR_PATTERN.matcher(path);
+    if (matcher.find()) {
+      // intern() returns the canonical copy rather than modifying the
+      // receiver, so its result is what must be returned.
+      return matcher.group().intern();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Gets the history subdirectory based on the jobId, timestamp and serial number format.
+   * @param id the jobId.
+   * @param timestampComponent the timestamp directory component.
+   * @param serialNumberFormat format passed to serialNumberDirectoryComponent.
+   * @return LOG_VERSION_STRING, timestampComponent and the serial component,
+   *         joined by and terminated with File.separator.
+   */
+  public static String historyLogSubdirectory(JobId id, String timestampComponent, String serialNumberFormat) {
+    String result = LOG_VERSION_STRING;
+    String serialNumberDirectory = serialNumberDirectoryComponent(id, serialNumberFormat);
+
+    result = result
+        + File.separator + timestampComponent
+        + File.separator + serialNumberDirectory
+        + File.separator;
+
+    return result;
+  }
+
+  /**
+   * Gets the timestamp component based on millisecond time.
+   * @param millisecondTime the time, interpreted with Calendar.getInstance().
+   * @param debugMode if true, use the hour and minute instead of month and day.
+   * @return the interned date path built from TIMESTAMP_DIR_FORMAT.
+   */
+  public static String timestampDirectoryComponent(long millisecondTime, boolean debugMode) {
+    Calendar timestamp = Calendar.getInstance();
+    timestamp.setTimeInMillis(millisecondTime);
+    String dateString = String.format(
+        TIMESTAMP_DIR_FORMAT,
+        timestamp.get(Calendar.YEAR),
+        // months are 0-based in Calendar, but people will expect January
+        // to be month #1.
+        timestamp.get(debugMode ? Calendar.HOUR : Calendar.MONTH) + 1,
+        timestamp.get(debugMode ? Calendar.MINUTE : Calendar.DAY_OF_MONTH));
+    return dateString.intern();
+  }
+
+  /**
+   * Returns the tail pattern matching the version directory followed by
+   * three wildcard levels (YYYY, MM and DD).
+   */
+  public static String doneSubdirsBeforeSerialTail() {
+    // Version Info
+    String result = ("/" + LOG_VERSION_STRING);
+
+    // date
+    result = result + "/*/*/*"; // YYYY/MM/DD ;
+    return result;
+  }
+
+  /**
+   * Computes a serial number used as part of directory naming for the given jobId.
+   * @param id the jobId.
+   * @return the job's numeric id.
+   */
+  public static int jobSerialNumber(JobId id) {
+    return id.getId();
+  }
+
+  public static List<FileStatus> localGlobber(FileContext fc, Path root, String tail)
+      throws IOException {
+    return localGlobber(fc, root, tail, null);
+  }
+
+  public static List<FileStatus> localGlobber(FileContext fc, Path root, String tail,
+      PathFilter filter) throws IOException {
+    return localGlobber(fc, root, tail, filter, null);
+  }
+
+  /**
+   * Lists the entries under {@code root} that match {@code tail}, a pattern
+   * made of "/name" segments and "/*" wildcard (directories only) segments,
+   * keeping only entries accepted by {@code filter} when it is non-null.
+   * An empty tail lists the contents of {@code root}.
+   *
+   * @param hasFlatFiles if non-null, set to true when the first wildcard level
+   *        reached from this call skips a non-directory entry; acts as a
+   *        second return value.
+   * @throws IOException if a listing fails or the tail is malformed.
+   */
+  public static List<FileStatus> localGlobber(FileContext fc, Path root, String tail,
+      PathFilter filter, AtomicBoolean hasFlatFiles) throws IOException {
+    if (tail.equals("")) {
+      return (listFilteredStatus(fc, root, filter));
+    }
+
+    if (tail.startsWith("/*")) {
+      Path[] subdirs = filteredStat2Paths(
+          remoteIterToList(fc.listStatus(root)), true, hasFlatFiles);
+      String newTail = tail.substring(2);
+
+      List<FileStatus> result = new LinkedList<FileStatus>();
+      for (Path subdir : subdirs) {
+        result.addAll(localGlobber(fc, subdir, newTail, filter, null));
+      }
+      return result;
+    }
+
+    if (tail.startsWith("/")) {
+      int split = tail.indexOf('/', 1);
+
+      if (split < 0) {
+        return listFilteredStatus(fc, new Path(root, tail.substring(1)), filter);
+      } else {
+        String thisSegment = tail.substring(1, split);
+        String newTail = tail.substring(split);
+        return localGlobber(fc, new Path(root, thisSegment), newTail, filter,
+            hasFlatFiles);
+      }
+    }
+
+    throw new IOException("localGlobber: bad tail");
+  }
+
+  /**
+   * Lists {@code root}, keeping only entries accepted by {@code filter}
+   * (or every entry when the filter is null).
+   */
+  private static List<FileStatus> listFilteredStatus(FileContext fc, Path root,
+      PathFilter filter) throws IOException {
+    List<FileStatus> fsList = remoteIterToList(fc.listStatus(root));
+    if (filter == null) {
+      return fsList;
+    } else {
+      List<FileStatus> filteredList = new LinkedList<FileStatus>();
+      for (FileStatus fs : fsList) {
+        if (filter.accept(fs.getPath())) {
+          filteredList.add(fs);
+        }
+      }
+      return filteredList;
+    }
+  }
+
+  /**
+   * Drains a RemoteIterator into a list; a null iterator yields an empty list.
+   * Returns an ArrayList because filteredStat2Paths() reads and writes the
+   * result by index, which is linear per call on a LinkedList.
+   */
+  private static List<FileStatus> remoteIterToList(
+      RemoteIterator<FileStatus> rIter) throws IOException {
+    List<FileStatus> fsList = new ArrayList<FileStatus>();
+    if (rIter == null) {
+      return fsList;
+    }
+    while (rIter.hasNext()) {
+      fsList.add(rIter.next());
+    }
+    return fsList;
+  }
+
+  /**
+   * Returns the paths of the entries in {@code stats} whose isDirectory()
+   * equals {@code dirs}, compacting those entries to the front of
+   * {@code stats} in place.
+   *
+   * @param hasMismatches if non-null, set to true when any entry is skipped;
+   *        used as a second return value.
+   */
+  private static Path[] filteredStat2Paths(List<FileStatus> stats, boolean dirs,
+      AtomicBoolean hasMismatches) {
+    int resultCount = 0;
+
+    if (hasMismatches == null) {
+      hasMismatches = new AtomicBoolean(false);
+    }
+
+    for (int i = 0; i < stats.size(); ++i) {
+      if (stats.get(i).isDirectory() == dirs) {
+        stats.set(resultCount++, stats.get(i));
+      } else {
+        hasMismatches.set(true);
+      }
+    }
+
+    Path[] result = new Path[resultCount];
+    for (int i = 0; i < resultCount; i++) {
+      result[i] = stats.get(i).getPath();
+    }
+
+    return result;
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobIndexInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobIndexInfo.java?rev=1101385&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobIndexInfo.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobIndexInfo.java Tue May 10 09:44:27 2011
@@ -0,0 +1,109 @@
+package org.apache.hadoop.mapreduce.v2.jobhistory;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+/**
+ * Mutable holder for the per-job values used by the job history indexing
+ * system (see FileNameIndexUtils, which writes these values into history
+ * file names and parses them back out).
+ */
+public class JobIndexInfo {
+  private JobId jobId;
+  private String user;
+  private String jobName;
+  private long submitTime;
+  private long finishTime;
+  private int numMaps;
+  private int numReduces;
+
+  /** Creates an instance whose fields all hold their Java default values. */
+  public JobIndexInfo() {
+  }
+
+  /** Creates an instance with every field set from the arguments. */
+  public JobIndexInfo(long submitTime, long finishTime, String user,
+      String jobName, JobId jobId, int numMaps, int numReduces) {
+    this.jobId = jobId;
+    this.user = user;
+    this.jobName = jobName;
+    this.submitTime = submitTime;
+    this.finishTime = finishTime;
+    this.numMaps = numMaps;
+    this.numReduces = numReduces;
+  }
+
+  // ---- identity ----
+
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(JobId id) {
+    jobId = id;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String userName) {
+    user = userName;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public void setJobName(String name) {
+    jobName = name;
+  }
+
+  // ---- timestamps ----
+
+  public long getSubmitTime() {
+    return submitTime;
+  }
+
+  public void setSubmitTime(long time) {
+    submitTime = time;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime(long time) {
+    finishTime = time;
+  }
+
+  // ---- task counts ----
+
+  public int getNumMaps() {
+    return numMaps;
+  }
+
+  public void setNumMaps(int count) {
+    numMaps = count;
+  }
+
+  public int getNumReduces() {
+    return numReduces;
+  }
+
+  public void setNumReduces(int count) {
+    numReduces = count;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("JobIndexInfo [submitTime=").append(submitTime)
+        .append(", finishTime=").append(finishTime)
+        .append(", user=").append(user)
+        .append(", jobName=").append(jobName)
+        .append(", jobId=").append(jobId)
+        .append(", numMaps=").append(numMaps)
+        .append(", numReduces=").append(numReduces)
+        .append("]")
+        .toString();
+  }
+}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ClientFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ClientFactory.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ClientFactory.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ClientFactory.java Tue May 10 09:44:27 2011
@@ -45,7 +45,8 @@ public abstract class ClientFactory {
try {
return factory.newInstance().createClient(conf);
} catch (Exception e) {
- throw new IOException("Could not create ClientProtocol", e);
+ throw new IOException("Could not create ClientProtocol using factory: "
+ + factory.getName(), e);
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Tue May 10 09:44:27 2011
@@ -45,7 +45,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.YarnException;
@@ -71,37 +71,12 @@ public class CompletedJob implements org
private TaskAttemptCompletionEvent[] completionEvents;
private JobInfo jobInfo;
- public CompletedJob(Configuration conf, JobId jobId) throws IOException {
- this(conf, jobId, true);
- }
-
- public CompletedJob(Configuration conf, JobId jobId, boolean loadTasks) throws IOException {
+ public CompletedJob(Configuration conf, JobId jobId, Path historyFile, boolean loadTasks) throws IOException {
this.conf = conf;
this.jobId = jobId;
- //TODO fix
- /*
- String doneLocation =
- conf.get(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION,
- "file:///tmp/yarn/done/status");
- String user =
- conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
- String statusstoredir =
- doneLocation + "/" + user + "/" + TypeConverter.fromYarn(jobID).toString();
- Path statusFile = new Path(statusstoredir, "jobstats");
- try {
- FileContext fc = FileContext.getFileContext(statusFile.toUri(), conf);
- FSDataInputStream in = fc.open(statusFile);
- JobHistoryParser parser = new JobHistoryParser(in);
- jobStats = parser.parse();
- } catch (IOException e) {
- LOG.info("Could not open job status store file from dfs " +
- TypeConverter.fromYarn(jobID).toString());
- throw new IOException(e);
- }
- */
//TODO: load the data lazily. for now load the full data upfront
- loadFullHistoryData(loadTasks);
+ loadFullHistoryData(loadTasks, historyFile);
counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
diagnostics.add(jobInfo.getErrorInfo());
@@ -159,7 +134,7 @@ public class CompletedJob implements org
}
//History data is leisurely loaded when task level data is requested
- private synchronized void loadFullHistoryData(boolean loadTasks) {
+ private synchronized void loadFullHistoryData(boolean loadTasks, Path historyFileAbsolute) {
if (jobInfo != null) {
return; //data already loaded
}
@@ -169,11 +144,21 @@ public class CompletedJob implements org
}
String jobName = TypeConverter.fromYarn(jobId).toString();
- String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryDoneDirPrefix(conf);
+ String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
String currentJobHistoryDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
+ if (historyFileAbsolute != null) {
+ try {
+ JobHistoryParser parser = new JobHistoryParser(historyFileAbsolute.getFileSystem(conf), historyFileAbsolute);
+ jobInfo = parser.parse();
+ } catch (IOException e) {
+ throw new YarnException("Could not load history file " + historyFileAbsolute,
+ e);
+ }
+ }
+ else {
FSDataInputStream in = null;
Path historyFile = null;
try {
@@ -192,6 +177,7 @@ public class CompletedJob implements org
throw new YarnException("Could not load history file " + historyFile,
e);
}
+ }
if (loadTasks) {
// populate the tasks
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java Tue May 10 09:44:27 2011
@@ -18,33 +18,24 @@
package org.apache.hadoop.mapreduce.v2.hs;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.mapreduce.v2.hs.JobHistory.HistoryCleaner;
import org.apache.hadoop.yarn.service.AbstractService;
public class HistoryCleanerService extends AbstractService {
private static final Log LOG = LogFactory.getLog(HistoryClientService.class);
- static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L;
- private FileContext doneDirFc;
- private HistoryCleaner historyCleanerThread = null;
+ static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; //1 week
+ static final long DEFAULT_RUN_INTERVAL = 1 * 24 * 60 * 60 * 1000l; //1 day
+
+ private ScheduledThreadPoolExecutor scheduledExecutor = null;
private Configuration conf;
@@ -54,89 +45,32 @@ public class HistoryCleanerService exten
}
public void start() {
- long maxAgeOfHistoryFiles = conf.getLong(
- YarnMRJobConfig.HISTORY_MAXAGE, DEFAULT_HISTORY_MAX_AGE);
- historyCleanerThread = new HistoryCleaner(maxAgeOfHistoryFiles);
- historyCleanerThread.start();
- super.start();
+ // Periodic history cleaning is currently disabled. The sketch below shows the
+ // intended wiring. NOTE(review): confirm that
+ // YarnMRJobConfig.HISTORY_CLEANER_RUN_INTERVAL exists and that
+ // JobHistory.HistoryCleaner accepts a max-age argument before enabling it.
+// long maxAgeOfHistoryFiles = conf.getLong(
+// YarnMRJobConfig.HISTORY_MAXAGE, DEFAULT_HISTORY_MAX_AGE);
+// scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+// long runInterval = conf.getLong(YarnMRJobConfig.HISTORY_CLEANER_RUN_INTERVAL, DEFAULT_RUN_INTERVAL);
+// scheduledExecutor.scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles), 30*1000l, runInterval, TimeUnit.MILLISECONDS);
+ // Even with cleaning disabled, the service must still complete its lifecycle
+ // transition; the previous implementation also ended with super.start().
+ super.start();
}
/** Shut down JobHistory after stopping the History cleaner */
@Override
public void stop() {
- LOG.info("Interrupting History Cleaner");
- historyCleanerThread.interrupt();
- try {
- historyCleanerThread.join();
- } catch (InterruptedException e) {
- LOG.info("Error with shutting down history cleaner thread");
- }
- }
- /**
- * Delete history files older than a specified time duration.
- */
- class HistoryCleaner extends Thread {
- static final long ONE_DAY_IN_MS = 7 * 24 * 60 * 60 * 1000L;
- private long cleanupFrequency;
- private long maxAgeOfHistoryFiles;
-
- public HistoryCleaner(long maxAge) {
- setName("Thread for cleaning up History files");
- setDaemon(true);
- this.maxAgeOfHistoryFiles = maxAge;
- cleanupFrequency = Math.min(ONE_DAY_IN_MS, maxAgeOfHistoryFiles);
- LOG.info("Job History Cleaner Thread started." +
- " MaxAge is " +
- maxAge + " ms(" + ((float)maxAge)/(ONE_DAY_IN_MS) + " days)," +
- " Cleanup Frequency is " +
- + cleanupFrequency + " ms (" +
- ((float)cleanupFrequency)/ONE_DAY_IN_MS + " days)");
- }
-
- @Override
- public void run(){
-
- while (true) {
- try {
- doCleanup();
- Thread.sleep(cleanupFrequency);
- }
- catch (InterruptedException e) {
- LOG.info("History Cleaner thread exiting");
- return;
- }
- catch (Throwable t) {
- LOG.warn("History cleaner thread threw an exception", t);
- }
- }
+ // Nothing to stop while periodic cleaning is disabled: no executor is created.
+ // Sketch for when cleaning is re-enabled: wait up to one second for the
+ // executor to terminate. isShutdown() becomes true as soon as shutdown() is
+ // called, so termination must be checked with awaitTermination()/isTerminated(),
+ // not by polling isShutdown().
+// LOG.info("Interrupting History Cleaner");
+// scheduledExecutor.shutdown();
+// try {
+// if (!scheduledExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS)) {
+// LOG.warn("HistoryCleanerService shutdown may not have succeeded");
+// }
+// } catch (InterruptedException e) {
+// Thread.currentThread().interrupt();
+// }
}
- private void doCleanup() {
- long now = System.currentTimeMillis();
- try {
- String defaultDoneDir = conf.get(
- YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
- String jobhistoryDir =
- conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY, defaultDoneDir);
- Path done = FileContext.getFileContext(conf).makeQualified(
- new Path(jobhistoryDir));
- doneDirFc = FileContext.getFileContext(done.toUri(), conf);
- RemoteIterator<LocatedFileStatus> historyFiles =
- doneDirFc.util().listFiles(done, true);
- if (historyFiles != null) {
- FileStatus f;
- while (historyFiles.hasNext()) {
- f = historyFiles.next();
- if (now - f.getModificationTime() > maxAgeOfHistoryFiles) {
- doneDirFc.delete(f.getPath(), true);
- LOG.info("Deleting old history file : " + f.getPath());
- }
- }
- }
- } catch (IOException ie) {
- LOG.info("Error cleaning up history directory" +
- StringUtils.stringifyException(ie));
- }
- }
- }
}