You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:53:05 UTC
svn commit: r1079184 - in /hadoop/mapreduce/branches/yahoo-merge/src:
java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapreduce/
java/org/apache/hadoop/mapreduce/jobhistory/
test/mapred/org/apache/hadoop/mapred/ test/mapred/org/apache/hadoop/tool...
Author: omalley
Date: Tue Mar 8 05:53:05 2011
New Revision: 1079184
URL: http://svn.apache.org/viewvc?rev=1079184&view=rev
Log:
commit ef1a2a3dcc014ffe026c7be3c2e06a44da14de3a
Author: Richard King <dk...@yahoo-inc.com>
Date: Mon Nov 15 22:43:39 2010 +0000
increase the flexibility of searching the job history in
jobhistory.jsp . Also, stores job history files in multiple
directories, and establishes a rudimentary database index, to make
searches more performant.
+++ b/YAHOO-CHANGES.txt
+ increase the flexibility of searching the job history in
+ jobhistory.jsp . From
+ By dking
+
+
+ stores job history files in multiple directories, and establishes
+ a rudimentary database index, to make searches more performant.
+ From
+ By dking
+
Modified:
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Mar 8 05:53:05 2011
@@ -247,7 +247,7 @@ public class JobTracker implements MRCon
* Return the JT's job history handle.
* @return the jobhistory handle
*/
- JobHistory getJobHistory() { return jobHistory; }
+ public JobHistory getJobHistory() { return jobHistory; }
/**
* Start the JobTracker with given configuration.
*
@@ -1617,6 +1617,9 @@ public class JobTracker implements MRCon
}
});
infoServer.setAttribute("fileSys", historyFS);
+ infoServer.setAttribute("jobHistoryHistory", jobHistory);
+ infoServer.setAttribute("jobHistoryGlobber", JobHistory.globString());
+ infoServer.setAttribute("jobHistoryLeafGlobber", JobHistory.leafGlobString());
this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
@@ -4622,6 +4625,9 @@ public class JobTracker implements MRCon
}
infoServer.setAttribute("fileSys", historyFS);
+ infoServer.setAttribute("jobHistoryHistory", jobHistory);
+ infoServer.setAttribute("jobHistoryGlobber", JobHistory.globString());
+ infoServer.setAttribute("jobHistoryLeafGlobber", JobHistory.leafGlobString());
infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
infoServer.start();
this.infoPort = this.infoServer.getPort();
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java Tue Mar 8 05:53:05 2011
@@ -259,8 +259,7 @@ public class Cluster {
if (jobHistoryDir == null) {
jobHistoryDir = new Path(client.getJobHistoryDir());
}
- return JobHistory.getJobHistoryFile(jobHistoryDir, jobId,
- ugi.getShortUserName()).toString();
+ return JobHistory.getJobHistoryFile(jobHistoryDir, jobId).toString();
}
/**
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Tue Mar 8 05:53:05 2011
@@ -18,20 +18,38 @@
package org.apache.hadoop.mapreduce.jobhistory;
+import java.io.BufferedReader;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -43,7 +61,9 @@ import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobTracker;
@@ -64,7 +84,7 @@ public class JobHistory {
final Log LOG = LogFactory.getLog(JobHistory.class);
private long jobHistoryBlockSize;
- private final Map<JobID, MetaInfo> fileMap =
+ private static final Map<JobID, MetaInfo> fileMap =
Collections.<JobID,MetaInfo>synchronizedMap(new HashMap<JobID,MetaInfo>());
private ThreadPoolExecutor executor = null;
static final FsPermission HISTORY_DIR_PERMISSION =
@@ -82,23 +102,69 @@ public class JobHistory {
private Path logDir = null;
private Path done = null; // folder for completed jobs
+ private static String DONE_BEFORE_SERIAL_TAIL = doneSubdirsBeforeSerialTail();
+ private static String DONE_LEAF_FILES = DONE_BEFORE_SERIAL_TAIL + "/*";
+
+ static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
+
+ // XXXXX debug mode -- set this to false for production
+ private static final boolean DEBUG_MODE = false;
+
+ private static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
+ private static final int SERIAL_NUMBER_LOW_DIGITS = DEBUG_MODE ? 1 : 3;
+
+ private static final String SERIAL_NUMBER_FORMAT
+ = ("%0"
+ + (SERIAL_NUMBER_DIRECTORY_DIGITS + SERIAL_NUMBER_LOW_DIGITS)
+ + "d");
+
+ private static final Set<Path> existingDoneSubdirs = new HashSet<Path>();
+
+ private static final SortedMap<Integer, String> idToDateString
+ = new TreeMap<Integer, String>();
+
+ private static Pattern historyCleanerParseDirectory
+ = Pattern.compile(".+/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)/?");
+ // .+ / YYYY / MM / DD / HH /?
+
+
public static final String OLD_SUFFIX = ".old";
+ public static final String OLD_FULL_SUFFIX_REGEX_STRING
+ = "(?:\\.[0-9]+" + Pattern.quote(OLD_SUFFIX) + ")";
// Version string that will prefix all History Files
public static final String HISTORY_VERSION = "1.0";
private HistoryCleaner historyCleanerThread = null;
- private Map<JobID, MovedFileInfo> jobHistoryFileMap =
+ private static final int version = 3;
+ private static final String LOG_VERSION_STRING = "version-" + version;
+
+ private long jobTrackerStartTime;
+ private String jobTrackerHostName;
+ private String jobTrackerUniqueName;
+
+ private static final Map<JobID, MovedFileInfo> jobHistoryFileMap =
Collections.<JobID,MovedFileInfo>synchronizedMap(
new LinkedHashMap<JobID, MovedFileInfo>());
+ // The invariant is that UnindexedElementsState tracks the identity
+ // of the currently-filling done directory subdirectory, and what
+ // needs to be indexed.
+ // Has to be locked for each file disposition decision
+ private final UnindexedElementsState ueState = new UnindexedElementsState();
+
// JobHistory filename regex
public static final Pattern JOBHISTORY_FILENAME_REGEX =
- Pattern.compile("(" + JobID.JOBID_REGEX + ")_.+");
+ Pattern.compile("(" + JobID.JOBID_REGEX + ")"
+ + OLD_FULL_SUFFIX_REGEX_STRING + "?");
// JobHistory conf-filename regex
public static final Pattern CONF_FILENAME_REGEX =
- Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
+ Pattern.compile("(" + JobID.JOBID_REGEX + ")"
+ + CONF_FILE_NAME_SUFFIX
+ + OLD_FULL_SUFFIX_REGEX_STRING + "?");
+
+ private static final int MAXIMUM_DATESTRING_COUNT = 200000;
private static class MovedFileInfo {
private final String historyFile;
@@ -119,6 +185,11 @@ public class JobHistory {
public void init(JobTracker jt, JobConf conf, String hostname,
long jobTrackerStartTime) throws IOException {
+ jobTrackerHostName = hostname;
+ this.jobTrackerStartTime = jobTrackerStartTime;
+
+ this.jobTrackerUniqueName = jobTrackerHostName + "-" + jobTrackerStartTime;
+
// Get and create the log folder
final String logDirLoc = conf.get(JTConfig.JT_JOBHISTORY_LOCATION ,
"file:///" +
@@ -145,6 +216,20 @@ public class JobHistory {
jobTracker = jt;
}
+
+ public static String leafGlobString() {
+ return "/" + LOG_VERSION_STRING
+ + "/*" // job tracker ID
+ + "/YYYY/MM/DD/HH" // time segment
+ ;
+ }
+
+ public static String globString() {
+ return "/" + LOG_VERSION_STRING
+ + "/*" // job tracker ID
+ + "/YYYY/MM/DD" // time segment
+ ;
+ }
/** Initialize the done directory and start the history cleaner thread */
public void initDone(JobConf conf, FileSystem fs) throws IOException {
@@ -163,8 +248,8 @@ public class JobHistory {
//permission
if (!doneDirFs.exists(done)) {
LOG.info("Creating DONE folder at "+ done);
- if (! doneDirFs.mkdirs(done,
- new FsPermission(HISTORY_DIR_PERMISSION))) {
+ if (!doneDirFs.mkdirs(done,
+ new FsPermission(HISTORY_DIR_PERMISSION))) {
throw new IOException("Mkdirs failed to create " + done.toString());
}
}
@@ -218,11 +303,20 @@ public class JobHistory {
/**
* Get the job history file path
*/
- public static Path getJobHistoryFile(Path dir, JobID jobId,
- String user) {
- return new Path(dir, jobId.toString() + "_" + user);
+ public static Path getJobHistoryFile
+ (Path dir, JobID jobId) {
+ MetaInfo info = fileMap.get(jobId);
+
+ if (info == null) {
+ fileMap.put(jobId, new MetaInfo(null, null, null, System.currentTimeMillis(), null, null));
+ return getJobHistoryFile(dir, jobId);
+ }
+
+ return new Path(dir, jobId.toString());
}
+
+
/**
* Get the JobID from the history file's name. See its companion method
* {@link #getJobHistoryFile(Path, JobID, String)} for how history file's name
@@ -238,19 +332,37 @@ public class JobHistory {
return JobID.forName(jobId);
}
- /**
- * Get the user name of the job-submitter from the history file's name. See
- * it's companion method {@link #getJobHistoryFile(Path, JobID, String)} for
- * how history file's name is constructed from a given JobID and username.
- *
- * @param jobHistoryFilePath
- * @return the user-name
- */
- public static String getUserFromHistoryFilePath(Path jobHistoryFilePath) {
- String[] jobDetails = jobHistoryFilePath.getName().split("_");
- return jobDetails[3];
+ static String nonOccursString(String logFileName) {
+ int adHocIndex = 0;
+
+ String unfoundString = "q" + adHocIndex;
+
+ while (logFileName.contains(unfoundString)) {
+ unfoundString = "q" + ++adHocIndex;
+ }
+
+ return unfoundString + "q";
}
+ // I tolerate this code because I expect a low number of
+ // occurrences in a relatively short string
+ static String replaceStringInstances
+ (String logFileName, String old, String replacement) {
+ int index = logFileName.indexOf(old);
+
+ while (index > 0) {
+ logFileName = (logFileName.substring(0, index)
+ + replacement
+ + replaceStringInstances
+ (logFileName.substring(index + old.length()),
+ old, replacement));
+
+ index = logFileName.indexOf(old);
+ }
+
+ return logFileName;
+ }
+
/**
* Given the job id, return the history file path from the cache
*/
@@ -261,6 +373,31 @@ public class JobHistory {
}
return info.historyFile;
}
+
+ /**
+ * Given the job id, return the conf.xml file path from the cache
+ */
+ public String getConfFilePath(JobID jobId) {
+ MovedFileInfo info = jobHistoryFileMap.get(jobId);
+ if (info == null) {
+ return null;
+ }
+ final Path historyFileDir
+ = (new Path(getHistoryFilePath(jobId))).getParent();
+ return getConfFile(historyFileDir, jobId).toString();
+ }
+
+ /**
+ * Get the job name from the job conf
+ */
+ static String getJobName(JobConf jobConf) {
+ String jobName = jobConf.getJobName();
+ if (jobName == null || jobName.length() == 0) {
+ jobName = "NA";
+ }
+ return jobName;
+ }
+
/**
* Create an event writer for the Job represented by the jobID.
* This should be the first call to history for a job
@@ -270,12 +407,19 @@ public class JobHistory {
*/
public void setupEventWriter(JobID jobId, JobConf jobConf)
throws IOException {
- Path logFile = getJobHistoryFile(logDir, jobId, getUserName(jobConf));
-
if (logDir == null) {
LOG.info("Log Directory is null, returning");
throw new IOException("Missing Log Directory for History");
}
+
+ MetaInfo oldFi = fileMap.get(jobId);
+
+ long submitTime = (oldFi == null ? System.currentTimeMillis() : oldFi.submitTime);
+
+ String user = getUserName(jobConf);
+ String jobName = getJobName(jobConf);
+
+ Path logFile = getJobHistoryFile(logDir, jobId);
int defaultBufferSize =
logDirFs.getConf().getInt("io.file.buffer.size", 4096);
@@ -315,7 +459,7 @@ public class JobHistory {
+ StringUtils.stringifyException(e));
}
- MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer);
+ MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer, submitTime, user, jobName);
fileMap.put(jobId, fi);
}
@@ -360,7 +504,7 @@ public class JobHistory {
}
private void startFileMoverThreads() {
- executor = new ThreadPoolExecutor(1, 3, 1,
+ executor = new ThreadPoolExecutor(3, 5, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
}
@@ -375,14 +519,14 @@ public class JobHistory {
Path jobFilePath = null;
if (logDir != null) {
jobFilePath = new Path(logDir + File.separator +
- jobId.toString() + "_conf.xml");
+ jobId.toString() + CONF_FILE_NAME_SUFFIX);
}
return jobFilePath;
}
/**
* Generates a suffix for old/stale jobhistory files
- * Pattern : . + identifier + .old
+ * Pattern : . + identifier + JobHistory.OLD_SUFFIX
*/
public static String getOldFileSuffix(String identifier) {
return "." + identifier + JobHistory.OLD_SUFFIX;
@@ -394,13 +538,18 @@ public class JobHistory {
//files with same job id don't get over written in case of recovery.
FileStatus[] files = logDirFs.listStatus(logDir);
String fileSuffix = getOldFileSuffix(jobTracker.getTrackerIdentifier());
+ // We use the same millisecond time for all files so the config file
+ // and job history file flow to the same subdirectory
+ long millisecondTime = ueState.monotonicTime();
for (FileStatus fileStatus : files) {
Path fromPath = fileStatus.getPath();
if (fromPath.equals(done)) { //DONE can be a subfolder of log dir
continue;
}
LOG.info("Moving log file from last run: " + fromPath);
- Path toPath = new Path(done, fromPath.getName() + fileSuffix);
+ Path resultDir
+ = canonicalHistoryLogDir(null, millisecondTime);
+ Path toPath = new Path(resultDir, fromPath.getName() + fileSuffix);
try {
moveToDoneNow(fromPath, toPath);
} catch (ChecksumException e) {
@@ -424,15 +573,16 @@ public class JobHistory {
}
}
}
+
private void moveToDone(final JobID id) {
final List<Path> paths = new ArrayList<Path>();
final MetaInfo metaInfo = fileMap.get(id);
- if (metaInfo == null) {
+ if (metaInfo == null || metaInfo.getHistoryFile() == null) {
LOG.info("No file for job-history with " + id + " found in cache!");
return;
}
-
+
final Path historyFile = metaInfo.getHistoryFile();
if (historyFile == null) {
LOG.info("No file for job-history with " + id + " found in cache!");
@@ -448,32 +598,442 @@ public class JobHistory {
}
executor.execute(new Runnable() {
+ static final int SPONTANEOUSLY_CLOSE_INDEX_INTERVAL = 30 * 1000;
+
+ static final int SPONTANEOUS_INTERIM_INDEX_INTERVAL = 300 * 1000;
public void run() {
- //move the files to DONE folder
- try {
- for (Path path : paths) {
- moveToDoneNow(path, new Path(done, path.getName()));
+ boolean iShouldMonitor = false;
+
+ Path resultDir = null;
+
+ String historyFileDonePath = null;
+
+ Path failedJobHistoryIndexBuildPath = null;
+ Throwable failedHistoryMoveException = null;
+
+ synchronized (ueState) {
+ // needed because it's possible for system time to go backward
+ long millisecondTime = ueState.monotonicTime();
+
+ resultDir = canonicalHistoryLogDir(id, millisecondTime);
+
+ if (!resultDir.equals(ueState.currentDoneSubdirectory)) {
+ if (ueState.currentDoneSubdirectory != null) {
+ try {
+ ueState.closeCurrentDirectory();
+ } catch (IOException e) {
+ failedJobHistoryIndexBuildPath = ueState.currentDoneSubdirectory;
+ }
+ }
+
+ iShouldMonitor = true;
+
+ ueState.indexableElements = new LinkedList<JobHistoryIndexElement>();
+ ueState.currentDoneSubdirectory = resultDir;
+
+ ueState.monitoredDirectory = resultDir;
}
- } catch (Throwable e) {
- LOG.error("Unable to move history file to DONE folder.", e);
+
+ // We need to make the JobHistoryIndexElement here, because after
+ // we've copied the file the info might disappear, but before we've
+ // closed the previous subdirectory [if we do that] it would go into
+ // the wrong subdirectory index.
+ ueState.indexableElements.
+ add(new JobHistoryIndexElement(millisecondTime, id, metaInfo));
+
+ //move the files to a DONE canonical subfolder
+ try {
+ for (Path path : paths) {
+ moveToDoneNow(path, new Path(resultDir, path.getName()));
+ }
+ } catch (Throwable e) {
+ failedHistoryMoveException = e;
+ }
+ if (historyFile != null) {
+ historyFileDonePath = new Path(resultDir,
+ historyFile.getName()).toString();
+ }
+ jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath,
+ millisecondTime));
}
- String historyFileDonePath = null;
- if (historyFile != null) {
- historyFileDonePath = new Path(done,
- historyFile.getName()).toString();
+
+ if (failedJobHistoryIndexBuildPath != null) {
+ LOG.warn("Couldn't build a Job History index for "
+ + failedJobHistoryIndexBuildPath);
+ }
+ if (failedHistoryMoveException != null) {
+ LOG.error("Can't move history file to DONE canonical subfolder.",
+ failedHistoryMoveException);
}
- jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath,
- System.currentTimeMillis()));
+
+
jobTracker.retireJob(org.apache.hadoop.mapred.JobID.downgrade(id),
- historyFileDonePath);
+ historyFileDonePath);
//purge the job from the cache
fileMap.remove(id);
- }
+ // Except ephemerally, only one task will be in this code at a
+ // time, because iShouldMonitor is only set true when
+ // ueState.monitoredDirectory changes, which will force the
+ // current incumbent to abend at the earliest opportunity.
+ while (iShouldMonitor) {
+ int roundCounter = 0;
+
+ int interruptionsToAbort = 2;
+
+ try {
+ Thread.sleep(SPONTANEOUSLY_CLOSE_INDEX_INTERVAL);
+ } catch (InterruptedException e) {
+ if (--interruptionsToAbort == 0) {
+ return;
+ }
+ }
+
+ Path unbuildableJobHistoryIndex = null;
+ Path unbuildableInterimJobHistoryIndex = null;
+
+ synchronized (ueState) {
+ if (ueState.monitoredDirectory != resultDir) {
+ // someone else closed out the directory I was monitoring
+ iShouldMonitor = false;
+ } else {
+ interruptionsToAbort = 2;
+
+ long millisecondTime = ueState.monotonicTime();
+
+ Path newResultDir = canonicalHistoryLogDir(id, millisecondTime);
+
+ if (!newResultDir.equals(resultDir)) {
+ try {
+ ueState.closeCurrentDirectory();
+ } catch (IOException e) {
+ unbuildableJobHistoryIndex = ueState.currentDoneSubdirectory;
+ }
+ iShouldMonitor = false;
+ }
+ }
+
+ if (iShouldMonitor
+ && (++roundCounter
+ % (SPONTANEOUS_INTERIM_INDEX_INTERVAL
+ / SPONTANEOUSLY_CLOSE_INDEX_INTERVAL)
+ == 0)) {
+ // called for side effect -- a 5 minute checkpoint to
+ // reduce possible unindexed jobs on a JT crash
+ try {
+ ueState.getACurrentIndex(ueState.currentDoneSubdirectory);
+ } catch (IOException e) {
+ unbuildableInterimJobHistoryIndex
+ = ueState.currentDoneSubdirectory;
+ }
+ }
+ }
+
+ if (unbuildableJobHistoryIndex != null) {
+ LOG.warn("Couldn't build a Job History index for "
+ + unbuildableJobHistoryIndex);
+ }
+
+ if (unbuildableInterimJobHistoryIndex != null) {
+ LOG.warn("Couldn't build an interim Job History index for "
+ + unbuildableInterimJobHistoryIndex);
+ }
+ }
+ }
});
}
+
+ public String[] currentIndex(Path theDoneSubdirectory)
+ throws IOException {
+ return ueState.currentIndex(theDoneSubdirectory);
+ }
+
+ // we only create one instance per JobHistory
+ class UnindexedElementsState {
+ long monotonicTime = Long.MIN_VALUE;
+ Path currentDoneSubdirectory = null;
+ private List<JobHistoryIndexElement> indexableElements = null;
+ Path monitoredDirectory = null;
+ int indexIndex = 0;
+ int indexedElementCount = 0;
+
+ private void buildIndex(String indexName) throws IOException {
+ Path tempPath = new Path(currentDoneSubdirectory, "nascent-index");
+ Path indexPath = new Path(currentDoneSubdirectory, indexName);
+
+ OutputStream newIndexOStream = null;
+ PrintStream newIndexPStream = null;
+
+ indexedElementCount = indexableElements.size();
+
+ try {
+ newIndexOStream
+ = FileSystem.create(doneDirFs, tempPath, HISTORY_FILE_PERMISSION);
+
+ newIndexPStream = new PrintStream(newIndexOStream);
+
+ for (JobHistoryIndexElement elt : indexableElements) {
+ newIndexPStream.println(elt.toString());
+ }
+ } finally {
+ if (newIndexPStream != null) {
+ newIndexPStream.close();
+
+ if (doneDirFs.exists(tempPath)) {
+ doneDirFs.rename(tempPath, indexPath);
+ }
+ } else if (newIndexOStream != null) {
+ newIndexOStream.close();
+ doneDirFs.delete(tempPath, false);
+ }
+ }
+ }
+
+ synchronized String[] currentIndex(Path theDoneSubdirectory)
+ throws IOException {
+ Path subdirIndex = getACurrentIndex(theDoneSubdirectory);
+
+ List<String> indexAsRead = new ArrayList<String>();
+
+ InputStream iStream = null;
+ InputStreamReader isReader = null;
+ BufferedReader breader = null;
+
+ try {
+ iStream = doneDirFs.open(subdirIndex);
+ isReader = new InputStreamReader(iStream);
+ breader = new BufferedReader(isReader);
+
+ String thisRecord = breader.readLine();
+
+ while (thisRecord != null) {
+ indexAsRead.add(thisRecord);
+ thisRecord = breader.readLine();
+ }
+
+ String[] result = indexAsRead.toArray(new String[0]);
+
+ Arrays.sort(result);
+
+ return result;
+ } finally {
+ if (breader != null) {
+ breader.close();
+ } else if (isReader != null) {
+ isReader.close();
+ } else if (iStream != null) {
+ iStream.close();
+ }
+ }
+ }
+
+ // If this is the block that's now being built, we build a new
+ // index and return that. This shouldn't be called on an empty
+ // subdirectory.
+ //
+ // getACurrentIndex must be called within a synchronized(this) block.
+ // Currently there are two calls, both of which qualify.
+ Path getACurrentIndex(Path theDoneSubdirectory) throws IOException {
+ if (!theDoneSubdirectory.equals(currentDoneSubdirectory)) {
+ return new Path(theDoneSubdirectory, "index");
+ }
+
+ if (indexedElementCount == indexableElements.size()) {
+ return new Path(theDoneSubdirectory, "index-" + indexIndex);
+ }
+
+ String indexName = "index-" + ++indexIndex;
+
+ buildIndex(indexName);
+
+ return new Path(theDoneSubdirectory, indexName);
+ }
+
+ // not synchronized, because calls must be in a larger synchronized context
+ private void closeCurrentDirectory() throws IOException {
+ if (currentDoneSubdirectory == null) {
+ return;
+ }
+
+ buildIndex("index");
+ }
+
+ synchronized long monotonicTime() {
+ monotonicTime = Math.max(monotonicTime, System.currentTimeMillis());
+ return monotonicTime;
+ }
+ }
+
+ static class JobHistoryIndexElement {
+ // id and millisecondTime are currently unused.
+ final JobID id;
+ final long millisecondTime;
+ final MetaInfo metaInfo;
+
+ JobHistoryIndexElement(long millisecondTime, JobID id, MetaInfo metaInfo) {
+ this.id = id;
+ this.millisecondTime = millisecondTime;
+ this.metaInfo = metaInfo;
+ }
+
+ public String toString() {
+ String user = metaInfo.user;
+ String jobName = metaInfo.jobName;
+
+ if (jobName.length() > 50) {
+ jobName = jobName.substring(0, 50);
+ }
+
+ String adHocBarEscape = "";
+
+ if (user.indexOf('|') >= 0 || jobName.indexOf('|') >= 0) {
+ adHocBarEscape = nonOccursString(user + jobName);
+
+ user = replaceStringInstances(user, "|", adHocBarEscape);
+ jobName = replaceStringInstances(jobName, "|", adHocBarEscape);
+ }
+
+ return (metaInfo.getHistoryFile().getName()
+ + "|" + millisecondTime
+ + "|" + adHocBarEscape
+ + "|" + user
+ + "|" + jobName);
+ }
+ }
+
+ // several methods for manipulating the subdirectories of the DONE
+ // directory
+
+ // directory components may contain internal slashes, but do NOT
+ // contain slashes at either end.
+
+ // In this nest of code, id can be null. In that case it is an error to call
+ // more than once to get a single filename. This can happen when we're moving
+ // files from an old run into the new context. See moveOldFiles() .
+ private static String timestampDirectoryComponent(JobID id, long millisecondTime) {
+ Integer boxedSerialNumber = null;
+
+ if (id != null) {
+ boxedSerialNumber = id.getId();
+ }
+
+ // don't want to do this inside the lock
+ Calendar timestamp = Calendar.getInstance();
+ timestamp.setTimeInMillis(millisecondTime);
+
+ synchronized (idToDateString) {
+ String dateString
+ = (boxedSerialNumber == null ? null : idToDateString.get(boxedSerialNumber));
+
+ if (dateString == null) {
+
+ dateString = String.format
+ ("%04d/%02d/%02d/%02d",
+ timestamp.get(Calendar.YEAR),
+ timestamp.get(DEBUG_MODE ? Calendar.HOUR : Calendar.MONTH) + 1,
+ timestamp.get(DEBUG_MODE ? Calendar.MINUTE : Calendar.DAY_OF_MONTH),
+ timestamp.get(DEBUG_MODE ? Calendar.SECOND : Calendar.HOUR));
+
+ dateString = dateString.intern();
+
+ if (boxedSerialNumber != null) {
+ idToDateString.put(boxedSerialNumber, dateString);
+
+ if (idToDateString.size() > MAXIMUM_DATESTRING_COUNT) {
+ idToDateString.remove(idToDateString.firstKey());
+ }
+ }
+ }
+
+ return dateString;
+ }
+ }
+
+ // returns false iff the directory already existed
+ private boolean maybeMakeSubdirectory(JobID id, long millisecondTime)
+ throws IOException {
+ Path dir = canonicalHistoryLogDir(id, millisecondTime);
+
+ String deferredErrorPrintout = null;
+ String deferredInfoLogging = null;
+
+ try {
+ synchronized (existingDoneSubdirs) {
+ if (existingDoneSubdirs.contains(dir)) {
+ if (DEBUG_MODE && !doneDirFs.exists(dir)) {
+ deferredErrorPrintout
+ = ("JobHistory.maybeMakeSubdirectory -- We believed "
+ + dir + " already existed, but it didn't.");
+ }
+
+ return true;
+ }
+
+ if (!doneDirFs.exists(dir)) {
+ deferredInfoLogging = "Creating DONE subfolder at " + dir;
+
+ if (!doneDirFs.mkdirs(dir,
+ new FsPermission(HISTORY_DIR_PERMISSION))) {
+ throw new IOException("Mkdirs failed to create " + dir.toString());
+ }
+
+ existingDoneSubdirs.add(dir);
+
+ return false;
+ } else {
+ if (DEBUG_MODE) {
+ deferredErrorPrintout
+ = ("JobHistory.maybeMakeSubdirectory -- We believed "
+ + dir + " didn't already exist, but it did.");
+ }
+
+ return false;
+ }
+ }
+ } finally {
+ if (deferredErrorPrintout != null) {
+ System.err.println(deferredErrorPrintout);
+ }
+
+ if (deferredInfoLogging != null) {
+ LOG.info(deferredInfoLogging);
+ }
+ }
+ }
+
+ // Previous versions of this code used the id argument, when the
+ // directory structure was a bit different.
+ // I'm leaving the currently unused id argument in place, in case we
+ // decide to start using it again in the future.
+ private Path canonicalHistoryLogDir(JobID id, long millisecondTime) {
+ return new Path(done, historyLogSubdirectory(id, millisecondTime));
+ }
+
+ private String historyLogSubdirectory(JobID id, long millisecondTime) {
+ String result = LOG_VERSION_STRING
+ + "/" + jobtrackerDirectoryComponent(id);
+
+ result = (result
+ + "/" + timestampDirectoryComponent(id, millisecondTime)
+ + "/")
+ .intern();
+
+ return result;
+ }
+
+ private String jobtrackerDirectoryComponent(JobID id) {
+ return jobTrackerUniqueName;
+ }
+
+ private static String doneSubdirsBeforeSerialTail() {
+ // job tracker ID + date
+ String result = "/*/*/*/*/*"; // job tracker instance ID/YYYY/MM/DD/HH
+
+ return result;
+ }
+
private String getUserName(JobConf jobConf) {
String user = jobConf.getUser();
@@ -483,15 +1043,141 @@ public class JobHistory {
return user;
}
+
+ // hasMismatches is just used to return a second value if you want
+ // one. I would have used MutableBoxedBoolean if such had been provided.
+ static Path[] filteredStat2Paths
+ (FileStatus[] stats, boolean dirs, AtomicBoolean hasMismatches) {
+ int resultCount = 0;
+
+ if (hasMismatches == null) {
+ hasMismatches = new AtomicBoolean(false);
+ }
+
+ for (int i = 0; i < stats.length; ++i) {
+ if (stats[i].isDir() == dirs) {
+ stats[resultCount++] = stats[i];
+ } else {
+ hasMismatches.set(true);
+ }
+ }
+
+ Path[] paddedResult = FileUtil.stat2Paths(stats);
+
+ Path[] result = new Path[resultCount];
+
+ System.arraycopy(paddedResult, 0, result, 0, resultCount);
+
+ return result;
+ }
+
+ public FileStatus[] getAllHistoryConfFiles() throws IOException {
+ return localGlobber
+ (doneDirFs, done, "/" + LOG_VERSION_STRING + "/*/*/*/*/*");
+ }
+
+ public static FileStatus[] localGlobber
+ (FileSystem fs, Path root, String tail)
+ throws IOException {
+ return localGlobber(fs, root, tail, null);
+ }
+
+ public static FileStatus[] localGlobber
+ (FileSystem fs, Path root, String tail, PathFilter filter)
+ throws IOException {
+ return localGlobber(fs, root, tail, filter, null);
+ }
+
+ private static FileStatus[] nullToEmpty(FileStatus[] result) {
+ return result == null ? new FileStatus[0] : result;
+ }
+
+ private static FileStatus[] listFilteredStatus
+ (FileSystem fs, Path root, PathFilter filter)
+ throws IOException {
+ return filter == null ? fs.listStatus(root) : fs.listStatus(root, filter);
+ }
+
+ // hasMismatches is just used to return a second value if you want
+ // one. I would have used MutableBoxedBoolean if such had been provided.
+ static FileStatus[] localGlobber
+ (FileSystem fs, Path root, String tail, PathFilter filter,
+ AtomicBoolean hasFlatFiles)
+ throws IOException {
+ if (tail.equals("")) {
+ return nullToEmpty(listFilteredStatus(fs, root, filter));
+ }
+
+ if (tail.startsWith("/*")) {
+ Path[] subdirs = filteredStat2Paths(nullToEmpty(fs.listStatus(root)),
+ true, hasFlatFiles);
+
+ FileStatus[][] subsubdirs = new FileStatus[subdirs.length][];
+
+ int subsubdirCount = 0;
+
+ if (subsubdirs.length == 0) {
+ return new FileStatus[0];
+ }
+
+ String newTail = tail.substring(2);
+
+ for (int i = 0; i < subdirs.length; ++i) {
+ subsubdirs[i] = localGlobber(fs, subdirs[i], newTail, filter, null);
+ subsubdirCount += subsubdirs[i].length;
+ }
+
+ FileStatus[] result = new FileStatus[subsubdirCount];
+
+ int segmentStart = 0;
+
+ for (int i = 0; i < subsubdirs.length; ++i) {
+ System.arraycopy(subsubdirs[i], 0, result, segmentStart, subsubdirs[i].length);
+ segmentStart += subsubdirs[i].length;
+ }
+
+ return result;
+ }
+
+ if (tail.startsWith("/")) {
+ int split = tail.indexOf('/', 1);
+
+ try {
+ if (split < 0) {
+ return nullToEmpty
+ (listFilteredStatus(fs, new Path(root, tail.substring(1)), filter));
+ } else {
+ String thisSegment = tail.substring(1, split);
+ String newTail = tail.substring(split);
+ return localGlobber
+ (fs, new Path(root, thisSegment), newTail, filter, hasFlatFiles);
+ }
+ } catch (FileNotFoundException ignored) {
+ return new FileStatus[0];
+ }
+ }
+
+ IOException e = new IOException("localGlobber: bad tail");
+
+ throw e;
+ }
+
private static class MetaInfo {
private Path historyFile;
private Path confFile;
private EventWriter writer;
+ long submitTime;
+ String user;
+ String jobName;
- MetaInfo(Path historyFile, Path conf, EventWriter writer) {
+ MetaInfo(Path historyFile, Path conf, EventWriter writer, long submitTime,
+ String user, String jobName) {
this.historyFile = historyFile;
this.confFile = conf;
this.writer = writer;
+ this.submitTime = submitTime;
+ this.user = user;
+ this.jobName = jobName;
}
Path getHistoryFile() { return historyFile; }
@@ -512,18 +1198,315 @@ public class JobHistory {
}
/**
+ * Builds an iterator over the job history records that satisfy every
+ * criterion supplied.  The search is carried out when this method is
+ * called.  A criterion that is null or the empty string is not applied,
+ * so calling this with no restrictions at all is fine.
+ *
+ * @param user the user name a job must have
+ *          [or null or the empty string, for any user]
+ * @param jobnameSubstring text that must occur in the job name
+ * @param dateStrings date strings, format MM/DD/YYYY .  A job is accepted
+ *          if it falls on ANY of these dates.
+ * @param soughtJobid the String naming the jobID wanted, if any.  Since a
+ *          job ID names a unique job, combining it with other criteria
+ *          is rarely useful.
+ * @return a retriever positioned before the first matching record
+ * @throws IOException propagated from the retriever's constructor
+ */
+ public JobHistoryRecordRetriever getMatchingJobs
+     (String user, String jobnameSubstring,
+      String[] dateStrings, String soughtJobid)
+     throws IOException {
+   JobHistoryRecordRetriever matches = new JobHistoryRecordRetriever
+       (user, jobnameSubstring, dateStrings, soughtJobid);
+   return matches;
+ }
+
+ /**
+  * One record of a done-subdirectory index, parsed on demand.
+  *
+  * The record text is '|'-separated:
+  *   [0] job ID [also the history file name]
+  *   [1] submit time [milliseconds]
+  *   [2] ad hoc substitute that stands for '|' inside fields [3] and [4];
+  *       empty when no substitution was made
+  *   [3] user name
+  *   [4] trimmed job name
+  *
+  * Every accessor has a noCache variant.  With noCache set, a freshly
+  * parsed split is not retained; a split that is already cached is still
+  * reused.
+  */
+ public static class JobHistoryJobRecord {
+   private final Path basePath;
+   private final String recordText;
+   // cached split of recordText; null until a caching accessor has run
+   private String[] recordSplits = null;
+
+   JobHistoryJobRecord(Path basePath, String recordText) {
+     this.basePath = basePath;
+     this.recordText = recordText;
+   }
+
+   private String[] getSplits(boolean noCache) {
+     if (recordSplits != null) {
+       return recordSplits;
+     }
+
+     // A limit of -1 keeps trailing empty fields.  Without it a record
+     // whose job name is empty loses field [4] and getJobName fails with
+     // an ArrayIndexOutOfBoundsException.
+     String[] result = recordText.split("\\|", -1);
+
+     if (!noCache) {
+       recordSplits = result;
+     }
+
+     return result;
+   }
+
+   // Returns field [index] with the ad hoc substitute, if there is one,
+   // turned back into '|'.
+   private static String restoredField(String[] splits, int index) {
+     String result = splits[index];
+
+     if (splits[2].length() != 0) {
+       result = replaceStringInstances(result, splits[2], "|");
+     }
+
+     return result;
+   }
+
+   /** @return the history file: the job ID resolved against the base path */
+   public Path getPath() {
+     return getPath(false);
+   }
+
+   public Path getPath(boolean noCache) {
+     return new Path(basePath, getSplits(noCache)[0]);
+   }
+
+   /** @return the job ID exactly as it appears in the record */
+   public String getJobIDString() {
+     return getJobIDString(false);
+   }
+
+   public String getJobIDString(boolean noCache) {
+     return getSplits(noCache)[0];
+   }
+
+   /**
+    * @return the submit time, in milliseconds
+    * @throws NumberFormatException if the field is not a decimal long
+    */
+   public long getSubmitTime() {
+     return getSubmitTime(false);
+   }
+
+   public long getSubmitTime(boolean noCache) {
+     return Long.parseLong(getSplits(noCache)[1]);
+   }
+
+   /** @return the user name, with any substituted '|' restored */
+   public String getUserName() {
+     return getUserName(false);
+   }
+
+   public String getUserName(boolean noCache) {
+     return restoredField(getSplits(noCache), 3);
+   }
+
+   /** @return the trimmed job name, with any substituted '|' restored */
+   public String getJobName() {
+     return getJobName(false);
+   }
+
+   public String getJobName(boolean noCache) {
+     return restoredField(getSplits(noCache), 4);
+   }
+ }
+
+ public class JobHistoryRecordRetriever implements Iterator<JobHistoryJobRecord> {
+ private final Pattern DATE_PATTERN
+ = Pattern.compile("([0-1]?[0-9])/([0-3]?[0-9])/((?:2[0-9])[0-9][0-9])");
+
+
+ private class BaseElements {
+ Path basePath;
+ List<String> records = new LinkedList<String>();
+ }
+
+ // Internal contract -- elts contains no empty BaseElements's
+ private List<BaseElements> elts = new LinkedList<BaseElements>();
+
+ private Iterator<BaseElements> eltsCursor;
+ private Iterator<String> currentEltCursor;
+
+ private BaseElements currentBE = null;
+
+ private int numberMatches;
+
+ @Override
+ public boolean hasNext() {
+ return currentEltCursor.hasNext() || eltsCursor.hasNext();
+ }
+
+ @Override
+ public JobHistoryJobRecord next() throws NoSuchElementException {
+ if (currentEltCursor.hasNext()) {
+ return new JobHistoryJobRecord(currentBE.basePath, currentEltCursor.next());
+ }
+
+ currentBE = eltsCursor.next();
+ currentEltCursor = currentBE.records.iterator();
+ return next();
+ }
+
+ @Override
+ public void remove() throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("no remove() operation");
+ }
+
+ public int numberMatches() {
+ return numberMatches;
+ }
+
+ public JobHistoryRecordRetriever
+ (String soughtUser, String soughtJobName, String[] dateStrings, String soughtJobid)
+ throws IOException {
+ numberMatches = 0;
+
+ soughtUser = soughtUser == null ? "" : soughtUser;
+ soughtJobName = soughtJobName == null ? "" : soughtJobName;
+ soughtJobid = soughtJobid == null ? "" : soughtJobid;
+
+ if (dateStrings == null || dateStrings.length == 0) {
+ dateStrings = new String[1];
+ dateStrings[0] = "";
+ }
+
+ for (int i = 0; i < dateStrings.length; ++i) {
+ String soughtDate = dateStrings[i];
+ String globString = globString();
+
+
+ String yyyyGlobPart = "*";
+ String mmGlobPart = "*";
+ String ddGlobPart = "*";
+ String hhGlobPart = "*";
+
+ if (soughtDate.length() != 0) {
+ Matcher m = DATE_PATTERN.matcher(soughtDate);
+ if (m.matches()) {
+ yyyyGlobPart = m.group(3);
+ mmGlobPart = m.group(1);
+ ddGlobPart = m.group(2);
+
+ if (yyyyGlobPart.length() == 2) {
+ yyyyGlobPart = "20" + yyyyGlobPart;
+ }
+
+ if (mmGlobPart.length() == 1) {
+ mmGlobPart = "0" + mmGlobPart;
+ }
+
+ if (ddGlobPart.length() == 1) {
+ ddGlobPart = "0" + ddGlobPart;
+ }
+ }
+ }
+
+ globString = globString.replace("YYYY", yyyyGlobPart);
+ globString = globString.replace("MM", mmGlobPart);
+ globString = globString.replace("DD", ddGlobPart);
+ globString = globString.replace("HH", hhGlobPart);
+
+ if (doneDirFs == null) {
+ if (DEBUG_MODE) {
+ System.out.println("Null file system. May be namenode is in safemode!");
+ }
+ return;
+ }
+
+ Path[] jobDirectories
+ = FileUtil.stat2Paths
+ (JobHistory.localGlobber
+ (doneDirFs, done, globString));
+
+ for (int jd = 0; jd < jobDirectories.length; jd++) {
+ Path doneSubdirectory = jobDirectories[jd];
+
+ String[] subdirectoryIndex = new String[0];
+
+ BaseElements be = new BaseElements();
+
+ be.basePath = doneSubdirectory;
+
+ try {
+ subdirectoryIndex = currentIndex(doneSubdirectory);
+ } catch (FileNotFoundException e) {
+ // no code -- should we log here?
+ }
+
+ for (int j = 0; j < subdirectoryIndex.length; ++j) {
+ String[] segments = subdirectoryIndex[j].split("\\|");
+
+ // segments are [0] jobid [also file name]
+ // [1] submit time [milliseconds]
+ // [2] ad hoc '|' substitution
+ // [3] user name
+ // [4] trimmed job jame
+ //
+
+ String user = segments[3];
+ String jobName = segments[4];
+
+ if (segments[2].length() > 0) {
+ user = replaceStringInstances(user, segments[2], "|");
+ jobName = replaceStringInstances(jobName, segments[2], "|");
+ }
+
+ if ((soughtJobid.equals("") || segments[0].equalsIgnoreCase(soughtJobid))
+ && (soughtUser.equals("") || user.equalsIgnoreCase(soughtUser))
+ && (soughtJobName.equals("") || jobName.contains(soughtJobName))) {
+ be.records.add(subdirectoryIndex[j]);
+ }
+ }
+
+ if (be.records.size() != 0) {
+ elts.add(be);
+
+ numberMatches += be.records.size();
+ }
+ }
+ }
+
+ eltsCursor = elts.iterator();
+
+ currentEltCursor = new LinkedList<String>().iterator();
+ }
+ }
+
+ /**
+  * Converts the four numeric components parsed out of a dated done
+  * subdirectory name into a time in milliseconds.
+  *
+  * Normally the components are year, month [1-based], day of month and
+  * hour of day.  In debug mode the last three are read as hour of day,
+  * minute and second instead.  Fields that are not set here keep the
+  * value they have at the moment of the call.
+  *
+  * @throws NumberFormatException if a component is not a decimal integer
+  */
+ static long directoryTime(String year, String seg2, String seg3, String seg4) {
+   // set to current time.  In debug mode, this is where the month
+   // and day get set.
+   Calendar result = Calendar.getInstance();
+   // canonicalize by filling in unset fields
+   result.setTimeInMillis(System.currentTimeMillis());
+
+   result.set(Calendar.YEAR, Integer.parseInt(year));
+
+   int seg2int = Integer.parseInt(seg2);
+   if (!DEBUG_MODE) {
+     // Calendar months are 0-based
+     --seg2int;
+   }
+
+   // HOUR_OF_DAY, not HOUR: HOUR is the 12-hour field and is combined with
+   // the AM/PM of the current time, so the result would depend on the
+   // time of day at which this method happens to run.
+   result.set(DEBUG_MODE ? Calendar.HOUR_OF_DAY : Calendar.MONTH,
+              seg2int);
+   result.set(DEBUG_MODE ? Calendar.MINUTE : Calendar.DAY_OF_MONTH,
+              Integer.parseInt(seg3));
+   result.set(DEBUG_MODE ? Calendar.SECOND : Calendar.HOUR_OF_DAY,
+              Integer.parseInt(seg4));
+
+   return result.getTimeInMillis();
+ }
+
+ /**
* Delete history files older than a specified time duration.
*/
class HistoryCleaner extends Thread {
static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
+
+ static final long DIRECTORY_LIFE_IN_MS
+ = DEBUG_MODE ? 20 * 60 * 1000L : 30 * ONE_DAY_IN_MS;
+ static final long RUN_INTERVAL
+ = DEBUG_MODE ? 10L * 60L * 1000L : ONE_DAY_IN_MS;
+
private long cleanupFrequency;
private long maxAgeOfHistoryFiles;
-
+
public HistoryCleaner(long maxAge) {
setName("Thread for cleaning up History files");
setDaemon(true);
this.maxAgeOfHistoryFiles = maxAge;
- cleanupFrequency = Math.min(ONE_DAY_IN_MS, maxAgeOfHistoryFiles);
+ cleanupFrequency = Math.min(RUN_INTERVAL, maxAgeOfHistoryFiles);
LOG.info("Job History Cleaner Thread started." +
" MaxAge is " +
maxAge + " ms(" + ((float)maxAge)/(ONE_DAY_IN_MS) + " days)," +
@@ -551,29 +1534,93 @@ public class JobHistory {
}
private void doCleanup() {
- long now = System.currentTimeMillis();
+ long now = ueState.monotonicTime();
+
+ boolean printedOneDeletee = false;
+
+ Set<String> deletedPathnames = new HashSet<String>();
+
try {
- FileStatus[] historyFiles = doneDirFs.listStatus(done);
- if (historyFiles != null) {
- for (FileStatus f : historyFiles) {
- if (now - f.getModificationTime() > maxAgeOfHistoryFiles) {
- doneDirFs.delete(f.getPath(), true);
- LOG.info("Deleting old history file : " + f.getPath());
+ Path[] datedDirectories
+ = FileUtil.stat2Paths(localGlobber(doneDirFs, done,
+ DONE_BEFORE_SERIAL_TAIL, null));
+
+ // find old directories
+ for (int i = 0; i < datedDirectories.length; ++i) {
+ String thisDir = datedDirectories[i].toString();
+ Matcher pathMatcher = historyCleanerParseDirectory.matcher(thisDir);
+
+ if (pathMatcher.matches()) {
+ long dirTime = directoryTime(pathMatcher.group(1),
+ pathMatcher.group(2),
+ pathMatcher.group(3),
+ pathMatcher.group(4));
+
+ if (DEBUG_MODE) {
+ System.err.println("HistoryCleaner.run just parsed " + thisDir
+ + " as year/month/day/hour = "
+ + pathMatcher.group(1)
+ + "/" + pathMatcher.group(2)
+ + "/" + pathMatcher.group(3)
+ + "/" + pathMatcher.group(4));
+ }
+
+ if (dirTime < now - DIRECTORY_LIFE_IN_MS) {
+ if (DEBUG_MODE) {
+ Calendar then = Calendar.getInstance();
+ then.setTimeInMillis(dirTime);
+ Calendar nnow = Calendar.getInstance();
+ nnow.setTimeInMillis(now);
+
+ System.err.println("HistoryCleaner.run directory: " + thisDir
+ + " because its time is " + then
+ + " but it's now " + nnow);
+ System.err.println("then = " + dirTime);
+ System.err.println("now = " + now);
+ }
+
+ // remove every file in the directory and save the name
+ // so we can remove it from jobHistoryFileMap
+ Path[] deletees
+ = FileUtil.stat2Paths(localGlobber(doneDirFs,
+ datedDirectories[i],
+ "/*", // individual files
+ null));
+
+ for (int j = 0; j < deletees.length; ++j) {
+
+ if (DEBUG_MODE && !printedOneDeletee) {
+ System.err.println("HistoryCleaner.run deletee: " + deletees[j].toString());
+ printedOneDeletee = true;
+ }
+
+ LOG.info("Deleting old history file : " + deletees[j]);
+
+ doneDirFs.delete(deletees[j]);
+ deletedPathnames.add(deletees[j].toString());
+ }
+
+ synchronized (existingDoneSubdirs) {
+ if (!existingDoneSubdirs.contains(datedDirectories[i]))
+ {
+ LOG.warn("JobHistory: existingDoneSubdirs doesn't contain "
+ + datedDirectories[i] + ", but should.");
+ }
+ doneDirFs.delete(datedDirectories[i], true);
+ existingDoneSubdirs.remove(datedDirectories[i]);
+ }
}
}
}
+
//walking over the map to purge entries from jobHistoryFileMap
synchronized (jobHistoryFileMap) {
Iterator<Entry<JobID, MovedFileInfo>> it =
jobHistoryFileMap.entrySet().iterator();
while (it.hasNext()) {
MovedFileInfo info = it.next().getValue();
- if (now - info.timestamp > maxAgeOfHistoryFiles) {
+ if (deletedPathnames.contains(info.historyFile)) {
it.remove();
- } else {
- //since entries are in sorted timestamp order, no more entries
- //are required to be checked
- break;
}
}
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Tue Mar 8 05:53:05 2011
@@ -251,10 +251,15 @@ public class TestJobHistory extends Test
*/
private static Path getPathForConf(Path path, Path dir) {
String parts[] = path.getName().split("_");
- //TODO this is a hack :(
- // jobtracker-hostname_jobtracker-identifier_
+ Path parent = path.getParent();
+ Path ancestor = parent;
+ for (int i = 0; i < 4; ++i) { // serial #, 3 laysers of date
+ ancestor = ancestor.getParent();
+ }
+ String jobtrackerID = ancestor.getName();
String id = parts[0] + "_" + parts[1] + "_" + parts[2];
- return new Path(dir, id + "_conf.xml");
+ String jobUniqueString = jobtrackerID + id;
+ return new Path(parent, jobUniqueString + "_conf.xml");
}
/**
@@ -279,13 +284,14 @@ public class TestJobHistory extends Test
* @param id job id
* @param conf job conf
*/
- public static void validateJobHistoryFileFormat(JobHistory jobHistory,
- JobID id, JobConf conf,
- String status, boolean splitsCanBeEmpty) throws IOException {
+ public static void validateJobHistoryFileFormat
+ (JobTracker jt, JobHistory jobHistory, JobID id, JobConf conf,
+ String status, boolean splitsCanBeEmpty)
+ throws IOException {
// Get the history file name
Path dir = jobHistory.getCompletedJobHistoryLocation();
- String logFileName = getDoneFile(jobHistory, conf, id, dir);
+ String logFileName = getDoneFile(jt, conf, id, dir);
// Framework history log file location
Path logFile = new Path(dir, logFileName);
@@ -565,11 +571,12 @@ public class TestJobHistory extends Test
RunningJob job, JobConf conf) throws IOException {
JobID id = job.getID();
- JobHistory jobHistory =
- mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+ JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+ JobHistory jobHistory = jt.getJobHistory();
+
Path doneDir = jobHistory.getCompletedJobHistoryLocation();
// Get the history file name
- String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
+ String logFileName = getDoneFile(jt, conf, id, doneDir);
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
@@ -646,12 +653,14 @@ public class TestJobHistory extends Test
assertEquals("Files in logDir did not move to DONE folder",
0, logDirFs.listStatus(logDirPath).length);
- JobHistory jobHistory =
- mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+ JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+ JobHistory jobHistory = jt.getJobHistory();
+
Path doneDir = jobHistory.getCompletedJobHistoryLocation();
- assertEquals("Files in DONE dir not correct",
- 2, doneDir.getFileSystem(conf).listStatus(doneDir).length);
+ FileStatus[] movedFiles = jobHistory.getAllHistoryConfFiles();
+
+ assertEquals("Files in DONE dir not correct", 2, movedFiles.length);
// run the TCs
conf = mr.createJobConf();
@@ -676,31 +685,28 @@ public class TestJobHistory extends Test
assertEquals("History DONE folder not correct",
doneFolder, doneDir.getName());
JobID id = job.getID();
- String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
+ String logFileName = getDoneFile(jt, conf, id, doneDir);
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
FileSystem fileSys = logFile.getFileSystem(conf);
-
- Cluster cluster = new Cluster(conf);
- assertEquals("Client returned wrong history url", logFile.toString(),
- cluster.getJobHistoryUrl(id));
// Check if the history file exists
assertTrue("History file does not exist", fileSys.exists(logFile));
// check if the corresponding conf file exists
- Path confFile = getPathForConf(logFile, doneDir);
+ String confname = jobHistory.getConfFilePath(id);
+ Path confFile = new Path(confname);
assertTrue("Config for completed jobs doesnt exist",
fileSys.exists(confFile));
- // check if the file exists in a done folder
- assertTrue("Completed job config doesnt exist in the done folder",
- doneDir.getName().equals(confFile.getParent().getName()));
-
- // check if the file exists in a done folder
- assertTrue("Completed jobs doesnt exist in the done folder",
- doneDir.getName().equals(logFile.getParent().getName()));
+ // check if the conf and log files are in the same directory
+ assertTrue("config file and log file aren't in the same directory",
+ confFile.getParent().equals(logFile.getParent()));
+
+ // check if the file exists under the done folder
+ assertTrue("Completed job doesnt exist under done folder",
+ logFile.toString().startsWith(doneDir.toString()));
// check if the job file is removed from the history location
@@ -714,7 +720,7 @@ public class TestJobHistory extends Test
assertFalse("Config for completed jobs not deleted from running folder",
fileSys.exists(runningJobConfFilename));
- validateJobHistoryFileFormat(jobHistory,
+ validateJobHistoryFileFormat(jt, jobHistory,
job.getID(), conf, "SUCCEEDED", false);
validateJobHistoryFileContent(mr, job, conf);
@@ -771,33 +777,39 @@ public class TestJobHistory extends Test
// Run a job that will be succeeded and validate its history file
RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
- JobHistory jobHistory =
- mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+
+ JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+ JobHistory jobHistory = jt.getJobHistory();
+
Path doneDir = jobHistory.getCompletedJobHistoryLocation();
assertEquals("History DONE folder not correct",
doneFolder, doneDir.toString());
JobID id = job.getID();
- String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
+ String logFileName = getDoneFile(jt, conf, id, doneDir);
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
+ Path logDir = logFile.getParent();
FileSystem fileSys = logFile.getFileSystem(conf);
// Check if the history file exists
assertTrue("History file does not exist", fileSys.exists(logFile));
// check if the corresponding conf file exists
- Path confFile = getPathForConf(logFile, doneDir);
+ Path confFile = new Path(jobHistory.getConfFilePath(id));
assertTrue("Config for completed jobs doesnt exist",
fileSys.exists(confFile));
// check if the conf file exists in a done folder
assertTrue("Completed job config doesnt exist in the done folder",
- doneDir.getName().equals(confFile.getParent().getName()));
+ logDir.getName().equals(confFile.getParent().getName()));
// check if the file exists in a done folder
assertTrue("Completed jobs doesnt exist in the done folder",
- doneDir.getName().equals(logFile.getParent().getName()));
+ logDir.getName().equals(logFile.getParent().getName()));
+
+ assertTrue("The log file dir is not under the done dir",
+ logDir.toString().startsWith(doneDir.toString()));
// check if the job file is removed from the history location
Path runningJobsHistoryFolder = logFile.getParent().getParent();
@@ -810,12 +822,11 @@ public class TestJobHistory extends Test
assertFalse("Config for completed jobs not deleted from running folder",
fileSys.exists(runningJobConfFilename));
- validateJobHistoryFileFormat(jobHistory, job.getID(), conf,
+ validateJobHistoryFileFormat(jt, jobHistory, job.getID(), conf,
"SUCCEEDED", false);
validateJobHistoryFileContent(mr, job, conf);
// get the job conf filename
- JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
String name = jt.getLocalJobFilePath(job.getID());
File file = new File(name);
@@ -834,16 +845,22 @@ public class TestJobHistory extends Test
//Returns the file in the done folder
//Waits for sometime to get the file moved to done
- private static String getDoneFile(JobHistory jobHistory,
- JobConf conf, JobID id,
- Path doneDir) throws IOException {
+ private static String getDoneFile
+ (JobTracker jt, JobConf conf, JobID id, Path doneDir)
+ throws IOException {
+ JobHistory jobHistory = jt.getJobHistory();
+
String name = null;
String user = UserGroupInformation.getCurrentUser().getUserName();
+
for (int i = 0; name == null && i < 20; i++) {
- Path path = JobHistory.getJobHistoryFile(
- jobHistory.getCompletedJobHistoryLocation(), id, user);
- if (path.getFileSystem(conf).exists(path)) {
- name = path.toString();
+ String pathname = jobHistory.getHistoryFilePath(id);
+
+ if (pathname != null) {
+ Path path = new Path(pathname);
+ if (path.getFileSystem(conf).exists(path)) {
+ name = path.toString();
+ }
}
UtilsForTests.waitFor(1000);
}
@@ -870,12 +887,14 @@ public class TestJobHistory extends Test
* @param id job id
* @param conf job conf
*/
- private static void validateJobHistoryJobStatus(JobHistory jobHistory,
- JobID id, JobConf conf, String status) throws IOException {
+ private static void validateJobHistoryJobStatus
+ (JobTracker jt, JobHistory jobHistory,
+ JobID id, JobConf conf, String status)
+ throws IOException {
// Get the history file name
Path doneDir = jobHistory.getCompletedJobHistoryLocation();
- String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
+ String logFileName = getDoneFile(jt, conf, id, doneDir);
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
@@ -891,8 +910,7 @@ public class TestJobHistory extends Test
JobHistoryParser parser = new JobHistoryParser(fileSys,
logFile.toUri().getPath());
- JobHistoryParser.JobInfo jobInfo = parser.parse();
-
+ JobHistoryParser.JobInfo jobInfo = parser.parse();
assertTrue("Job Status read from job history file is not the expected" +
" status", status.equals(jobInfo.getJobStatus()));
@@ -918,22 +936,24 @@ public class TestJobHistory extends Test
// Run a job that will be succeeded and validate its job status
// existing in history file
RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
+
+ JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
- JobHistory jobHistory =
- mr.getJobTrackerRunner().getJobTracker().getJobHistory();
- validateJobHistoryJobStatus(jobHistory, job.getID(), conf,
+ JobHistory jobHistory = jt.getJobHistory();
+
+ validateJobHistoryJobStatus(jt, jobHistory, job.getID(), conf,
JobStatus.getJobRunState(JobStatus.SUCCEEDED));
// Run a job that will be failed and validate its job status
// existing in history file
job = UtilsForTests.runJobFail(conf, inDir, outDir);
- validateJobHistoryJobStatus(jobHistory, job.getID(), conf,
+ validateJobHistoryJobStatus(jt, jobHistory, job.getID(), conf,
JobStatus.getJobRunState(JobStatus.FAILED));
// Run a job that will be killed and validate its job status
// existing in history file
job = UtilsForTests.runJobKill(conf, inDir, outDir);
- validateJobHistoryJobStatus(jobHistory, job.getID(), conf,
+ validateJobHistoryJobStatus(jt, jobHistory, job.getID(), conf,
JobStatus.getJobRunState(JobStatus.KILLED));
} finally {
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java Tue Mar 8 05:53:05 2011
@@ -109,7 +109,7 @@ public class TestJobHistoryParsing exte
assertFalse("Writing an event after closing event writer is not handled",
caughtException);
- String historyFileName = jobId.toString() + "_" + username;
+ String historyFileName = jobId.toString();
Path historyFilePath = new Path (historyDir.toString(),
historyFileName);
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Tue Mar 8 05:53:05 2011
@@ -295,11 +295,11 @@ public class TestSeveral extends TestCas
public Void run() throws IOException {
verifyOutput(outDir.getFileSystem(conf), outDir);
+ JobTracker jt = mrCluster.getJobTrackerRunner().getJobTracker();
//TestJobHistory
- TestJobHistory.validateJobHistoryFileFormat(
- mrCluster.getJobTrackerRunner().getJobTracker().getJobHistory(),
- jobId, conf, "SUCCEEDED", false);
+ TestJobHistory.validateJobHistoryFileFormat
+ (jt, jt.getJobHistory(), jobId, conf, "SUCCEEDED", false);
TestJobHistory.validateJobHistoryFileContent(mrCluster, job, conf);
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Tue Mar 8 05:53:05 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
@@ -42,6 +43,8 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryJobRecord;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryRecordRetriever;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
@@ -266,7 +269,7 @@ public class TestRumenJobTraces {
.makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
// Check if jobhistory filename are detected properly
- Path jhFilename = JobHistory.getJobHistoryFile(rootInputDir, jid, user);
+ Path jhFilename = JobHistory.getJobHistoryFile(rootInputDir, jid);
JobID extractedJID =
JobID.forName(TraceBuilder.extractJobID(jhFilename.getName()));
assertEquals("TraceBuilder failed to parse the current JH filename",
@@ -366,6 +369,9 @@ public class TestRumenJobTraces {
conf.setInt(TTConfig.TT_REDUCE_SLOTS, 1);
MiniMRCluster mrCluster = new MiniMRCluster(1, "file:///", 1, null, null,
new JobConf(conf));
+ JobTracker tracker = mrCluster.getJobTrackerRunner().getJobTracker();
+ JobHistory history = tracker.getJobHistory();
+
// run a job
Path inDir = new Path(tempDir, "input");
@@ -395,16 +401,20 @@ public class TestRumenJobTraces {
Path jhPath =
new Path(mrCluster.getJobTrackerRunner().getJobTracker()
.getJobHistoryDir());
- Path inputPath = JobHistory.getJobHistoryFile(jhPath, id, user);
+ Path inputPath = null;
// wait for 10 secs for the jobhistory file to move into the done folder
for (int i = 0; i < 100; ++i) {
- if (lfs.exists(inputPath)) {
+ JobHistoryRecordRetriever retriever
+ = history.getMatchingJobs(null, "", null, id.toString());
+ if (retriever.hasNext()) {
+ inputPath = retriever.next().getPath();
break;
}
- TimeUnit.MILLISECONDS.wait(100);
+ TimeUnit.MILLISECONDS.sleep(100);
}
- assertTrue("Missing job history file", lfs.exists(inputPath));
+ assertTrue("Missing job history file",
+ inputPath != null && lfs.exists(inputPath));
ris = getRewindableInputStream(inputPath, conf);
Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java Tue Mar 8 05:53:05 2011
@@ -172,13 +172,11 @@ public class TraceBuilder extends Config
* [especially for .crc files] we return null.
*/
static String extractJobID(String fileName) {
+ String pre21JobID
+ = applyParser(fileName,
+ Pre21JobHistoryConstants.JOBHISTORY_FILENAME_REGEX);
String jobId = applyParser(fileName, JobHistory.JOBHISTORY_FILENAME_REGEX);
- if (jobId == null) {
- // check if its a pre21 jobhistory file
- jobId = applyParser(fileName,
- Pre21JobHistoryConstants.JOBHISTORY_FILENAME_REGEX);
- }
- return jobId;
+ return jobId == null ? pre21JobID : jobId;
}
static boolean isJobConfXml(String fileName, InputStream input) {
Modified: hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp Tue Mar 8 05:53:05 2011
@@ -21,6 +21,8 @@
contentType="text/html; charset=UTF-8"
import="java.io.*"
import="java.util.*"
+ import="java.util.regex.Matcher"
+ import="java.util.regex.Pattern"
import="java.net.URLEncoder"
import="org.apache.hadoop.mapred.*"
import="org.apache.hadoop.util.*"
@@ -30,6 +32,8 @@
import="org.apache.hadoop.http.HtmlQuoting"
import="org.apache.hadoop.mapred.*"
import="org.apache.hadoop.mapreduce.jobhistory.*"
+ import="org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryJobRecord"
+ import="org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryRecordRetriever"
%>
<%! private static final long serialVersionUID = 1L;
@@ -70,55 +74,68 @@ window.location.href = url;
<a href="jobhistory.jsp">History Viewer</a></h1>
<hr>
<%
+ //{{ // this is here to make indentation work, and must be commented out
final String search = (request.getParameter("search") == null)
? ""
: request.getParameter("search");
- String parts[] = search.split(":");
+ String soughtDate = "";
+ String soughtJobName = "";
+ String soughtJobid = "";
- final String user = (parts.length >= 1)
- ? parts[0].toLowerCase()
+ // soughtUser : jobid ; jobname ! date
+
+ String splitDate[] = search.split("!");
+
+ final String DATE_PATTERN = "([0-1]?[0-9])/([0-3]?[0-9])/((?:2[0-9])?[0-9][0-9])";
+
+ if (splitDate.length >= 2) {
+ soughtDate = splitDate[1];
+ }
+
+ String[] splitJobName = splitDate[0].split(";");
+
+ if (splitJobName.length >= 2) {
+ soughtJobName = splitJobName[1];
+ }
+
+ String[] splitJobid = splitJobName[0].split(":");
+
+ if (splitJobid.length >= 2) {
+ soughtJobid = splitJobid[1];
+ }
+
+ final String soughtUser = (splitJobid.length >= 1)
+ ? splitJobid[0].toLowerCase()
: "";
- final String jobid = (parts.length >= 2)
- ? parts[1].toLowerCase()
- : "";
- final String rawUser = HtmlQuoting.unquoteHtmlChars(user);
- final String rawJobid = HtmlQuoting.unquoteHtmlChars(jobid);
-
- PathFilter jobLogFileFilter = new PathFilter() {
- private boolean matchUser(String fileName) {
- // return true if
- // - user is not specified
- // - user matches
- return "".equals(rawUser) || rawUser.equals(fileName.split("_")[3]);
- }
- private boolean matchJobId(String fileName) {
- // return true if
- // - jobid is not specified
- // - jobid matches
- String[] jobDetails = fileName.split("_");
- String actualId = jobDetails[0] + "_" +jobDetails[1] + "_" + jobDetails[2] ;
- return "".equals(rawJobid) || jobid.equalsIgnoreCase(actualId);
+ if (soughtDate.length() != 0) {
+ Pattern p = Pattern.compile(DATE_PATTERN);
+ Matcher m = p.matcher(soughtDate);
+ if (!m.matches()) {
+ soughtDate = "";
}
+ }
- public boolean accept(Path path) {
- return (!(path.getName().endsWith(".xml") ||
- path.getName().endsWith(JobHistory.OLD_SUFFIX)) &&
- matchUser(path.getName()) && matchJobId(path.getName()));
- }
- };
+ JobHistory jobHistory = (JobHistory) application.getAttribute("jobHistoryHistory");
+ String soughtDates[] = new String[1];
+ soughtDates[0] = soughtDate;
+
+ JobHistoryRecordRetriever retriever
+ = jobHistory.getMatchingJobs
+ (soughtUser, soughtJobName, soughtDates, soughtJobid);
- FileSystem fs = (FileSystem) application.getAttribute("fileSys");
- String historyLogDir = (String) application.getAttribute("historyLogDir");
- if (fs == null) {
- out.println("Null file system. May be namenode is in safemode!");
- return;
- }
- Path[] jobFiles = FileUtil.stat2Paths(fs.listStatus(new Path(historyLogDir),
- jobLogFileFilter));
- out.println("<!-- user : " + user + ", jobid : " + jobid + "-->");
- if (null == jobFiles || jobFiles.length == 0) {
+ JobHistoryJobRecord[] records
+ = new JobHistoryJobRecord[retriever.numberMatches()];
+
+ int recordsIndex = 0;
+
+ while (retriever.hasNext()) {
+ records[recordsIndex++] = retriever.next();
+ }
+
+ out.println("<!-- user : " + soughtUser + ", jobid : " + soughtJobid + "-->");
+ if (records.length == 0) {
out.println("No files found!");
return ;
}
@@ -132,15 +149,15 @@ window.location.href = url;
int size = 100;
// if show-all is requested or jobfiles < size(100)
- if (pageno == -1 || size > jobFiles.length) {
- size = jobFiles.length;
+ if (pageno == -1 || size > records.length) {
+ size = records.length;
}
if (pageno == -1) { // special case 'show all'
pageno = 1;
}
- int maxPageNo = (int)Math.ceil((float)jobFiles.length / size);
+ int maxPageNo = (records.length + size - 1) / size;
// check and fix pageno
if (pageno < 1 || pageno > maxPageNo) {
@@ -152,15 +169,15 @@ window.location.href = url;
if (pageno == maxPageNo) {
// find the number of files to be shown on the last page
int startOnLast = ((pageno - 1) * size) + 1;
- length = jobFiles.length - startOnLast + 1;
+ length = records.length - startOnLast + 1;
}
// Display the search box
- out.println("<form name=search><b> Filter (username:jobid) </b>"); // heading
+ out.println("<form name=search><b> Filter ([username][:jobid][;jobname-key][!MM/DD/YYYY]) </b>"); // heading
out.println("<input type=text name=search size=\"20\" value=\"" + search + "\">"); // search box
out.println("<input type=submit value=\"Filter!\" onClick=\"showUserHistory(document.getElementById('search').value)\"></form>");
- out.println("<span class=\"small\">Example: 'smith' will display jobs submitted by user 'smith'. </span>");
- out.println("<span class=\"small\">Job Ids need to be prefixed with a colon(:) For example, :job_200908311030_0001 will display the job with that id. </span>"); // example
+ out.println("<span class=\"small\">Example: <b>smith</b> will display jobs submitted by user 'smith'. </span>");
+ out.println("<span class=\"small\">Job Ids need to be prefixed with a colon(:) For example, <b>:job_200908311030_0001</b> will display the job with that id. You may search for parts of job names. Job name search keys need to be prefixed by a semicolon (;). A filter <b>;budget</b> will find jobs named \"budget calculation\" or \"fussbudget job\". You may restrict results to jobs that finished on a specific day. Date criteria are <b>MM/DD/YYYY</b> and are prefixed with an exclamation point (!). You may specify multiple criteria. We only display jobs that satisfy all criteria.</span>"); // example
out.println("<hr>");
//Show the status
@@ -171,12 +188,15 @@ window.location.href = url;
out.println("<font size=5><b>Available Jobs in History </b></font>");
// display the number of jobs, start index, end index
- out.println("(<i> <span class=\"small\">Displaying <b>" + length + "</b> jobs from <b>" + start + "</b> to <b>" + (start + length - 1) + "</b> out of <b>" + jobFiles.length + "</b> jobs");
- if (!"".equals(user)) {
- out.println(" for user <b>" + HtmlQuoting.quoteHtmlChars(user) + "</b>"); // show the user if present
+ out.println("(<i> <span class=\"small\">Displaying <b>" + length + "</b> jobs from <b>" + start + "</b> to <b>" + (start + length - 1) + "</b> out of <b>" + records.length + "</b> jobs");
+ if (!"".equals(soughtUser)) {
+ out.println(" for user <b>" + soughtUser + "</b>"); // show the user if present
}
- if (!"".equals(jobid)) {
- out.println(" for jobid <b>" + HtmlQuoting.quoteHtmlChars(jobid) + "</b> in it."); // show the jobid keyword if present
+ if (!"".equals(soughtJobid)) {
+ out.println(" for jobid <b>" + soughtJobid + "</b> in it "); // show the jobid keyword if present
+ }
+ if (!"".equals(soughtDate)) {
+ out.println(" for date <b>" + soughtDate + "</b>"); // show the date if present
}
out.print("</span></i>)");
@@ -197,28 +217,27 @@ window.location.href = url;
out.println("<span class=\"small\">[last page]</span>");
}
- // sort the files on creation time.
- Arrays.sort(jobFiles, new Comparator<Path>() {
- public int compare(Path p1, Path p2) {
- String dp1 = null;
- String dp2 = null;
-
- dp1 = p1.getName();
- dp2 = p2.getName();
-
- String[] split1 = dp1.split("_");
- String[] split2 = dp2.split("_");
+ // REVERSE sort the files on creation time.
+ Arrays.sort(records, new Comparator<JobHistoryJobRecord>() {
+ public int compare(JobHistoryJobRecord rec1, JobHistoryJobRecord rec2) {
+ String id1 = rec1.getJobIDString(true);
+ String id2 = rec2.getJobIDString(true);
+
+ String[] idsplit1 = id1.split("_");
+ String[] idsplit2 = id2.split("_");
- // compare job tracker start time
- int res = new Date(Long.parseLong(split1[1])).compareTo(
- new Date(Long.parseLong(split2[1])));
- if (res == 0) {
- Long l1 = Long.parseLong(split1[2]);
- res = l1.compareTo(Long.parseLong(split2[2]));
- }
- return res;
- }
- });
+ // compare job tracker start time
+ Long jtTime2 = Long.parseLong(idsplit2[1]);
+ // comparison sense is reversed
+ int res = jtTime2.compareTo(Long.parseLong(idsplit1[1]));
+ if (res == 0) {
+ // comparison sense is reversed
+ Long sn2 = Long.parseLong(idsplit2[2]);
+ res = sn2.compareTo(Long.parseLong(idsplit1[2]));
+ }
+ return res;
+ }
+ });
out.println("<br><br>");
@@ -227,15 +246,21 @@ window.location.href = url;
out.print("<table align=center border=2 cellpadding=\"5\" cellspacing=\"2\">");
out.print("<tr>");
- out.print( "<td>Job Id</td><td>User</td>") ;
+ out.print( "<td>Job submit time</td>");
+ out.print("<td>Job Id</td>");
+ out.print("<td>User</td>") ;
+ out.print("<td>Job Name</td>") ;
out.print("</tr>");
Set<String> displayedJobs = new HashSet<String>();
for (int i = start - 1; i < start + length - 1; ++i) {
- Path jobFile = jobFiles[i];
+ JobHistoryJobRecord record = records[i];
- String jobId = JobHistory.getJobIDFromHistoryFilePath(jobFile).toString();
- String userName = JobHistory.getUserFromHistoryFilePath(jobFile);
+ String jobId = record.getJobIDString();
+ String userName = record.getUserName();
+ long submitTime = record.getSubmitTime();
+ String jobName = record.getJobName();
+ Path logPath = record.getPath();
// Check if the job is already displayed. There can be multiple job
// history files for jobs that have restarted
@@ -248,8 +273,7 @@ window.location.href = url;
%>
<center>
<%
- printJob(jobId, userName, new Path(jobFile.getParent(), jobFile),
- out) ;
+ printJob(submitTime, jobId, userName, logPath, jobName, out) ;
%>
</center>
<%
@@ -260,17 +284,39 @@ window.location.href = url;
printNavigation(pageno, size, maxPageNo, search, out);
%>
<%!
- private void printJob(String jobId,
- String user, Path logFile, JspWriter out)
+ // Prints one history table row; id, user and job name are HTML-quoted.
+ private void printJob(long timestamp, String jobId, String user, Path logFile, String jobName, JspWriter out)
throws IOException {
out.print("<tr>");
+ out.print("<td>" + new Date(timestamp) + "</td>");
out.print("<td>" + "<a href=\"jobdetailshistory.jsp?logFile=" +
URLEncoder.encode(logFile.toString(), "UTF-8") +
- "\">" + HtmlQuoting.quoteHtmlChars(jobId) + "</a></td>");
- out.print("<td>" + HtmlQuoting.quoteHtmlChars(user) + "</td>");
+ "\">" + HtmlQuoting.quoteHtmlChars(jobId) + "</a></td>");
+ out.print("<td>" + HtmlQuoting.quoteHtmlChars(user) + "</td>");
+ out.print("<td>" + HtmlQuoting.quoteHtmlChars(jobName) + "</td>"); // free-form text: escape before emitting HTML
out.print("</tr>");
}
+
+ /**
+ * Replaces every occurrence of old in replacee with replacement in a
+ * single left-to-right pass; text taken from replacement is not
+ * rescanned. An empty old returns replacee unchanged.
+ */
+ private static String replaceStringInstances
+ (String replacee, String old, String replacement) {
+ if (old.length() == 0) {
+ return replacee;
+ }
+ StringBuilder result = new StringBuilder();
+ int from = 0;
+ // index >= 0 (not > 0): a match at the very start must be replaced too
+ for (int index; (index = replacee.indexOf(old, from)) >= 0; from = index + old.length()) {
+ result.append(replacee, from, index).append(replacement);
+ }
+ return result.append(replacee, from, replacee.length()).toString();
+ }
+
private void printNavigation(int pageno, int size, int max, String search,
JspWriter out) throws IOException {
int numIndexToShow = 5; // num indexes to show on either side