You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:53:05 UTC

svn commit: r1079184 - in /hadoop/mapreduce/branches/yahoo-merge/src: java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapreduce/ java/org/apache/hadoop/mapreduce/jobhistory/ test/mapred/org/apache/hadoop/mapred/ test/mapred/org/apache/hadoop/tool...

Author: omalley
Date: Tue Mar  8 05:53:05 2011
New Revision: 1079184

URL: http://svn.apache.org/viewvc?rev=1079184&view=rev
Log:
commit ef1a2a3dcc014ffe026c7be3c2e06a44da14de3a
Author: Richard King <dk...@yahoo-inc.com>
Date:   Mon Nov 15 22:43:39 2010 +0000

     increase the flexibility of searching the job history in
      jobhistory.jsp .  Also, stores job history files in multiple
      directories, and establishes a rudimentry database index, to make
      searches more performant.
    
    +++ b/YAHOO-CHANGES.txt
    +   increase the flexibility of searching the job history in
    +  jobhistory.jsp .  From
    +  By dking
    +
    +
    +   stores job history files in multiple directories, and establishes
    +  a rudimentry database index, to make searches more performant.
    +  From
    +  By dking
    +

Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
    hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Mar  8 05:53:05 2011
@@ -247,7 +247,7 @@ public class JobTracker implements MRCon
    * Return the JT's job history handle.
    * @return the jobhistory handle
    */
-  JobHistory getJobHistory() { return jobHistory; }
+  public JobHistory getJobHistory() { return jobHistory; }
   /**
    * Start the JobTracker with given configuration.
    * 
@@ -1617,6 +1617,9 @@ public class JobTracker implements MRCon
       }
     });
     infoServer.setAttribute("fileSys", historyFS);
+    infoServer.setAttribute("jobHistoryHistory", jobHistory);
+    infoServer.setAttribute("jobHistoryGlobber", JobHistory.globString());
+    infoServer.setAttribute("jobHistoryLeafGlobber", JobHistory.leafGlobString());
 
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
@@ -4622,6 +4625,9 @@ public class JobTracker implements MRCon
     }
 
     infoServer.setAttribute("fileSys", historyFS);
+    infoServer.setAttribute("jobHistoryHistory", jobHistory);
+    infoServer.setAttribute("jobHistoryGlobber", JobHistory.globString());
+    infoServer.setAttribute("jobHistoryLeafGlobber", JobHistory.leafGlobString());
     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
     this.infoPort = this.infoServer.getPort();

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java Tue Mar  8 05:53:05 2011
@@ -259,8 +259,7 @@ public class Cluster {
     if (jobHistoryDir == null) {
       jobHistoryDir = new Path(client.getJobHistoryDir());
     }
-    return JobHistory.getJobHistoryFile(jobHistoryDir, jobId, 
-        ugi.getShortUserName()).toString();
+    return JobHistory.getJobHistoryFile(jobHistoryDir, jobId).toString();
   }
 
   /**

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Tue Mar  8 05:53:05 2011
@@ -18,20 +18,38 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -43,7 +61,9 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobTracker;
@@ -64,7 +84,7 @@ public class JobHistory {
   final Log LOG = LogFactory.getLog(JobHistory.class);
 
   private long jobHistoryBlockSize;
-  private final Map<JobID, MetaInfo> fileMap =
+  private static final Map<JobID, MetaInfo> fileMap =
     Collections.<JobID,MetaInfo>synchronizedMap(new HashMap<JobID,MetaInfo>());
   private ThreadPoolExecutor executor = null;
   static final FsPermission HISTORY_DIR_PERMISSION =
@@ -82,23 +102,69 @@ public class JobHistory {
   private Path logDir = null;
   private Path done = null; // folder for completed jobs
 
+  private static String DONE_BEFORE_SERIAL_TAIL = doneSubdirsBeforeSerialTail();
+  private static String DONE_LEAF_FILES = DONE_BEFORE_SERIAL_TAIL + "/*";
+
+  static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
+
+  // XXXXX debug mode -- set this to false for production
+  private static final boolean DEBUG_MODE = false;
+
+  private static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
+  private static final int SERIAL_NUMBER_LOW_DIGITS = DEBUG_MODE ? 1 : 3;
+
+  private static final String SERIAL_NUMBER_FORMAT
+    = ("%0"
+       + (SERIAL_NUMBER_DIRECTORY_DIGITS + SERIAL_NUMBER_LOW_DIGITS)
+       + "d");
+
+  private static final Set<Path> existingDoneSubdirs = new HashSet<Path>();
+
+  private static final SortedMap<Integer, String> idToDateString
+    = new TreeMap<Integer, String>();
+
+  private static Pattern historyCleanerParseDirectory
+    = Pattern.compile(".+/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)/?");
+  //  .+ / YYYY / MM / DD / HH /?
+
+
   public static final String OLD_SUFFIX = ".old";
+  public static final String OLD_FULL_SUFFIX_REGEX_STRING
+    = "(?:\\.[0-9]+" + Pattern.quote(OLD_SUFFIX) + ")";
   
   // Version string that will prefix all History Files
   public static final String HISTORY_VERSION = "1.0";
 
   private HistoryCleaner historyCleanerThread = null;
 
-  private Map<JobID, MovedFileInfo> jobHistoryFileMap = 
+  private static final int version = 3;
+  private static final String LOG_VERSION_STRING = "version-" + version;
+
+  private long jobTrackerStartTime;
+  private String jobTrackerHostName;
+  private String jobTrackerUniqueName;
+
+  private static final Map<JobID, MovedFileInfo> jobHistoryFileMap = 
     Collections.<JobID,MovedFileInfo>synchronizedMap(
         new LinkedHashMap<JobID, MovedFileInfo>());
 
+  // The invariant is that UnindexedElementsState tracks the identity
+  //   of the currently-filling done directory subdirectory, and what
+  //   needs to indexed.
+  // Has to be locked for each file disposition decision
+  private final UnindexedElementsState ueState = new UnindexedElementsState();
+
   // JobHistory filename regex
   public static final Pattern JOBHISTORY_FILENAME_REGEX = 
-    Pattern.compile("(" + JobID.JOBID_REGEX + ")_.+");
+    Pattern.compile("(" + JobID.JOBID_REGEX + ")"
+                    + OLD_FULL_SUFFIX_REGEX_STRING + "?");
   // JobHistory conf-filename regex
   public static final Pattern CONF_FILENAME_REGEX =
-    Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
+    Pattern.compile("(" + JobID.JOBID_REGEX + ")"
+                    + CONF_FILE_NAME_SUFFIX
+                    + OLD_FULL_SUFFIX_REGEX_STRING + "?");
+
+  private static final int MAXIMUM_DATESTRING_COUNT = 200000;
   
   private static class MovedFileInfo {
     private final String historyFile;
@@ -119,6 +185,11 @@ public class JobHistory {
   public void init(JobTracker jt, JobConf conf, String hostname,
       long jobTrackerStartTime) throws IOException {
 
+    jobTrackerHostName = hostname;
+    this.jobTrackerStartTime = jobTrackerStartTime;
+
+    this.jobTrackerUniqueName = jobTrackerHostName + "-" + jobTrackerStartTime;
+
     // Get and create the log folder
     final String logDirLoc = conf.get(JTConfig.JT_JOBHISTORY_LOCATION ,
         "file:///" +
@@ -145,6 +216,20 @@ public class JobHistory {
     
     jobTracker = jt;
   }
+
+  public static String leafGlobString() {
+    return "/" + LOG_VERSION_STRING
+      + "/*"                    // job tracker ID
+      + "/YYYY/MM/DD/HH"        // time segment
+      ;
+  }
+
+  public static String globString() {
+    return "/" + LOG_VERSION_STRING
+      + "/*"                    // job tracker ID
+      + "/YYYY/MM/DD"        // time segment
+      ;
+  }
   
   /** Initialize the done directory and start the history cleaner thread */
   public void initDone(JobConf conf, FileSystem fs) throws IOException {
@@ -163,8 +248,8 @@ public class JobHistory {
     //permission
     if (!doneDirFs.exists(done)) {
       LOG.info("Creating DONE folder at "+ done);
-      if (! doneDirFs.mkdirs(done, 
-          new FsPermission(HISTORY_DIR_PERMISSION))) {
+      if (!doneDirFs.mkdirs(done, 
+                            new FsPermission(HISTORY_DIR_PERMISSION))) {
         throw new IOException("Mkdirs failed to create " + done.toString());
       }
     }
@@ -218,11 +303,20 @@ public class JobHistory {
   /**
    * Get the job history file path
    */
-  public static Path getJobHistoryFile(Path dir, JobID jobId, 
-      String user) {
-    return new Path(dir, jobId.toString() + "_" + user);
+  public static Path getJobHistoryFile
+        (Path dir, JobID jobId) {
+    MetaInfo info = fileMap.get(jobId);
+
+    if (info == null) {
+      fileMap.put(jobId, new MetaInfo(null, null, null, System.currentTimeMillis(), null, null));
+      return getJobHistoryFile(dir, jobId);
+    }            
+
+    return new Path(dir, jobId.toString());
   }
 
+  
+
   /**
    * Get the JobID from the history file's name. See it's companion method
    * {@link #getJobHistoryFile(Path, JobID, String)} for how history file's name
@@ -238,19 +332,37 @@ public class JobHistory {
     return JobID.forName(jobId);
   }
 
-  /**
-   * Get the user name of the job-submitter from the history file's name. See
-   * it's companion method {@link #getJobHistoryFile(Path, JobID, String)} for
-   * how history file's name is constructed from a given JobID and username.
-   * 
-   * @param jobHistoryFilePath
-   * @return the user-name
-   */
-  public static String getUserFromHistoryFilePath(Path jobHistoryFilePath) {
-    String[] jobDetails = jobHistoryFilePath.getName().split("_");
-    return jobDetails[3];
+  static String nonOccursString(String logFileName) {
+    int adHocIndex = 0;
+
+    String unfoundString = "q" + adHocIndex;
+
+    while (logFileName.contains(unfoundString)) {
+      unfoundString = "q" + ++adHocIndex;
+    }
+
+    return unfoundString + "q";
   }
 
+  // I tolerate this code because I expect a low number of
+  // occurrences in a relatively short string
+  static String replaceStringInstances
+      (String logFileName, String old, String replacement) {
+    int index = logFileName.indexOf(old);
+
+    while (index > 0) {
+      logFileName = (logFileName.substring(0, index)
+                     + replacement
+                     + replaceStringInstances
+                         (logFileName.substring(index + old.length()),
+                          old, replacement));
+
+      index = logFileName.indexOf(old);
+    }
+
+    return logFileName;
+  }      
+
   /**
    * Given the job id, return the history file path from the cache
    */
@@ -261,6 +373,31 @@ public class JobHistory {
     }
     return info.historyFile;
   }
+
+  /**
+   * Given the job id, return the conf.xml file path from the cache
+   */
+  public String getConfFilePath(JobID jobId) {
+    MovedFileInfo info = jobHistoryFileMap.get(jobId);
+    if (info == null) {
+      return null;
+    }
+    final Path historyFileDir
+      = (new Path(getHistoryFilePath(jobId))).getParent();
+    return getConfFile(historyFileDir, jobId).toString();
+  }
+    
+  /**
+   * Get the job name from the job conf
+   */
+  static String getJobName(JobConf jobConf) {
+    String jobName = jobConf.getJobName();
+    if (jobName == null || jobName.length() == 0) {
+      jobName = "NA";
+    }
+    return jobName;
+  }
+
   /**
    * Create an event writer for the Job represented by the jobID.
    * This should be the first call to history for a job
@@ -270,12 +407,19 @@ public class JobHistory {
    */
   public void setupEventWriter(JobID jobId, JobConf jobConf)
   throws IOException {
-    Path logFile = getJobHistoryFile(logDir, jobId, getUserName(jobConf));
-  
     if (logDir == null) {
       LOG.info("Log Directory is null, returning");
       throw new IOException("Missing Log Directory for History");
     }
+
+    MetaInfo oldFi = fileMap.get(jobId);
+
+    long submitTime = (oldFi == null ? System.currentTimeMillis() : oldFi.submitTime);
+
+    String user = getUserName(jobConf);
+    String jobName = getJobName(jobConf);
+    
+    Path logFile = getJobHistoryFile(logDir, jobId);
   
     int defaultBufferSize = 
       logDirFs.getConf().getInt("io.file.buffer.size", 4096);
@@ -315,7 +459,7 @@ public class JobHistory {
           + StringUtils.stringifyException(e));
     }
   
-    MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer);
+    MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer, submitTime, user, jobName);
     fileMap.put(jobId, fi);
   }
 
@@ -360,7 +504,7 @@ public class JobHistory {
   }
   
   private void startFileMoverThreads() {
-    executor = new ThreadPoolExecutor(1, 3, 1, 
+    executor = new ThreadPoolExecutor(3, 5, 1, 
         TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
   }
 
@@ -375,14 +519,14 @@ public class JobHistory {
     Path jobFilePath = null;
     if (logDir != null) {
       jobFilePath = new Path(logDir + File.separator +
-          jobId.toString() + "_conf.xml");
+          jobId.toString() + CONF_FILE_NAME_SUFFIX);
     }
     return jobFilePath;
   }
 
   /**
    * Generates a suffix for old/stale jobhistory files
-   * Pattern : . + identifier + .old
+   * Pattern : . + identifier + JobHistory.OLD_SUFFIX
    */
   public static String getOldFileSuffix(String identifier) {
     return "." + identifier + JobHistory.OLD_SUFFIX;
@@ -394,13 +538,18 @@ public class JobHistory {
     //files with same job id don't get over written in case of recovery.
     FileStatus[] files = logDirFs.listStatus(logDir);
     String fileSuffix = getOldFileSuffix(jobTracker.getTrackerIdentifier());
+    // We use the same millisecond time for all files so the config file
+    //  and job history file flow to the same subdirectory
+    long millisecondTime = ueState.monotonicTime();
     for (FileStatus fileStatus : files) {
       Path fromPath = fileStatus.getPath();
       if (fromPath.equals(done)) { //DONE can be a subfolder of log dir
         continue;
       }
       LOG.info("Moving log file from last run: " + fromPath);
-      Path toPath = new Path(done, fromPath.getName() + fileSuffix);
+      Path resultDir
+        = canonicalHistoryLogDir(null, millisecondTime);
+      Path toPath = new Path(resultDir, fromPath.getName() + fileSuffix);
       try {
         moveToDoneNow(fromPath, toPath);
       } catch (ChecksumException e) {
@@ -424,15 +573,16 @@ public class JobHistory {
       }
     }
   }
+    
   
   private void moveToDone(final JobID id) {
     final List<Path> paths = new ArrayList<Path>();
     final MetaInfo metaInfo = fileMap.get(id);
-    if (metaInfo == null) {
+    if (metaInfo == null || metaInfo.getHistoryFile() == null) {
       LOG.info("No file for job-history with " + id + " found in cache!");
       return;
     }
-    
+
     final Path historyFile = metaInfo.getHistoryFile();
     if (historyFile == null) {
       LOG.info("No file for job-history with " + id + " found in cache!");
@@ -448,32 +598,442 @@ public class JobHistory {
     }
 
     executor.execute(new Runnable() {
+      static final int SPONTANEOUSLY_CLOSE_INDEX_INTERVAL = 30 * 1000;
+
+      static final int SPONTANEOUS_INTERIM_INDEX_INTERVAL = 300 * 1000;
 
       public void run() {
-        //move the files to DONE folder
-        try {
-          for (Path path : paths) { 
-            moveToDoneNow(path, new Path(done, path.getName()));
+        boolean iShouldMonitor = false;
+
+        Path resultDir = null;
+
+        String historyFileDonePath = null;
+
+        Path failedJobHistoryIndexBuildPath = null;
+        Throwable failedHistoryMoveException = null;
+
+        synchronized (ueState) {
+          // needed because it's possible for system time to go backward
+          long millisecondTime = ueState.monotonicTime();
+
+          resultDir = canonicalHistoryLogDir(id, millisecondTime);
+
+          if (!resultDir.equals(ueState.currentDoneSubdirectory)) {
+            if (ueState.currentDoneSubdirectory != null) {
+              try {
+                ueState.closeCurrentDirectory();
+              } catch (IOException e) {
+                failedJobHistoryIndexBuildPath = ueState.currentDoneSubdirectory;
+              }
+            }
+
+            iShouldMonitor = true;
+
+            ueState.indexableElements = new LinkedList<JobHistoryIndexElement>();
+            ueState.currentDoneSubdirectory = resultDir;
+
+            ueState.monitoredDirectory = resultDir;
           }
-        } catch (Throwable e) {
-          LOG.error("Unable to move history file to DONE folder.", e);
+
+          // We need to make the JobHistoryIndexElement here, because after
+          //  we've copied the file the info might disappear, but before we've
+          //  closed the previous subdirectory [if we do that] it would go into
+          //  the wrong subdirectory index.
+          ueState.indexableElements.
+            add(new JobHistoryIndexElement(millisecondTime, id, metaInfo));
+
+          //move the files to a DONE canonical subfolder
+          try {
+            for (Path path : paths) { 
+              moveToDoneNow(path, new Path(resultDir, path.getName()));
+            }
+          } catch (Throwable e) {
+            failedHistoryMoveException = e;
+          }
+          if (historyFile != null) {
+            historyFileDonePath = new Path(resultDir, 
+                                           historyFile.getName()).toString();
+          }
+          jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath, 
+                                                      millisecondTime));
         }
-        String historyFileDonePath = null;
-        if (historyFile != null) {
-          historyFileDonePath = new Path(done, 
-              historyFile.getName()).toString();
+
+        if (failedJobHistoryIndexBuildPath != null) {
+          LOG.warn("Couldn't build a Job History index for "
+                   + failedJobHistoryIndexBuildPath);
+        }
+        if (failedHistoryMoveException != null) {
+          LOG.error("Can't move history file to DONE canonical subfolder.",
+                    failedHistoryMoveException);
         }
-        jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath, 
-            System.currentTimeMillis()));
+          
+
         jobTracker.retireJob(org.apache.hadoop.mapred.JobID.downgrade(id),
-            historyFileDonePath);
+                             historyFileDonePath);
 
         //purge the job from the cache
         fileMap.remove(id);
-      }
 
+        // Except ephemerally, only one task will be in this code at a
+        // time, because iShouldMonitor is only set true when
+        // ueState.monitoredDirectory changes, which will force the
+        // current incumbent to abend at the earliest opportunity.
+        while (iShouldMonitor) {
+          int roundCounter = 0;
+
+          int interruptionsToAbort = 2;
+
+          try {
+            Thread.sleep(SPONTANEOUSLY_CLOSE_INDEX_INTERVAL);
+          } catch (InterruptedException e) {
+            if (--interruptionsToAbort == 0) {
+              return;
+            }
+          }
+
+          Path unbuildableJobHistoryIndex = null;
+          Path unbuildableInterimJobHistoryIndex = null;
+
+          synchronized (ueState) {
+            if (ueState.monitoredDirectory != resultDir) {
+              // someone else closed out the directory I was monitoring
+              iShouldMonitor = false;
+            } else {
+              interruptionsToAbort = 2;
+
+              long millisecondTime = ueState.monotonicTime();
+
+              Path newResultDir = canonicalHistoryLogDir(id, millisecondTime);
+
+              if (!newResultDir.equals(resultDir)) {
+                try {
+                  ueState.closeCurrentDirectory();
+                } catch (IOException e) {
+                  unbuildableJobHistoryIndex = ueState.currentDoneSubdirectory;
+                }
+                iShouldMonitor = false;
+              }
+            }
+
+            if (iShouldMonitor
+                && (++roundCounter
+                    % (SPONTANEOUS_INTERIM_INDEX_INTERVAL
+                       / SPONTANEOUSLY_CLOSE_INDEX_INTERVAL)
+                    == 0)) {
+              // called for side effect -- a 5 minute checkpoint to
+              // reduce possible unindexed jobs on a JT crash
+              try {
+                ueState.getACurrentIndex(ueState.currentDoneSubdirectory);
+              } catch (IOException e) {
+                unbuildableInterimJobHistoryIndex
+                  = ueState.currentDoneSubdirectory;
+              }
+            }
+          }
+
+          if (unbuildableJobHistoryIndex != null) {
+            LOG.warn("Couldn't build a Job History index for "
+                     + unbuildableJobHistoryIndex);
+          }
+
+          if (unbuildableInterimJobHistoryIndex != null) {
+            LOG.warn("Couldn't build an interim Job History index for "
+                     + unbuildableInterimJobHistoryIndex);
+          }
+        }
+      }
     });
   }
+
+  public String[] currentIndex(Path theDoneSubdirectory)
+        throws IOException {
+    return ueState.currentIndex(theDoneSubdirectory);
+  }
+
+  // we only create one instance per JobHistory
+  class UnindexedElementsState {
+    long monotonicTime = Long.MIN_VALUE;
+    Path currentDoneSubdirectory = null;
+    private List<JobHistoryIndexElement> indexableElements = null;
+    Path monitoredDirectory = null;
+    int indexIndex = 0;
+    int indexedElementCount = 0;
+
+    private void buildIndex(String indexName) throws IOException {
+      Path tempPath = new Path(currentDoneSubdirectory, "nascent-index");
+      Path indexPath = new Path(currentDoneSubdirectory, indexName);
+
+      OutputStream newIndexOStream = null;
+      PrintStream newIndexPStream = null;
+
+      indexedElementCount = indexableElements.size();
+ 
+      try {
+        newIndexOStream
+          = FileSystem.create(doneDirFs, tempPath, HISTORY_FILE_PERMISSION);
+
+        newIndexPStream = new PrintStream(newIndexOStream);
+
+        for (JobHistoryIndexElement elt : indexableElements) {
+          newIndexPStream.println(elt.toString());
+        }
+      } finally {
+        if (newIndexPStream != null) {
+          newIndexPStream.close();
+
+          if (doneDirFs.exists(tempPath)) {
+            doneDirFs.rename(tempPath, indexPath);
+          } 
+        } else if (newIndexOStream != null) {
+          newIndexOStream.close();
+          doneDirFs.delete(tempPath, false);
+        }
+      }
+    }
+
+    synchronized String[] currentIndex(Path theDoneSubdirectory)
+        throws IOException {
+      Path subdirIndex = getACurrentIndex(theDoneSubdirectory);
+
+      List<String> indexAsRead = new ArrayList<String>();
+
+      InputStream iStream = null;
+      InputStreamReader isReader = null;
+      BufferedReader breader = null;
+
+      try {
+        iStream = doneDirFs.open(subdirIndex);
+        isReader = new InputStreamReader(iStream);
+        breader = new BufferedReader(isReader);
+
+        String thisRecord = breader.readLine();
+
+        while (thisRecord != null) {
+          indexAsRead.add(thisRecord);
+          thisRecord = breader.readLine();
+        }
+
+        String[] result = indexAsRead.toArray(new String[0]);
+
+        Arrays.sort(result);
+
+        return result;
+      } finally {
+        if (breader != null) {
+          breader.close();
+        } else if (isReader != null) {
+          isReader.close();
+        } else if (iStream != null) {
+          iStream.close();
+        }
+      }
+    }
+      
+    // If this is the block that's now being built, we build a new
+    // index and return that.  This shouldn't be called on an empty
+    // subdirectory.
+    // 
+    // getACurrentIndex must be called within a synchronized(this) block.
+    // Currently there are two calls, both of which qualify.
+    Path getACurrentIndex(Path theDoneSubdirectory) throws IOException {
+      if (!theDoneSubdirectory.equals(currentDoneSubdirectory)) {
+        return new Path(theDoneSubdirectory, "index");
+      }
+
+      if (indexedElementCount == indexableElements.size()) {
+        return new Path(theDoneSubdirectory, "index-" + indexIndex);
+      }
+
+      String indexName = "index-" + ++indexIndex;
+
+      buildIndex(indexName);
+
+      return new Path(theDoneSubdirectory, indexName);
+    }
+
+    // not synchronized, because calls must be in a larger synchronized context
+    private void closeCurrentDirectory() throws IOException {
+      if (currentDoneSubdirectory == null) {
+        return;
+      }
+
+      buildIndex("index");
+    }
+
+    synchronized long monotonicTime() {
+      monotonicTime = Math.max(monotonicTime, System.currentTimeMillis());
+      return monotonicTime;
+    }
+  }
+
+  static class JobHistoryIndexElement {
+    // id and millisecondTime are currently unused.
+    final JobID id;
+    final long millisecondTime;
+    final MetaInfo metaInfo;
+
+    JobHistoryIndexElement(long millisecondTime, JobID id, MetaInfo metaInfo) {
+      this.id = id;
+      this.millisecondTime = millisecondTime;
+      this.metaInfo = metaInfo;
+    }
+
+    public String toString() {
+      String user = metaInfo.user;
+      String jobName = metaInfo.jobName;
+
+      if (jobName.length() > 50) {
+        jobName = jobName.substring(0, 50);
+      }
+
+      String adHocBarEscape = "";
+
+      if (user.indexOf('|') >= 0 || jobName.indexOf('|') >= 0) {
+        adHocBarEscape = nonOccursString(user + jobName);
+
+        user = replaceStringInstances(user, "|", adHocBarEscape);
+        jobName = replaceStringInstances(jobName, "|", adHocBarEscape);
+      }
+
+      return (metaInfo.getHistoryFile().getName()
+              + "|" + millisecondTime
+              + "|" + adHocBarEscape
+              + "|" + user
+              + "|" + jobName); 
+    }
+  }
+
+  // several methods for manipulating the subdirectories of the DONE
+  // directory 
+
+  // directory components may contain internal slashes, but do NOT
+  // contain slashes at either end.
+
+  // In this nest of code, id can be null.  In that case it is an error to call
+  //  more than once to get a single filename.  This can happen when we're moving
+  //  files from an old run into the new context.  See moveOldFiles() .
+  private static String timestampDirectoryComponent(JobID id, long millisecondTime) {
+    Integer boxedSerialNumber = null;
+
+    if (id != null) {
+      boxedSerialNumber = id.getId();
+    }
+
+    // don't want to do this inside the lock
+    Calendar timestamp = Calendar.getInstance();
+    timestamp.setTimeInMillis(millisecondTime);
+
+    synchronized (idToDateString) {
+      String dateString
+        = (boxedSerialNumber == null ? null : idToDateString.get(boxedSerialNumber));
+
+      if (dateString == null) {
+
+        dateString = String.format
+          ("%04d/%02d/%02d/%02d",
+           timestamp.get(Calendar.YEAR),
+           timestamp.get(DEBUG_MODE ? Calendar.HOUR : Calendar.MONTH) + 1,
+           timestamp.get(DEBUG_MODE ? Calendar.MINUTE : Calendar.DAY_OF_MONTH),
+           timestamp.get(DEBUG_MODE ? Calendar.SECOND : Calendar.HOUR));
+
+        dateString = dateString.intern();
+
+        if (boxedSerialNumber != null) {
+          idToDateString.put(boxedSerialNumber, dateString);
+
+          if (idToDateString.size() > MAXIMUM_DATESTRING_COUNT) {
+            idToDateString.remove(idToDateString.firstKey());
+          }
+        }
+      }
+
+      return dateString;
+    }
+  }
+
+  // returns false iff the directory already existed
+  private boolean maybeMakeSubdirectory(JobID id, long millisecondTime)
+          throws IOException {
+    Path dir = canonicalHistoryLogDir(id, millisecondTime);
+
+    String deferredErrorPrintout = null;
+    String deferredInfoLogging = null;
+
+    try {
+      synchronized (existingDoneSubdirs) {
+        if (existingDoneSubdirs.contains(dir)) {
+          if (DEBUG_MODE && !doneDirFs.exists(dir)) {
+            deferredErrorPrintout
+              = ("JobHistory.maybeMakeSubdirectory -- We believed "
+                 + dir + " already existed, but it didn't.");
+          }
+          
+          return true;
+        }
+
+        if (!doneDirFs.exists(dir)) {
+          deferredInfoLogging = "Creating DONE subfolder at " + dir;
+
+          if (!doneDirFs.mkdirs(dir,
+                                new FsPermission(HISTORY_DIR_PERMISSION))) {
+            throw new IOException("Mkdirs failed to create " + dir.toString());
+          }
+
+          existingDoneSubdirs.add(dir);
+
+          return false;
+        } else {
+          if (DEBUG_MODE) {
+            deferredErrorPrintout
+              = ("JobHistory.maybeMakeSubdirectory -- We believed "
+                 + dir + " didn't already exist, but it did.");
+          }
+
+          return false;
+        }
+      }
+    } finally {
+      if (deferredErrorPrintout != null) {
+        System.err.println(deferredErrorPrintout);
+      }
+
+      if (deferredInfoLogging != null) {
+        LOG.info(deferredInfoLogging);
+      }
+    }
+  }
+
+  // Previous versions of this code used the id argument, when the
+  //  directory structure was a bit different.
+  // I'm leaving the currently unused id argument in place, in case we
+  //  decide to start using it again in the future.
+  private Path canonicalHistoryLogDir(JobID id, long millisecondTime) {
+    return new Path(done, historyLogSubdirectory(id, millisecondTime));
+  }
+
+  private String historyLogSubdirectory(JobID id, long millisecondTime) {
+    String result = LOG_VERSION_STRING
+      + "/" + jobtrackerDirectoryComponent(id);
+
+    result = (result
+              + "/" + timestampDirectoryComponent(id, millisecondTime)
+              + "/")
+      .intern();
+
+    return result;
+  }
+
+  private String jobtrackerDirectoryComponent(JobID id) {
+    return jobTrackerUniqueName;
+  }
+
+  private static String doneSubdirsBeforeSerialTail() {
+    // job tracker ID + date
+    String result = "/*/*/*/*/*";   // job tracker instance ID/YYYY/MM/DD/HH
+
+    return result;
+  }
+
   
   private String getUserName(JobConf jobConf) {
     String user = jobConf.getUser();
@@ -483,15 +1043,141 @@ public class JobHistory {
     return user;
   }
   
+
+  // hasMismatches is just used to return a second value if you want
+  // one.  I would have used MutableBoxedBoolean if such had been provided.
+  static Path[] filteredStat2Paths
+          (FileStatus[] stats, boolean dirs, AtomicBoolean hasMismatches) {
+    int resultCount = 0;
+
+    if (hasMismatches == null) {
+      hasMismatches = new AtomicBoolean(false);
+    }
+
+    for (int i = 0; i < stats.length; ++i) {
+      if (stats[i].isDir() == dirs) {
+        stats[resultCount++] = stats[i];
+      } else {
+        hasMismatches.set(true);
+      }
+    }
+
+    Path[] paddedResult = FileUtil.stat2Paths(stats);
+
+    Path[] result = new Path[resultCount];
+
+    System.arraycopy(paddedResult, 0, result, 0, resultCount);
+
+    return result;
+  }
+
+  public FileStatus[] getAllHistoryConfFiles() throws IOException {
+    return localGlobber
+      (doneDirFs, done, "/" + LOG_VERSION_STRING + "/*/*/*/*/*");
+  }
+
+  public static FileStatus[] localGlobber
+        (FileSystem fs, Path root, String tail) 
+      throws IOException {
+    return localGlobber(fs, root, tail, null);
+  }
+
+  public static FileStatus[] localGlobber
+        (FileSystem fs, Path root, String tail, PathFilter filter) 
+      throws IOException {
+    return localGlobber(fs, root, tail, filter, null);
+  }
+  
+  private static FileStatus[] nullToEmpty(FileStatus[] result) {
+    return result == null ? new FileStatus[0] : result;
+  }
+      
+  private static FileStatus[] listFilteredStatus
+        (FileSystem fs, Path root, PathFilter filter)
+     throws IOException {
+    return filter == null ? fs.listStatus(root) : fs.listStatus(root, filter);
+  }
+
+  // hasMismatches is just used to return a second value if you want
+  // one.  I would have used MutableBoxedBoolean if such had been provided.
+  static FileStatus[] localGlobber
+    (FileSystem fs, Path root, String tail, PathFilter filter,
+     AtomicBoolean hasFlatFiles)
+      throws IOException {
+    if (tail.equals("")) {
+      return nullToEmpty(listFilteredStatus(fs, root, filter));
+    }
+
+    if (tail.startsWith("/*")) {
+      Path[] subdirs = filteredStat2Paths(nullToEmpty(fs.listStatus(root)),
+                                          true, hasFlatFiles);
+
+      FileStatus[][] subsubdirs = new FileStatus[subdirs.length][];
+
+      int subsubdirCount = 0;
+
+      if (subsubdirs.length == 0) {
+        return new FileStatus[0];
+      }
+
+      String newTail = tail.substring(2);
+
+      for (int i = 0; i < subdirs.length; ++i) {
+        subsubdirs[i] = localGlobber(fs, subdirs[i], newTail, filter, null);
+        subsubdirCount += subsubdirs[i].length;
+      }
+
+      FileStatus[] result = new FileStatus[subsubdirCount];
+
+      int segmentStart = 0;
+
+      for (int i = 0; i < subsubdirs.length; ++i) {
+        System.arraycopy(subsubdirs[i], 0, result, segmentStart, subsubdirs[i].length);
+        segmentStart += subsubdirs[i].length;
+      }
+
+      return result;
+    }
+
+    if (tail.startsWith("/")) {
+      int split = tail.indexOf('/', 1);
+
+      try {
+        if (split < 0) {
+          return nullToEmpty
+            (listFilteredStatus(fs, new Path(root, tail.substring(1)), filter));
+        } else {
+          String thisSegment = tail.substring(1, split);
+          String newTail = tail.substring(split);
+          return localGlobber
+            (fs, new Path(root, thisSegment), newTail, filter, hasFlatFiles);
+        }
+      } catch (FileNotFoundException ignored) {
+        return new FileStatus[0];
+      }
+    }
+
+    IOException e = new IOException("localGlobber: bad tail");
+
+    throw e;
+  }
+
   private static class MetaInfo {
     private Path historyFile;
     private Path confFile;
     private EventWriter writer;
+    long submitTime;
+    String user;
+    String jobName;
 
-    MetaInfo(Path historyFile, Path conf, EventWriter writer) {
+    MetaInfo(Path historyFile, Path conf, EventWriter writer, long submitTime,
+             String user, String jobName) {
       this.historyFile = historyFile;
       this.confFile = conf;
       this.writer = writer;
+      this.submitTime = submitTime;
+      this.user = user;
+      this.jobName = jobName;
     }
 
     Path getHistoryFile() { return historyFile; }
@@ -512,18 +1198,315 @@ public class JobHistory {
   }
 
   /**
+   * Returns a job history log Path generator
+   * We return all Path's that exist in the history at time of call, subject to
+   *   four conjunctive criteria, one for each of parameter.  Each parameter is 
+   *   null or the empty string if caller doesn't want that filtering.
+   * It's perfectly alright to call this with no restrictions.
+   *
+   * @param user the desired username 
+   *           [or null or the empty string, if no specific user]
+   * @param jobnameSubstring a substring of the job names
+   * @param dateStrings an array of date strings, format MM/DD/YYYY .  This 
+   *           criterion accepts logs with ANY of the dates
+   * @param soughtJobid the String naming the jobID we want, if any.  Note
+   *           that this criterion names a unique job; you may not want
+   *           to specify any other criteria if you specify this one.
+   * @throws IOException
+   */
+  public JobHistoryRecordRetriever getMatchingJobs
+          (String user, String jobnameSubstring,
+           String[] dateStrings, String soughtJobid)
+        throws IOException {
+    return new JobHistoryRecordRetriever
+      (user, jobnameSubstring, dateStrings, soughtJobid);
+  }
+    
+  public static class JobHistoryJobRecord {
+    private Path basePath;
+    private String recordText;
+    private String[] recordSplits = null;
+
+    JobHistoryJobRecord(Path basePath, String recordText) {
+      this.basePath = basePath;
+      this.recordText = recordText;
+    }
+
+    private String[] getSplits(boolean noCache) {
+      if (recordSplits != null) {
+        return recordSplits;
+      }
+
+      String[] result = recordText.split("\\|");
+
+      if (!noCache) {
+        recordSplits = result;
+      }
+
+      return result;
+    }
+
+    public Path getPath() {
+      return getPath(false);
+    }
+
+    public Path getPath(boolean noCache) {
+      return new Path(basePath, getSplits(noCache)[0]);
+    }
+
+    public String getJobIDString() {
+      return getJobIDString(false);
+    }
+
+    public String getJobIDString(boolean noCache) {
+      return getSplits(noCache)[0];
+    }    
+
+    public long getSubmitTime() {
+      return getSubmitTime(false);
+    }
+
+    public long getSubmitTime(boolean noCache) {
+      return Long.parseLong(getSplits(noCache)[1]);
+    }
+
+    public String getUserName() {
+      return getUserName(false);
+    }
+
+    public String getUserName(boolean noCache) {
+      String[] splits = getSplits(noCache);
+
+      String result = splits[3];
+
+      if (splits[2].length() != 0) {
+        result = replaceStringInstances(result, splits[2], "|");
+      }
+
+      return result;
+    }
+
+    public String getJobName() {
+      return getJobName(false);
+    }
+
+    public String getJobName(boolean noCache) {
+      String[] splits = getSplits(noCache);
+
+      String result = splits[4];
+
+      if (splits[2].length() != 0) {
+        result = replaceStringInstances(result, splits[2], "|");
+      }
+
+      return result;
+    }
+  }
+
+  public class JobHistoryRecordRetriever implements Iterator<JobHistoryJobRecord> {
+    private final Pattern DATE_PATTERN
+      = Pattern.compile("([0-1]?[0-9])/([0-3]?[0-9])/((?:2[0-9])[0-9][0-9])");
+    
+
+    private class BaseElements {
+      Path basePath;
+      List<String> records = new LinkedList<String>();
+    }
+
+    // Internal contract -- elts contains no empty BaseElements's
+    private List<BaseElements> elts = new LinkedList<BaseElements>();
+
+    private Iterator<BaseElements> eltsCursor;
+    private Iterator<String> currentEltCursor;
+
+    private BaseElements currentBE = null;
+
+    private int numberMatches;
+
+    @Override
+    public boolean hasNext() {
+      return currentEltCursor.hasNext() || eltsCursor.hasNext();
+    }
+
+    @Override
+    public JobHistoryJobRecord next() throws NoSuchElementException {
+      if (currentEltCursor.hasNext()) {
+        return new JobHistoryJobRecord(currentBE.basePath, currentEltCursor.next());
+      }
+
+      currentBE = eltsCursor.next();
+      currentEltCursor = currentBE.records.iterator();
+      return next();
+    }
+
+    @Override
+    public void remove() throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("no remove() operation");
+    }
+
+    public int numberMatches() {
+      return numberMatches;
+    }
+
+    public JobHistoryRecordRetriever
+          (String soughtUser, String soughtJobName, String[] dateStrings, String soughtJobid)
+        throws IOException {
+      numberMatches = 0;
+
+      soughtUser = soughtUser == null ? "" : soughtUser;
+      soughtJobName = soughtJobName == null ? "" : soughtJobName;
+      soughtJobid = soughtJobid == null ? "" : soughtJobid;
+
+      if (dateStrings == null || dateStrings.length == 0) {
+        dateStrings = new String[1];
+        dateStrings[0] = "";
+      }
+
+      for (int i = 0; i < dateStrings.length; ++i) {
+        String soughtDate = dateStrings[i];
+        String globString = globString();
+        
+
+        String yyyyGlobPart = "*";
+        String mmGlobPart = "*";
+        String ddGlobPart = "*";
+        String hhGlobPart = "*";
+
+        if (soughtDate.length() != 0) {
+          Matcher m = DATE_PATTERN.matcher(soughtDate);
+          if (m.matches()) {
+            yyyyGlobPart = m.group(3);
+            mmGlobPart = m.group(1);
+            ddGlobPart = m.group(2);
+
+            if (yyyyGlobPart.length() == 2) {
+              yyyyGlobPart = "20" + yyyyGlobPart;
+            }
+
+            if (mmGlobPart.length() == 1) {
+              mmGlobPart = "0" + mmGlobPart;
+            }
+
+            if (ddGlobPart.length() == 1) {
+              ddGlobPart = "0" + ddGlobPart;
+            }
+          }
+        }
+
+        globString = globString.replace("YYYY", yyyyGlobPart);
+        globString = globString.replace("MM", mmGlobPart);
+        globString = globString.replace("DD", ddGlobPart);
+        globString = globString.replace("HH", hhGlobPart);
+
+        if (doneDirFs == null) {
+          if (DEBUG_MODE) {
+            System.out.println("Null file system. May be namenode is in safemode!");
+          }
+          return;
+        }
+
+        Path[] jobDirectories
+          = FileUtil.stat2Paths
+              (JobHistory.localGlobber
+               (doneDirFs, done, globString));
+
+        for (int jd = 0; jd < jobDirectories.length; jd++) {
+          Path doneSubdirectory = jobDirectories[jd];
+
+          String[] subdirectoryIndex = new String[0];
+
+          BaseElements be = new BaseElements();
+
+          be.basePath = doneSubdirectory;
+
+          try {
+            subdirectoryIndex = currentIndex(doneSubdirectory);
+          } catch (FileNotFoundException e) {
+            // no code -- should we log here?
+          }
+
+          for (int j = 0; j < subdirectoryIndex.length; ++j) {
+            String[] segments = subdirectoryIndex[j].split("\\|");
+
+            // segments are [0] jobid [also file name]
+            //              [1] submit time [milliseconds]
+            //              [2] ad hoc '|' substitution
+            //              [3] user name
+            //              [4] trimmed job jame
+            //
+
+            String user = segments[3];
+            String jobName = segments[4];
+
+            if (segments[2].length() > 0) {
+              user = replaceStringInstances(user, segments[2], "|");
+              jobName = replaceStringInstances(jobName, segments[2], "|");
+            }
+
+            if ((soughtJobid.equals("") || segments[0].equalsIgnoreCase(soughtJobid))
+                && (soughtUser.equals("") || user.equalsIgnoreCase(soughtUser))
+                && (soughtJobName.equals("") || jobName.contains(soughtJobName))) {
+              be.records.add(subdirectoryIndex[j]);
+            }
+          }
+
+          if (be.records.size() != 0) {
+            elts.add(be);
+
+            numberMatches += be.records.size();
+          }
+        }
+      }
+
+      eltsCursor = elts.iterator();
+
+      currentEltCursor = new LinkedList<String>().iterator();
+    }
+  }
+
+  static long directoryTime(String year, String seg2, String seg3, String seg4) {
+    // set to current time.  In debug mode, this is where the month
+    // and day get set.
+    Calendar result = Calendar.getInstance();
+    // canonicalize by filling in unset fields
+    result.setTimeInMillis(System.currentTimeMillis());
+
+    result.set(Calendar.YEAR, Integer.parseInt(year));
+
+    int seg2int = Integer.parseInt(seg2);
+    if (!DEBUG_MODE) {
+      --seg2int;
+    }
+
+    result.set(DEBUG_MODE ? Calendar.HOUR : Calendar.MONTH,
+               seg2int);
+    result.set(DEBUG_MODE ? Calendar.MINUTE : Calendar.DAY_OF_MONTH,
+               Integer.parseInt(seg3));
+    result.set(DEBUG_MODE ? Calendar.SECOND : Calendar.HOUR,
+               Integer.parseInt(seg4));
+
+    return result.getTimeInMillis();
+  }
+
+  /**
    * Delete history files older than a specified time duration.
    */
   class HistoryCleaner extends Thread {
     static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
+
+    static final long DIRECTORY_LIFE_IN_MS
+      = DEBUG_MODE ? 20 * 60 * 1000L : 30 * ONE_DAY_IN_MS;
+    static final long RUN_INTERVAL
+      = DEBUG_MODE ? 10L * 60L * 1000L : ONE_DAY_IN_MS;
+
     private long cleanupFrequency;
     private long maxAgeOfHistoryFiles;
-  
+
     public HistoryCleaner(long maxAge) {
       setName("Thread for cleaning up History files");
       setDaemon(true);
       this.maxAgeOfHistoryFiles = maxAge;
-      cleanupFrequency = Math.min(ONE_DAY_IN_MS, maxAgeOfHistoryFiles);
+      cleanupFrequency = Math.min(RUN_INTERVAL, maxAgeOfHistoryFiles);
       LOG.info("Job History Cleaner Thread started." +
           " MaxAge is " + 
           maxAge + " ms(" + ((float)maxAge)/(ONE_DAY_IN_MS) + " days)," +
@@ -551,29 +1534,93 @@ public class JobHistory {
     }
   
     private void doCleanup() {
-      long now = System.currentTimeMillis();
+      long now = ueState.monotonicTime();
+
+      boolean printedOneDeletee = false;
+
+      Set<String> deletedPathnames = new HashSet<String>();
+
       try {
-        FileStatus[] historyFiles = doneDirFs.listStatus(done);
-        if (historyFiles != null) {
-          for (FileStatus f : historyFiles) {
-            if (now - f.getModificationTime() > maxAgeOfHistoryFiles) {
-              doneDirFs.delete(f.getPath(), true); 
-              LOG.info("Deleting old history file : " + f.getPath());
+        Path[] datedDirectories
+          = FileUtil.stat2Paths(localGlobber(doneDirFs, done,
+                                             DONE_BEFORE_SERIAL_TAIL, null));
+
+        // fild old directories
+        for (int i = 0; i < datedDirectories.length; ++i) {
+          String thisDir = datedDirectories[i].toString();
+          Matcher pathMatcher = historyCleanerParseDirectory.matcher(thisDir);
+
+          if (pathMatcher.matches()) {
+            long dirTime = directoryTime(pathMatcher.group(1),
+                                         pathMatcher.group(2),
+                                         pathMatcher.group(3),
+                                         pathMatcher.group(4));
+
+            if (DEBUG_MODE) {
+              System.err.println("HistoryCleaner.run just parsed " + thisDir
+                                 + " as year/month/day/hour = "
+                               + pathMatcher.group(1)
+                               + "/" + pathMatcher.group(2)
+                               + "/" + pathMatcher.group(3)
+                               + "/" + pathMatcher.group(4));
+            }
+
+            if (dirTime < now - DIRECTORY_LIFE_IN_MS) {
+              if (DEBUG_MODE) {
+                Calendar then = Calendar.getInstance();
+                then.setTimeInMillis(dirTime);
+                Calendar nnow = Calendar.getInstance();
+                nnow.setTimeInMillis(now);
+                
+                System.err.println("HistoryCleaner.run directory: " + thisDir
+                                   + " because its time is " + then
+                                   + " but it's now " + nnow);
+                System.err.println("then = " + dirTime);
+                System.err.println("now  = " + now);
+              }
+
+              // remove every file in the directory and save the name
+              // so we can remove it from jobHistoryFileMap
+              Path[] deletees
+                = FileUtil.stat2Paths(localGlobber(doneDirFs,
+                                                   datedDirectories[i],
+                                                   "/*", // individual files
+                                                   null));
+
+              for (int j = 0; j < deletees.length; ++j) {
+
+                if (DEBUG_MODE && !printedOneDeletee) {
+                  System.err.println("HistoryCleaner.run deletee: " + deletees[j].toString());
+                  printedOneDeletee = true;
+                }
+
+                LOG.info("Deleting old history file : " + deletees[j]);
+
+                doneDirFs.delete(deletees[j]);
+                deletedPathnames.add(deletees[j].toString());
+              }
+
+              synchronized (existingDoneSubdirs) {
+                if (!existingDoneSubdirs.contains(datedDirectories[i]))
+                  {
+                    LOG.warn("JobHistory: existingDoneSubdirs doesn't contain "
+                             + datedDirectories[i] + ", but should.");
+                  }
+                doneDirFs.delete(datedDirectories[i], true);
+                existingDoneSubdirs.remove(datedDirectories[i]);
+              }
             }
           }
         }
+
         //walking over the map to purge entries from jobHistoryFileMap
         synchronized (jobHistoryFileMap) {
           Iterator<Entry<JobID, MovedFileInfo>> it = 
             jobHistoryFileMap.entrySet().iterator();
           while (it.hasNext()) {
             MovedFileInfo info = it.next().getValue();
-            if (now - info.timestamp > maxAgeOfHistoryFiles) {
+            if (deletedPathnames.contains(info.historyFile)) {
               it.remove();
-            } else {
-              //since entries are in sorted timestamp order, no more entries
-              //are required to be checked
-              break;
             }
           }
         }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Tue Mar  8 05:53:05 2011
@@ -251,10 +251,15 @@ public class TestJobHistory extends Test
    */
   private static Path getPathForConf(Path path, Path dir) {
     String parts[] = path.getName().split("_");
-    //TODO this is a hack :(
-    // jobtracker-hostname_jobtracker-identifier_
+    Path parent = path.getParent();
+    Path ancestor = parent;
+    for (int i = 0; i < 4; ++i) { // serial #, 3 laysers of date
+      ancestor = ancestor.getParent();
+    }
+    String jobtrackerID = ancestor.getName();
     String id = parts[0] + "_" + parts[1] + "_" + parts[2];
-    return new Path(dir, id + "_conf.xml");
+    String jobUniqueString = jobtrackerID +  id;
+    return new Path(parent, jobUniqueString + "_conf.xml");
   }
 
   /**
@@ -279,13 +284,14 @@ public class TestJobHistory extends Test
    * @param id job id
    * @param conf job conf
    */
-  public static void validateJobHistoryFileFormat(JobHistory jobHistory,
-      JobID id, JobConf conf,
-                 String status, boolean splitsCanBeEmpty) throws IOException  {
+  public static void validateJobHistoryFileFormat
+         (JobTracker jt, JobHistory jobHistory, JobID id, JobConf conf,
+          String status, boolean splitsCanBeEmpty)
+       throws IOException  {
 
     // Get the history file name
     Path dir = jobHistory.getCompletedJobHistoryLocation();
-    String logFileName = getDoneFile(jobHistory, conf, id, dir);
+    String logFileName = getDoneFile(jt, conf, id, dir);
 
     // Framework history log file location
     Path logFile = new Path(dir, logFileName);
@@ -565,11 +571,12 @@ public class TestJobHistory extends Test
                               RunningJob job, JobConf conf) throws IOException  {
 
     JobID id = job.getID();
-    JobHistory jobHistory = 
-      mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    JobHistory jobHistory = jt.getJobHistory();
+
     Path doneDir = jobHistory.getCompletedJobHistoryLocation();
     // Get the history file name
-    String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
+    String logFileName = getDoneFile(jt, conf, id, doneDir);
 
     // Framework history log file location
     Path logFile = new Path(doneDir, logFileName);
@@ -646,12 +653,14 @@ public class TestJobHistory extends Test
       assertEquals("Files in logDir did not move to DONE folder",
           0, logDirFs.listStatus(logDirPath).length);
 
-      JobHistory jobHistory = 
-        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      JobHistory jobHistory = jt.getJobHistory();
+
       Path doneDir = jobHistory.getCompletedJobHistoryLocation();
 
-      assertEquals("Files in DONE dir not correct",
-          2, doneDir.getFileSystem(conf).listStatus(doneDir).length);
+      FileStatus[] movedFiles = jobHistory.getAllHistoryConfFiles();
+
+      assertEquals("Files in DONE dir not correct", 2, movedFiles.length);
 
       // run the TCs
       conf = mr.createJobConf();
@@ -676,31 +685,28 @@ public class TestJobHistory extends Test
       assertEquals("History DONE folder not correct", 
           doneFolder, doneDir.getName());
       JobID id = job.getID();
-      String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
+      String logFileName = getDoneFile(jt, conf, id, doneDir);
 
       // Framework history log file location
       Path logFile = new Path(doneDir, logFileName);
       FileSystem fileSys = logFile.getFileSystem(conf);
-
-      Cluster cluster = new Cluster(conf);
-      assertEquals("Client returned wrong history url", logFile.toString(), 
-          cluster.getJobHistoryUrl(id));
    
       // Check if the history file exists
       assertTrue("History file does not exist", fileSys.exists(logFile));
 
       // check if the corresponding conf file exists
-      Path confFile = getPathForConf(logFile, doneDir);
+      String confname = jobHistory.getConfFilePath(id);
+      Path confFile = new Path(confname);
       assertTrue("Config for completed jobs doesnt exist", 
                  fileSys.exists(confFile));
 
-      // check if the file exists in a done folder
-      assertTrue("Completed job config doesnt exist in the done folder", 
-                 doneDir.getName().equals(confFile.getParent().getName()));
-
-      // check if the file exists in a done folder
-      assertTrue("Completed jobs doesnt exist in the done folder", 
-                 doneDir.getName().equals(logFile.getParent().getName()));
+      // check if the conf and log files are in the same directory
+      assertTrue("config file and log file aren't in the same directory",
+                 confFile.getParent().equals(logFile.getParent()));
+
+      // check if the file exists under the done folder
+      assertTrue("Completed job doesnt exist under done folder", 
+                 logFile.toString().startsWith(doneDir.toString()));
       
 
       // check if the job file is removed from the history location 
@@ -714,7 +720,7 @@ public class TestJobHistory extends Test
       assertFalse("Config for completed jobs not deleted from running folder", 
                   fileSys.exists(runningJobConfFilename));
 
-      validateJobHistoryFileFormat(jobHistory,
+      validateJobHistoryFileFormat(jt, jobHistory,
           job.getID(), conf, "SUCCEEDED", false);
       validateJobHistoryFileContent(mr, job, conf);
 
@@ -771,33 +777,39 @@ public class TestJobHistory extends Test
 
       // Run a job that will be succeeded and validate its history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
-      JobHistory jobHistory = 
-        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      JobHistory jobHistory = jt.getJobHistory();
+
       Path doneDir = jobHistory.getCompletedJobHistoryLocation();
       assertEquals("History DONE folder not correct", 
           doneFolder, doneDir.toString());
       JobID id = job.getID();
-      String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
+      String logFileName = getDoneFile(jt, conf, id, doneDir);
 
       // Framework history log file location
       Path logFile = new Path(doneDir, logFileName);
+      Path logDir = logFile.getParent();
       FileSystem fileSys = logFile.getFileSystem(conf);
    
       // Check if the history file exists
       assertTrue("History file does not exist", fileSys.exists(logFile));
 
       // check if the corresponding conf file exists
-      Path confFile = getPathForConf(logFile, doneDir);
+      Path confFile = new Path(jobHistory.getConfFilePath(id));
       assertTrue("Config for completed jobs doesnt exist", 
                  fileSys.exists(confFile));
 
       // check if the conf file exists in a done folder
       assertTrue("Completed job config doesnt exist in the done folder", 
-                 doneDir.getName().equals(confFile.getParent().getName()));
+                 logDir.getName().equals(confFile.getParent().getName()));
 
       // check if the file exists in a done folder
       assertTrue("Completed jobs doesnt exist in the done folder", 
-                 doneDir.getName().equals(logFile.getParent().getName()));
+                 logDir.getName().equals(logFile.getParent().getName()));
+
+      assertTrue("The log file dir is not under the done dir",
+                 logDir.toString().startsWith(doneDir.toString()));
 
       // check if the job file is removed from the history location 
       Path runningJobsHistoryFolder = logFile.getParent().getParent();
@@ -810,12 +822,11 @@ public class TestJobHistory extends Test
       assertFalse("Config for completed jobs not deleted from running folder", 
                   fileSys.exists(runningJobConfFilename));
 
-      validateJobHistoryFileFormat(jobHistory, job.getID(), conf, 
+      validateJobHistoryFileFormat(jt, jobHistory, job.getID(), conf, 
           "SUCCEEDED", false);
       validateJobHistoryFileContent(mr, job, conf);
 
       // get the job conf filename
-      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
       String name = jt.getLocalJobFilePath(job.getID());
       File file = new File(name);
 
@@ -834,16 +845,22 @@ public class TestJobHistory extends Test
 
   //Returns the file in the done folder
   //Waits for sometime to get the file moved to done
-  private static String getDoneFile(JobHistory jobHistory, 
-      JobConf conf, JobID id, 
-      Path doneDir) throws IOException {
+  private static String getDoneFile
+        (JobTracker jt, JobConf conf, JobID id, Path doneDir)
+      throws IOException {
+    JobHistory jobHistory = jt.getJobHistory();
+
     String name = null;
     String user = UserGroupInformation.getCurrentUser().getUserName();
+
     for (int i = 0; name == null && i < 20; i++) {
-      Path path = JobHistory.getJobHistoryFile(
-          jobHistory.getCompletedJobHistoryLocation(), id, user);
-      if (path.getFileSystem(conf).exists(path)) {
-        name = path.toString();
+      String pathname = jobHistory.getHistoryFilePath(id);
+
+      if (pathname != null) {
+        Path path = new Path(pathname);
+        if (path.getFileSystem(conf).exists(path)) {
+          name = path.toString();
+        }
       }
       UtilsForTests.waitFor(1000);
     }
@@ -870,12 +887,14 @@ public class TestJobHistory extends Test
    * @param id job id
    * @param conf job conf
    */
-  private static void validateJobHistoryJobStatus(JobHistory jobHistory,
-      JobID id, JobConf conf, String status) throws IOException  {
+  private static void validateJobHistoryJobStatus
+        (JobTracker jt, JobHistory jobHistory,
+         JobID id, JobConf conf, String status)
+      throws IOException  {
 
     // Get the history file name
     Path doneDir = jobHistory.getCompletedJobHistoryLocation();
-    String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
+    String logFileName = getDoneFile(jt, conf, id, doneDir);
     
     // Framework history log file location
     Path logFile = new Path(doneDir, logFileName);
@@ -891,8 +910,7 @@ public class TestJobHistory extends Test
     
     JobHistoryParser parser = new JobHistoryParser(fileSys, 
         logFile.toUri().getPath());
-    JobHistoryParser.JobInfo jobInfo = parser.parse();
-    
+    JobHistoryParser.JobInfo jobInfo = parser.parse();    
 
     assertTrue("Job Status read from job history file is not the expected" +
          " status", status.equals(jobInfo.getJobStatus()));
@@ -918,22 +936,24 @@ public class TestJobHistory extends Test
       // Run a job that will be succeeded and validate its job status
       // existing in history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
+
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
       
-      JobHistory jobHistory = 
-        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
-      validateJobHistoryJobStatus(jobHistory, job.getID(), conf, 
+      JobHistory jobHistory = jt.getJobHistory();
+
+      validateJobHistoryJobStatus(jt, jobHistory, job.getID(), conf, 
           JobStatus.getJobRunState(JobStatus.SUCCEEDED));
       
       // Run a job that will be failed and validate its job status
       // existing in history file
       job = UtilsForTests.runJobFail(conf, inDir, outDir);
-      validateJobHistoryJobStatus(jobHistory, job.getID(), conf, 
+      validateJobHistoryJobStatus(jt, jobHistory, job.getID(), conf, 
           JobStatus.getJobRunState(JobStatus.FAILED));
       
       // Run a job that will be killed and validate its job status
       // existing in history file
       job = UtilsForTests.runJobKill(conf, inDir, outDir);
-      validateJobHistoryJobStatus(jobHistory, job.getID(), conf,
+      validateJobHistoryJobStatus(jt, jobHistory, job.getID(), conf,
           JobStatus.getJobRunState(JobStatus.KILLED));
       
     } finally {

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java Tue Mar  8 05:53:05 2011
@@ -109,7 +109,7 @@ public class TestJobHistoryParsing  exte
     assertFalse("Writing an event after closing event writer is not handled",
         caughtException);
 
-    String historyFileName = jobId.toString() + "_" + username;
+    String historyFileName = jobId.toString();
     Path historyFilePath = new Path (historyDir.toString(),
       historyFileName);
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Tue Mar  8 05:53:05 2011
@@ -295,11 +295,11 @@ public class TestSeveral extends TestCas
       public Void run() throws IOException {
         verifyOutput(outDir.getFileSystem(conf), outDir);
 
+        JobTracker jt = mrCluster.getJobTrackerRunner().getJobTracker();
 
         //TestJobHistory
-        TestJobHistory.validateJobHistoryFileFormat(
-            mrCluster.getJobTrackerRunner().getJobTracker().getJobHistory(),
-            jobId, conf, "SUCCEEDED", false);
+        TestJobHistory.validateJobHistoryFileFormat
+          (jt, jt.getJobHistory(), jobId, conf, "SUCCEEDED", false);
 
         TestJobHistory.validateJobHistoryFileContent(mrCluster, job, conf);
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Tue Mar  8 05:53:05 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
@@ -42,6 +43,8 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryJobRecord;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryRecordRetriever;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
@@ -266,7 +269,7 @@ public class TestRumenJobTraces {
             .makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
     
     // Check if jobhistory filename are detected properly
-    Path jhFilename = JobHistory.getJobHistoryFile(rootInputDir, jid, user);
+    Path jhFilename = JobHistory.getJobHistoryFile(rootInputDir, jid);
     JobID extractedJID = 
       JobID.forName(TraceBuilder.extractJobID(jhFilename.getName()));
     assertEquals("TraceBuilder failed to parse the current JH filename", 
@@ -366,6 +369,9 @@ public class TestRumenJobTraces {
     conf.setInt(TTConfig.TT_REDUCE_SLOTS, 1);
     MiniMRCluster mrCluster = new MiniMRCluster(1, "file:///", 1, null, null, 
                                                 new JobConf(conf));
+    JobTracker tracker = mrCluster.getJobTrackerRunner().getJobTracker();
+    JobHistory history = tracker.getJobHistory();
+    
     
     // run a job
     Path inDir = new Path(tempDir, "input");
@@ -395,16 +401,20 @@ public class TestRumenJobTraces {
       Path jhPath = 
         new Path(mrCluster.getJobTrackerRunner().getJobTracker()
                           .getJobHistoryDir());
-      Path inputPath = JobHistory.getJobHistoryFile(jhPath, id, user);
+      Path inputPath = null;
       // wait for 10 secs for the jobhistory file to move into the done folder
       for (int i = 0; i < 100; ++i) {
-        if (lfs.exists(inputPath)) {
+        JobHistoryRecordRetriever retriever 
+          = history.getMatchingJobs(null, "", null, id.toString());
+        if (retriever.hasNext()) {
+          inputPath = retriever.next().getPath();
           break;
         }
-        TimeUnit.MILLISECONDS.wait(100);
+        TimeUnit.MILLISECONDS.sleep(100);
       }
     
-      assertTrue("Missing job history file", lfs.exists(inputPath));
+      assertTrue("Missing job history file",
+                 inputPath != null && lfs.exists(inputPath));
 
       ris = getRewindableInputStream(inputPath, conf);
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java Tue Mar  8 05:53:05 2011
@@ -172,13 +172,11 @@ public class TraceBuilder extends Config
    *         [especially for .crc files] we return null.
    */
   static String extractJobID(String fileName) {
+    String pre21JobID
+      = applyParser(fileName, 
+                    Pre21JobHistoryConstants.JOBHISTORY_FILENAME_REGEX);
     String jobId = applyParser(fileName, JobHistory.JOBHISTORY_FILENAME_REGEX);
-    if (jobId == null) {
-      // check if its a pre21 jobhistory file
-      jobId = applyParser(fileName, 
-                          Pre21JobHistoryConstants.JOBHISTORY_FILENAME_REGEX);
-    }
-    return jobId;
+    return jobId == null ? pre21JobID : jobId;
   }
 
   static boolean isJobConfXml(String fileName, InputStream input) {

Modified: hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp?rev=1079184&r1=1079183&r2=1079184&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobhistory.jsp Tue Mar  8 05:53:05 2011
@@ -21,6 +21,8 @@
   contentType="text/html; charset=UTF-8"
   import="java.io.*"
   import="java.util.*"
+  import="java.util.regex.Matcher"
+  import="java.util.regex.Pattern"
   import="java.net.URLEncoder"
   import="org.apache.hadoop.mapred.*"
   import="org.apache.hadoop.util.*"
@@ -30,6 +32,8 @@
   import="org.apache.hadoop.http.HtmlQuoting"
   import="org.apache.hadoop.mapred.*"
   import="org.apache.hadoop.mapreduce.jobhistory.*"
+  import="org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryJobRecord"
+  import="org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryRecordRetriever"
 %>
 
 <%!	private static final long serialVersionUID = 1L;
@@ -70,55 +74,68 @@ window.location.href = url;
      <a href="jobhistory.jsp">History Viewer</a></h1>
 <hr>
 <%
+  //{{ // this is here to make indentation work, and must be commented out
     final String search = (request.getParameter("search") == null)
                           ? ""
                           : request.getParameter("search");
 
-    String parts[] = search.split(":");
+    String soughtDate = "";
+    String soughtJobName = "";
+    String soughtJobid = "";
 
-    final String user = (parts.length >= 1)
-                        ? parts[0].toLowerCase()
+    // soughtUser : jobid ; jobname ! date
+
+    String splitDate[] = search.split("!");
+
+    final String DATE_PATTERN = "([0-1]?[0-9])/([0-3]?[0-9])/((?:2[0-9])?[0-9][0-9])";
+
+    if (splitDate.length >= 2) {
+      soughtDate = splitDate[1];
+    }
+
+    String[] splitJobName = splitDate[0].split(";");
+
+    if (splitJobName.length >= 2) {
+      soughtJobName = splitJobName[1];
+    }
+
+    String[] splitJobid = splitJobName[0].split(":");
+
+    if (splitJobid.length >= 2) {
+      soughtJobid = splitJobid[1];
+    }
+
+    final String soughtUser = (splitJobid.length >= 1)
+                        ? splitJobid[0].toLowerCase()
                         : "";
-    final String jobid = (parts.length >= 2)
-                           ? parts[1].toLowerCase()
-                           : "";
-    final String rawUser = HtmlQuoting.unquoteHtmlChars(user);
-    final String rawJobid = HtmlQuoting.unquoteHtmlChars(jobid);
-
-    PathFilter jobLogFileFilter = new PathFilter() {
-      private boolean matchUser(String fileName) {
-        // return true if 
-        //  - user is not specified
-        //  - user matches
-        return "".equals(rawUser) || rawUser.equals(fileName.split("_")[3]);
-      }
 
-      private boolean matchJobId(String fileName) {
-        // return true if 
-        //  - jobid is not specified
-        //  - jobid matches 
-        String[] jobDetails = fileName.split("_");
-        String actualId = jobDetails[0] + "_" +jobDetails[1] + "_" + jobDetails[2] ;
-        return "".equals(rawJobid) || jobid.equalsIgnoreCase(actualId);
+    if (soughtDate.length() != 0) {
+      Pattern p = Pattern.compile(DATE_PATTERN);
+      Matcher m = p.matcher(soughtDate);
+      if (!m.matches()) {
+        soughtDate = "";
       }
+    }
 
-      public boolean accept(Path path) {
-        return (!(path.getName().endsWith(".xml") || 
-          path.getName().endsWith(JobHistory.OLD_SUFFIX)) && 
-          matchUser(path.getName()) && matchJobId(path.getName()));
-      }
-    };
+    JobHistory jobHistory = (JobHistory) application.getAttribute("jobHistoryHistory");
+    String soughtDates[] = new String[1];
+    soughtDates[0] = soughtDate;
+
+    JobHistoryRecordRetriever retriever
+      = jobHistory.getMatchingJobs
+           (soughtUser, soughtJobName, soughtDates, soughtJobid);
     
-    FileSystem fs = (FileSystem) application.getAttribute("fileSys");
-    String historyLogDir = (String) application.getAttribute("historyLogDir");
-    if (fs == null) {
-      out.println("Null file system. May be namenode is in safemode!");
-      return;
-    }
-    Path[] jobFiles = FileUtil.stat2Paths(fs.listStatus(new Path(historyLogDir),
-                                          jobLogFileFilter));
-    out.println("<!--  user : " + user + ", jobid : " + jobid + "-->");
-    if (null == jobFiles || jobFiles.length == 0)  {
+    JobHistoryJobRecord[] records
+      = new JobHistoryJobRecord[retriever.numberMatches()];
+
+    int recordsIndex = 0;
+
+    while (retriever.hasNext()) {
+      records[recordsIndex++] = retriever.next();
+    }
+    
+    out.println("<!--  user : " + soughtUser + ", jobid : " + soughtJobid + "-->");
+    if (records.length == 0)  {
       out.println("No files found!"); 
       return ; 
     }
@@ -132,15 +149,15 @@ window.location.href = url;
     int size = 100;
 
     // if show-all is requested or jobfiles < size(100)
-    if (pageno == -1 || size > jobFiles.length) {
-      size = jobFiles.length;
+    if (pageno == -1 || size > records.length) {
+      size = records.length;
     }
 
     if (pageno == -1) { // special case 'show all'
       pageno = 1;
     }
 
-    int maxPageNo = (int)Math.ceil((float)jobFiles.length / size);
+    int maxPageNo = (records.length + size - 1) / size;
 
     // check and fix pageno
     if (pageno < 1 || pageno > maxPageNo) {
@@ -152,15 +169,15 @@ window.location.href = url;
     if (pageno == maxPageNo) {
       // find the number of files to be shown on the last page
       int startOnLast = ((pageno - 1) * size) + 1;
-      length = jobFiles.length - startOnLast + 1;
+      length = records.length - startOnLast + 1;
     }
 
     // Display the search box
-    out.println("<form name=search><b> Filter (username:jobid) </b>"); // heading
+    out.println("<form name=search><b> Filter ([username][:jobid][;jobname-key][!MM/DD/YYYY]) </b>"); // heading
     out.println("<input type=text name=search size=\"20\" value=\"" + search + "\">"); // search box
     out.println("<input type=submit value=\"Filter!\" onClick=\"showUserHistory(document.getElementById('search').value)\"></form>");
-    out.println("<span class=\"small\">Example: 'smith' will display jobs submitted by user 'smith'. </span>");
-    out.println("<span class=\"small\">Job Ids need to be prefixed with a colon(:) For example, :job_200908311030_0001 will display the job with that id. </span>"); // example 
+    out.println("<span class=\"small\">Example: <b>smith</b> will display jobs submitted by user 'smith'. </span>");
+    out.println("<span class=\"small\">Job Ids need to be prefixed with a colon(:) For example, <b>:job_200908311030_0001</b> will display the job with that id.  You may search for parts of job names.  Job name search keys need to be prefixed by a semicolon (;).  A filter <b>;budget</b> will find jobs named \"budget calculation\" or \"fussbudget job\".  You may restrict results to jobs that finished on a specific day.  Date criteria are <b>MM/DD/YYYYY</b> and are prefixed with an exclamation point (!).  You may specify multiple criteria.  We only display jobs that satisfy all criteria.</span>"); // example 
     out.println("<hr>");
 
     //Show the status
@@ -171,12 +188,15 @@ window.location.href = url;
 
     out.println("<font size=5><b>Available Jobs in History </b></font>");
     // display the number of jobs, start index, end index
-    out.println("(<i> <span class=\"small\">Displaying <b>" + length + "</b> jobs from <b>" + start + "</b> to <b>" + (start + length - 1) + "</b> out of <b>" + jobFiles.length + "</b> jobs");
-    if (!"".equals(user)) {
-      out.println(" for user <b>" + HtmlQuoting.quoteHtmlChars(user) + "</b>"); // show the user if present
+    out.println("(<i> <span class=\"small\">Displaying <b>" + length + "</b> jobs from <b>" + start + "</b> to <b>" + (start + length - 1) + "</b> out of <b>" + records.length + "</b> jobs");
+    if (!"".equals(soughtUser)) {
+      out.println(" for user <b>" + soughtUser + "</b>"); // show the user if present
     }
-    if (!"".equals(jobid)) {
-      out.println(" for jobid <b>" + HtmlQuoting.quoteHtmlChars(jobid) + "</b> in it."); // show the jobid keyword if present
+    if (!"".equals(soughtJobid)) {
+      out.println(" for jobid <b>" + soughtJobid + "</b> in it "); // show the jobid keyword if present
+    }
+    if (!"".equals(soughtDate)) {
+      out.println(" for date <b>" + soughtDate + "</b>"); // show the jobid keyword if present
     }
     out.print("</span></i>)");
 
@@ -197,28 +217,27 @@ window.location.href = url;
       out.println("<span class=\"small\">[last page]</span>");
     }
 
-    // sort the files on creation time.
-    Arrays.sort(jobFiles, new Comparator<Path>() {
-      public int compare(Path p1, Path p2) {
-        String dp1 = null;
-        String dp2 = null;
-        
-        dp1 = p1.getName();
-        dp2 = p2.getName();
-                
-        String[] split1 = dp1.split("_");
-        String[] split2 = dp2.split("_");
+    // REVERSE sort the files on creation time.
+    Arrays.sort(records, new Comparator<JobHistoryJobRecord>() {
+                  public int compare(JobHistoryJobRecord rec1, JobHistoryJobRecord rec2) {
+                    String id1 = rec1.getJobIDString(true);
+                    String id2 = rec2.getJobIDString(true);
+
+                    String[] idsplit1 = id1.split("_");
+                    String[] idsplit2 = id2.split("_");
         
-        // compare job tracker start time
-        int res = new Date(Long.parseLong(split1[1])).compareTo(
-                             new Date(Long.parseLong(split2[1])));
-        if (res == 0) {
-          Long l1 = Long.parseLong(split1[2]);
-          res = l1.compareTo(Long.parseLong(split2[2]));
-        }
-        return res;
-      }
-    });
+                    // compare job tracker start time
+                    Long jtTime2 = Long.parseLong(idsplit2[1]);
+                    // comparison sense is reversed
+                    int res = jtTime2.compareTo(Long.parseLong(idsplit1[1]));
+                    if (res == 0) {
+                      // comparison sense is reversed
+                      Long sn2 = Long.parseLong(idsplit2[2]);
+                      res = sn2.compareTo(Long.parseLong(idsplit1[2]));
+                    }
+                    return res;
+                  }
+                });
 
     out.println("<br><br>");
 
@@ -227,15 +246,21 @@ window.location.href = url;
 
     out.print("<table align=center border=2 cellpadding=\"5\" cellspacing=\"2\">");
     out.print("<tr>");
-    out.print( "<td>Job Id</td><td>User</td>") ; 
+    out.print( "<td>Job submit time</td>");
+    out.print("<td>Job Id</td>");
+    out.print("<td>User</td>") ; 
+    out.print("<td>Job Name</td>") ; 
     out.print("</tr>"); 
     
     Set<String> displayedJobs = new HashSet<String>();
     for (int i = start - 1; i < start + length - 1; ++i) {
-      Path jobFile = jobFiles[i];
+      JobHistoryJobRecord record = records[i];
 
-      String jobId = JobHistory.getJobIDFromHistoryFilePath(jobFile).toString();
-      String userName = JobHistory.getUserFromHistoryFilePath(jobFile);
+      String jobId = record.getJobIDString();
+      String userName = record.getUserName();
+      long submitTime = record.getSubmitTime();
+      String jobName = record.getJobName();
+      Path logPath = record.getPath();
 
       // Check if the job is already displayed. There can be multiple job 
       // history files for jobs that have restarted
@@ -248,8 +273,7 @@ window.location.href = url;
 %>
 <center>
 <%	
-      printJob(jobId, userName, new Path(jobFile.getParent(), jobFile), 
-               out) ; 
+      printJob(submitTime, jobId, userName, logPath, jobName, out) ; 
 %>
 </center> 
 <%
@@ -260,17 +284,39 @@ window.location.href = url;
     printNavigation(pageno, size, maxPageNo, search, out);
 %>
 <%!
-    private void printJob(String jobId, 
-                          String user, Path logFile, JspWriter out)
+    private void printJob(long timestamp, String jobId, 
+                          String user, Path logFile, String jobName, JspWriter out)
     throws IOException {
       out.print("<tr>"); 
+      out.print("<td>" + new Date(timestamp) + "</td>"); 
       out.print("<td>" + "<a href=\"jobdetailshistory.jsp?logFile=" +
        URLEncoder.encode(logFile.toString(), "UTF-8") +
-                "\">" + HtmlQuoting.quoteHtmlChars(jobId) + "</a></td>");
-      out.print("<td>" + HtmlQuoting.quoteHtmlChars(user) + "</td>");
+                "\">" + jobId + "</a></td>");
+      out.print("<td>" + user + "</td>");
+      out.print("<td>" + jobName + "</td>");
       out.print("</tr>");
     }
 
+
+    // I tolerate this code because I expect a low number of
+    // occurrences in a relatively short string
+    private static String replaceStringInstances
+      (String replacee, String old, String replacement) {
+      int index = replacee.indexOf(old);
+
+      while (index > 0) {
+        replacee = (replacee.substring(0, index)
+                       + replacement
+                       + replaceStringInstances
+                           (replacee.substring(index + old.length()),
+                            old, replacement));
+
+        index = replacee.indexOf(old);
+      }
+
+      return replacee;
+    }   
+
     private void printNavigation(int pageno, int size, int max, String search, 
                                  JspWriter out) throws IOException {
       int numIndexToShow = 5; // num indexes to show on either side