Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/05/10 11:44:28 UTC

svn commit: r1101385 [1/2] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job...

Author: sharad
Date: Tue May 10 09:44:27 2011
New Revision: 1101385

URL: http://svn.apache.org/viewvc?rev=1101385&view=rev
Log:
MAPREDUCE-2478. Improve history server. Contributed by Siddharth Seth.

Added:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobIndexInfo.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
Removed:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/JobHistoryUtils.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ClientFactory.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YARNApplicationConstants.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/resources/yarn-default.xml

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Tue May 10 09:44:27 2011
@@ -3,6 +3,8 @@ Hadoop MapReduce Change Log
 Trunk (unreleased changes)
 
   MAPREDUCE-279
+    MAPREDUCE-2478. Improve history server. (Siddharth Seth via sharad)
+
     Fix Null Pointer in TestUberAM. (sharad) 
 
     Fix refreshProxy in ClientServiceDelegate. (sharad)

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue May 10 09:44:27 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.jobh
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -30,7 +29,6 @@ import java.util.concurrent.LinkedBlocki
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
@@ -41,7 +39,9 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
-import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -96,7 +96,7 @@ public class JobHistoryEventHandler exte
     this.conf = conf;
 
     String logDir = JobHistoryUtils.getConfiguredHistoryLogDirPrefix(conf);
-    String  doneDirPrefix = JobHistoryUtils.getConfiguredHistoryDoneDirPrefix(conf);
+    String  doneDirPrefix = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
 
     try {
       doneDirPrefixPath = FileSystem.get(conf).makeQualified(
@@ -157,6 +157,7 @@ public class JobHistoryEventHandler exte
 
   @Override
   public void stop() {
+    LOG.info("Stopping JobHistoryEventHandler");
     stopped = true;
     //do not interrupt while event handling is in progress
     synchronized(this) {
@@ -184,7 +185,7 @@ public class JobHistoryEventHandler exte
         LOG.info("Exception while closing file " + e.getMessage());
       }
     }
-
+    LOG.info("Stopped JobHistoryEventHandler. super.stop()");
     super.stop();
   }
 
@@ -199,18 +200,18 @@ public class JobHistoryEventHandler exte
   protected void setupEventWriter(JobId jobId)
   throws IOException {
     if (logDirPath == null) {
-      LOG.info("Log Directory is null, returning");
+      LOG.error("Log Directory is null, returning");
       throw new IOException("Missing Log Directory for History");
     }
 
     MetaInfo oldFi = fileMap.get(jobId);
     Configuration conf = getConfig();
 
-    long submitTime = (oldFi == null ? context.getClock().getTime() : oldFi.submitTime);
+    long submitTime = (oldFi == null ? context.getClock().getTime() : oldFi.getJobIndexInfo().getSubmitTime());
 
     // String user = conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
     
-    Path logFile = JobHistoryUtils.getJobHistoryFile(logDirPath, jobId, startCount);
+    Path logFile = JobHistoryUtils.getStagingJobHistoryFile(logDirPath, jobId, startCount);
     String user = conf.get(MRJobConfig.USER_NAME);
     if (user == null) {
       throw new IOException("User is null while setting up jobhistory eventwriter" );
@@ -223,6 +224,7 @@ public class JobHistoryEventHandler exte
         FSDataOutputStream out = logDirFS.create(logFile, true);
         //TODO Permissions for the history file?
         writer = new EventWriter(out);
+        LOG.info("Event Writer setup for JobId: " + jobId + ", File: " + logFile);
       } catch (IOException ioe) {
         LOG.info("Could not create log file: [" + logFile + "] + for job " + "[" + jobName + "]");
         throw ioe;
@@ -232,8 +234,7 @@ public class JobHistoryEventHandler exte
     //This could be done at the end as well in moveToDone
     Path logDirConfPath = null;
     if (conf != null) {
-      logDirConfPath = getConfFile(logDirPath, jobId);
-      LOG.info("XXX: Attempting to write config to: " + logDirConfPath);
+      logDirConfPath = JobHistoryUtils.getStagingConfFile(logDirPath, jobId, startCount);
       FSDataOutputStream jobFileOut = null;
       try {
         if (logDirConfPath != null) {
@@ -247,7 +248,7 @@ public class JobHistoryEventHandler exte
       }
     }
     
-    MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer, submitTime, user, jobName);
+    MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer, submitTime, user, jobName, jobId);
     fileMap.put(jobId, fi);
   }
 
@@ -261,7 +262,7 @@ public class JobHistoryEventHandler exte
       }
       
     } catch (IOException e) {
-      LOG.info("Error closing writer for JobID: " + id);
+      LOG.error("Error closing writer for JobID: " + id);
       throw e;
     }
   }
@@ -277,6 +278,7 @@ public class JobHistoryEventHandler exte
 
   protected synchronized void handleEvent(JobHistoryEvent event) {
     // check for first event from a job
+    //TODO Log a meta line with version information.
     if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
       try {
         setupEventWriter(event.getJobID());
@@ -286,7 +288,6 @@ public class JobHistoryEventHandler exte
       }
     }
     MetaInfo mi = fileMap.get(event.getJobID());
-    EventWriter writer = fileMap.get(event.getJobID()).writer;
     try {
       HistoryEvent historyEvent = event.getHistoryEvent();
       mi.writeEvent(historyEvent);
@@ -298,6 +299,10 @@ public class JobHistoryEventHandler exte
     // check for done
     if (event.getHistoryEvent().getEventType().equals(EventType.JOB_FINISHED)) {
       try {
+        JobFinishedEvent jFinishedEvent = (JobFinishedEvent)event.getHistoryEvent();
+        mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime());
+        mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getFinishedMaps());
+        mi.getJobIndexInfo().setNumReduces(jFinishedEvent.getFinishedReduces());
         closeEventWriter(event.getJobID());
       } catch (IOException e) {
         throw new YarnException(e);
@@ -308,46 +313,57 @@ public class JobHistoryEventHandler exte
   protected void closeEventWriter(JobId jobId) throws IOException {
     final MetaInfo mi = fileMap.get(jobId);
     
+    if (mi == null) {
+      throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
+    }
     try {
-      if (mi != null) {
         mi.closeWriter();
+    } catch (IOException e) {
+      LOG.error("Error closing writer for JobID: " + jobId);
+      throw e;
       }
      
-      if (mi == null || mi.getHistoryFile() == null) {
-        LOG.info("No file for job-history with " + jobId + " found in cache!");
+    if (mi.getHistoryFile() == null) {
+      LOG.warn("No file for job-history with " + jobId + " found in cache!");
       }
       if (mi.getConfFile() == null) {
-        LOG.info("No file for jobconf with " + jobId + " found in cache!");
+      LOG.warn("No file for jobconf with " + jobId + " found in cache!");
       }
       
-      
-      //TODO fix - add indexed structure 
-      // 
-      String doneDir = JobHistoryUtils.getCurrentDoneDir(doneDirPrefixPath.toString());
-      Path doneDirPath =
-    	  doneDirFS.makeQualified(new Path(doneDir));
-      if (!pathExists(doneDirFS, doneDirPath)) {
+    String doneDir = JobHistoryUtils.getCurrentDoneDir(doneDirPrefixPath
+        .toString());
+    Path doneDirPath = doneDirFS.makeQualified(new Path(doneDir));
         try {
-          doneDirFS.mkdirs(doneDirPath, new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION));
-        } catch (FileAlreadyExistsException e) {
-          LOG.info("Done directory: [" + doneDirPath + "] already exists.");
-      }
+      if (!pathExists(doneDirFS, doneDirPath)) {
+        doneDirFS.mkdirs(doneDirPath, new FsPermission(
+            JobHistoryUtils.HISTORY_DIR_PERMISSION));
       }
+
+      if (mi.getHistoryFile() != null) {
       Path logFile = mi.getHistoryFile();
       Path qualifiedLogFile = logDirFS.makeQualified(logFile);
+        String doneJobHistoryFileName = FileNameIndexUtils.getDoneFileName(mi
+            .getJobIndexInfo());
       Path qualifiedDoneFile = doneDirFS.makeQualified(new Path(doneDirPath,
-          getDoneJobHistoryFileName(jobId)));
+            doneJobHistoryFileName));
       moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
+      }
       
+      if (mi.getConfFile() != null) {
       Path confFile = mi.getConfFile();
       Path qualifiedConfFile = logDirFS.makeQualified(confFile);
-      Path qualifiedConfDoneFile = doneDirFS.makeQualified(new Path(doneDirPath, getDoneConfFileName(jobId)));
+        String doneConfFileName = JobHistoryUtils
+            .getIntermediateConfFileName(jobId);
+        Path qualifiedConfDoneFile = doneDirFS.makeQualified(new Path(
+            doneDirPath, doneConfFileName));
       moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
-      
-      logDirFS.delete(qualifiedLogFile, true);
-      logDirFS.delete(qualifiedConfFile, true);
+      }
+      String doneFileName = JobHistoryUtils.getIntermediateDoneFileName(jobId);
+      Path doneFilePath = doneDirFS.makeQualified(new Path(doneDirPath,
+          doneFileName));
+      touchFile(doneFilePath);
     } catch (IOException e) {
-      LOG.info("Error closing writer for JobID: " + jobId);
+      LOG.error("Error closing writer for JobID: " + jobId);
       throw e;
     }
   }
@@ -356,24 +372,22 @@ public class JobHistoryEventHandler exte
     private Path historyFile;
     private Path confFile;
     private EventWriter writer;
-    long submitTime;
-    String user;
-    String jobName;
+    JobIndexInfo jobIndexInfo;
 
     MetaInfo(Path historyFile, Path conf, EventWriter writer, long submitTime,
-             String user, String jobName) {
+             String user, String jobName, JobId jobId) {
       this.historyFile = historyFile;
       this.confFile = conf;
       this.writer = writer;
-      this.submitTime = submitTime;
-      this.user = user;
-      this.jobName = jobName;
+      this.jobIndexInfo = new JobIndexInfo(submitTime, -1, user, jobName, jobId, -1, -1);
     }
 
     Path getHistoryFile() { return historyFile; }
 
     Path getConfFile() {return confFile; } 
 
+    JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
+
     synchronized void closeWriter() throws IOException {
       if (writer != null) {
         writer.close();
@@ -389,30 +403,6 @@ public class JobHistoryEventHandler exte
     }
   }
 
-  //TODO Move some of these functions into a utility class.
-
-
-  private String getDoneJobHistoryFileName(JobId jobId) {
-    return TypeConverter.fromYarn(jobId).toString() + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
-  }
-
-  private String getDoneConfFileName(JobId jobId) {
-    return TypeConverter.fromYarn(jobId).toString() + JobHistoryUtils.CONF_FILE_NAME_SUFFIX;
-  }
-  
-  private Path getConfFile(Path logDir, JobId jobId) {
-    Path jobFilePath = null;
-    if (logDir != null) {
-      jobFilePath = new Path(logDir, TypeConverter.fromYarn(jobId).toString()
-          + "_" + startCount + JobHistoryUtils.CONF_FILE_NAME_SUFFIX);
-    }
-    return jobFilePath;
-  }
-
-
-  //TODO This could be done by the jobHistory server - move files to a temporary location
-  //  which is scanned by the JH server - to move them to the final location.
-  // Currently JHEventHandler is moving files to the final location.
   private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
     //check if path exists, in case of retries it may not exist
     if (logDirFS.exists(fromPath)) {
@@ -430,7 +420,14 @@ public class JobHistoryEventHandler exte
           LOG.info("copy failed");
       doneDirFS.setPermission(toPath,
           new FsPermission(JobHistoryUtils.HISTORY_FILE_PERMISSION));
+      
+      logDirFS.delete(fromPath, false);
+    }
     }
+
+  private void touchFile(Path path) throws IOException {
+    doneDirFS.createNewFile(path);
+    doneDirFS.setPermission(path, JobHistoryUtils.HISTORY_DIR_PERMISSION);
   }
 
   boolean pathExists(FileSystem fileSys, Path path) throws IOException {

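Note on the change above: closeEventWriter() no longer moves files straight to a final done location. It now copies the staged history file into an intermediate done directory under a filename that encodes the job index, then touches a per-job ".done" marker. A minimal sketch of that sequence, using the helper names from this patch; the method and paths below are hypothetical, not part of the commit:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.mapreduce.v2.api.records.JobId;
    import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
    import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
    import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;

    // Hypothetical helper mirroring the move-and-mark flow in closeEventWriter.
    static void moveToIntermediateDone(Configuration conf, JobId jobId,
        JobIndexInfo indexInfo, Path stagingFile, Path doneDir) throws IOException {
      FileSystem fs = FileSystem.get(conf);
      // Copy the staged .jhist file under its indexed name, deleting the source.
      Path doneFile = new Path(doneDir, FileNameIndexUtils.getDoneFileName(indexInfo));
      if (!FileUtil.copy(fs, stagingFile, fs, doneFile, true, conf)) {
        throw new IOException("copy failed: " + stagingFile);
      }
      fs.setPermission(doneFile,
          new FsPermission(JobHistoryUtils.HISTORY_FILE_PERMISSION));
      // Touch the <jobId>.done marker so a scanner knows the job is complete.
      Path marker = new Path(doneDir,
          JobHistoryUtils.getIntermediateDoneFileName(jobId));
      fs.createNewFile(marker);
      fs.setPermission(marker, JobHistoryUtils.HISTORY_DIR_PERMISSION);
    }
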
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue May 10 09:44:27 2011
@@ -653,8 +653,12 @@ public class JobImpl implements org.apac
     }
   }
 
-  private void finished() {
+  private void setFinishTime() {
     finishTime = clock.getTime();
+  }
+  
+  private void finished() {
+    if (finishTime == 0) setFinishTime();
     eventHandler.handle(new JobFinishEvent(jobId));
   }
 
@@ -1000,6 +1004,18 @@ public class JobImpl implements org.apac
           numReduceTasks, //TODO finishedReduceTasks
           finalState.toString());
     eventHandler.handle(new JobHistoryEvent(jobId, unsuccessfulJobEvent));
+    
+    JobFinishedEvent jfe =
+      new JobFinishedEvent(TypeConverter.fromYarn(jobId),
+          finishTime,
+          succeededMapTaskCount,
+          succeededReduceTaskCount, failedMapTaskCount,
+          failedReduceTaskCount,
+          TypeConverter.fromYarn(getCounters()), //TODO replace with MapCounter
+          TypeConverter.fromYarn(getCounters()), // TODO reduceCounters
+          TypeConverter.fromYarn(getCounters()));
+    eventHandler.handle(new JobHistoryEvent(jobId, jfe));
+    //TODO Does this require a JobFinishedEvent?
   }
 
   // Task-start has been moved out of InitTransition, so this arc simply
@@ -1125,6 +1141,7 @@ public class JobImpl implements org.apac
         job.allowedMapFailuresPercent*job.numMapTasks ||
         job.failedReduceTaskCount*100 > 
         job.allowedReduceFailuresPercent*job.numReduceTasks) {
+        job.setFinishTime();
         JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(job.jobId),
               job.finishTime,
@@ -1132,18 +1149,8 @@ public class JobImpl implements org.apac
               job.failedReduceTaskCount, //TODO finishedReduceTasks
               org.apache.hadoop.mapreduce.JobStatus.State.FAILED.toString()); //TODO correct state
         job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
+        //TODO This event not likely required - sent via abort(). 
 
-// Adding JobFinishedEvent to dump counters
-        JobFinishedEvent jfe =
-          new JobFinishedEvent(TypeConverter.fromYarn(job.jobId),
-              job.finishTime,
-              job.succeededMapTaskCount,
-              job.succeededReduceTaskCount, job.failedMapTaskCount,
-              job.failedReduceTaskCount,
-              TypeConverter.fromYarn(job.getCounters()), //TODO replace with MapCounter
-              TypeConverter.fromYarn(job.getCounters()), // TODO reduceCounters
-              TypeConverter.fromYarn(job.getCounters()));
-        job.eventHandler.handle(new JobHistoryEvent(job.jobId, jfe));
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
         job.finished();
         return JobState.FAILED;
@@ -1157,6 +1164,7 @@ public class JobImpl implements org.apac
           LOG.warn("Could not do commit for Job", e);
         }
        // Log job-history
+        job.setFinishTime();
         JobFinishedEvent jfe =
         new JobFinishedEvent(TypeConverter.fromYarn(job.jobId),
           job.finishTime,
@@ -1226,6 +1234,7 @@ public class JobImpl implements org.apac
     @Override
     protected JobState checkJobForCompletion(JobImpl job) {
       if (job.completedTaskCount == job.tasks.size()) {
+        job.setFinishTime();
         job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
         job.finished();
         return JobState.KILLED;
@@ -1252,6 +1261,7 @@ public class JobImpl implements org.apac
       SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
+      //TODO JH Event?
       job.finished();
     }
   }

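Note on the change above: each terminal transition (commit, failure threshold exceeded, kill) now stamps the finish time explicitly via setFinishTime(), and finished() only fills it in as a fallback, so the first stamp wins. A condensed sketch of the pattern, reusing the JobImpl field names for illustration:

    // Condensed sketch of the finish-time pattern; 'clock', 'eventHandler' and
    // 'jobId' stand for the JobImpl fields.
    private long finishTime = 0;

    private void setFinishTime() {
      finishTime = clock.getTime();
    }

    private void finished() {
      // Fallback: only stamp the time if no transition has done so already.
      if (finishTime == 0) setFinishTime();
      eventHandler.handle(new JobFinishEvent(jobId));
    }
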
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue May 10 09:44:27 2011
@@ -56,7 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
-import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -164,7 +164,7 @@ public class RecoveryService extends Com
         new Path(jobhistoryDir));
     FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
         getConfig());
-    historyFile = fc.makeQualified(JobHistoryUtils.getJobHistoryFile(
+    historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
         histDirPath, jobName, startCount - 1));          //read the previous history file
     in = fc.open(historyFile);
     JobHistoryParser parser = new JobHistoryParser(in);

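Note on the change above: recovery still replays the previous attempt's staged history file; only the helper was renamed to getStagingJobHistoryFile. A short sketch of the read path, assuming JobHistoryParser.parse() returns a JobInfo snapshot as in the org.apache.hadoop.mapreduce.jobhistory API; 'fc', 'histDirPath', 'jobName' and 'startCount' are the RecoveryService fields above:

    // Sketch of the recovery read path.
    Path histFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
        histDirPath, jobName, startCount - 1));    // previous attempt's file
    FSDataInputStream in = fc.open(histFile);
    JobHistoryParser parser = new JobHistoryParser(in);
    JobHistoryParser.JobInfo jobInfo = parser.parse(); // events to replay
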
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue May 10 09:44:27 2011
@@ -18,18 +18,23 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
 
 import junit.framework.Assert;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -38,10 +43,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
-import org.apache.hadoop.mapreduce.v2.app.AppContext;
-import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
-import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -61,8 +62,10 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -76,9 +79,14 @@ import org.apache.hadoop.yarn.state.Stat
  */
 public class MRApp extends MRAppMaster {
 
+  private static final Log LOG = LogFactory.getLog(MRApp.class);
+  
   int maps;
   int reduces;
 
+  private File testWorkDir;
+  private Path testAbsPath;
+
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
   
@@ -93,12 +101,25 @@ public class MRApp extends MRAppMaster {
     applicationId.setId(0);
   }
 
-  public MRApp(int maps, int reduces, boolean autoComplete) {
-    this(maps, reduces, autoComplete, 1);
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) {
+    this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
   }
 
-  public MRApp(int maps, int reduces, boolean autoComplete, int startCount) {
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) {
     super(applicationId, startCount);
+    this.testWorkDir = new File("target", testName);
+    testAbsPath = new Path(testWorkDir.getAbsolutePath());
+    LOG.info("PathUsed: " + testAbsPath);
+    if (cleanOnStart) {
+      testAbsPath = new Path(testWorkDir.getAbsolutePath());
+      try {
+        FileContext.getLocalFSFileContext().delete(testAbsPath, true);
+      } catch (Exception e) {
+        LOG.warn("COULD NOT CLEANUP: " + testAbsPath, e);
+        throw new YarnException("could not cleanup test dir", e);
+      }
+    }
+    
     this.maps = maps;
     this.reduces = reduces;
     this.autoComplete = autoComplete;
@@ -107,10 +128,8 @@ public class MRApp extends MRAppMaster {
   public Job submit(Configuration conf) throws Exception {
     String user = conf.get(MRJobConfig.USER_NAME, "mapred");
     conf.set(MRJobConfig.USER_NAME, user);
-    conf.set(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
-       "file:///tmp/yarn/");
-    conf.set(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
-       "file:///tmp/yarn/done/");
+    conf.set(YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY, testAbsPath.toString());
+
     init(conf);
     start();
     Job job = getContext().getAllJobs().values().iterator().next();

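Note on the change above: MRApp's constructors now take a test name and a cleanOnStart flag, so each test gets an isolated work directory under target/<testName> instead of a shared /tmp/yarn path. A usage sketch with a hypothetical test name:

    // "MyHistoryTest" is a hypothetical name; the work dir becomes
    // target/MyHistoryTest and is wiped on start because cleanOnStart=true.
    MRApp app = new MRApp(2 /*maps*/, 1 /*reduces*/, true /*autoComplete*/,
        "MyHistoryTest", true /*cleanOnStart*/);
    Job job = app.submit(new Configuration());
    app.waitForState(job, JobState.SUCCEEDED);
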
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Tue May 10 09:44:27 2011
@@ -76,7 +76,7 @@ public class MRAppBenchmark {
     int maxConcurrentRunningTasks;
     volatile int concurrentRunningTasks;
     ThrottledMRApp(int maps, int reduces, int maxConcurrentRunningTasks) {
-      super(maps, reduces, true);
+      super(maps, reduces, true, "ThrottledMRApp", true);
       this.maxConcurrentRunningTasks = maxConcurrentRunningTasks;
     }
     
@@ -156,7 +156,7 @@ public class MRAppBenchmark {
     int reduces = 100;
     System.out.println("Running benchmark with maps:"+maps +
         " reduces:"+reduces);
-    run(new MRApp(maps, reduces, true));
+    run(new MRApp(maps, reduces, true, this.getClass().getName(), true));
   }
 
   public void benchmark2() throws Exception {

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Tue May 10 09:44:27 2011
@@ -164,7 +164,7 @@ public class TestFail {
 
   static class TimeOutTaskMRApp extends MRApp {
     TimeOutTaskMRApp(int maps, int reduces) {
-      super(maps, reduces, false);
+      super(maps, reduces, false, "TimeOutTaskMRApp", true);
     }
     @Override
     protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
@@ -188,7 +188,7 @@ public class TestFail {
   static class MockFirstFailingTaskMRApp extends MRApp {
 
     MockFirstFailingTaskMRApp(int maps, int reduces) {
-      super(maps, reduces, true);
+      super(maps, reduces, true, "MockFirstFailingTaskMRApp", true);
     }
 
     @Override
@@ -209,7 +209,7 @@ public class TestFail {
   //First attempt is failed
   static class MockFirstFailingAttemptMRApp extends MRApp {
     MockFirstFailingAttemptMRApp(int maps, int reduces) {
-      super(maps, reduces, true);
+      super(maps, reduces, true, "MockFirstFailingAttemptMRApp", true);
     }
 
     @Override

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Tue May 10 09:44:27 2011
@@ -43,7 +43,7 @@ public class TestFetchFailure {
 
   @Test
   public void testFetchFailure() throws Exception {
-    MRApp app = new MRApp(1, 1, false);
+    MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true);
     Configuration conf = new Configuration();
     // map -> reduce -> fetch-failure -> map retry is incompatible with
     // sequential, single-task-attempt approach in uber-AM, so disable:

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java Tue May 10 09:44:27 2011
@@ -190,7 +190,7 @@ public class TestKill {
   static class BlockingMRApp extends MRApp {
     private CountDownLatch latch;
     BlockingMRApp(int maps, int reduces, CountDownLatch latch) {
-      super(maps, reduces, true);
+      super(maps, reduces, true, "testKill", true);
       this.latch = latch;
     }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Tue May 10 09:44:27 2011
@@ -44,7 +44,7 @@ public class TestMRApp {
 
   @Test
   public void testMapReduce() throws Exception {
-    MRApp app = new MRApp(2, 2, true);
+    MRApp app = new MRApp(2, 2, true, this.getClass().getName(), true);
     Job job = app.submit(new Configuration());
     app.waitForState(job, JobState.SUCCEEDED);
     app.verifyCompleted();
@@ -52,7 +52,7 @@ public class TestMRApp {
 
   @Test
   public void testCommitPending() throws Exception {
-    MRApp app = new MRApp(1, 0, false);
+    MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
     Job job = app.submit(new Configuration());
     app.waitForState(job, JobState.RUNNING);
     Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
@@ -82,7 +82,7 @@ public class TestMRApp {
 
   @Test
   public void testCompletedMapsForReduceSlowstart() throws Exception {
-    MRApp app = new MRApp(2, 1, false);
+    MRApp app = new MRApp(2, 1, false, this.getClass().getName(), true);
     Configuration conf = new Configuration();
     //after half of the map completion, reduce will start
     conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
@@ -141,7 +141,7 @@ public class TestMRApp {
 
   @Test
   public void testJobError() throws Exception {
-    MRApp app = new MRApp(1, 0, false);
+    MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
     Job job = app.submit(new Configuration());
     app.waitForState(job, JobState.RUNNING);
     Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java Tue May 10 09:44:27 2011
@@ -172,7 +172,7 @@ public class TestMRClientService {
   class MRAppWithClientService extends MRApp {
     MRClientService clientService = null;
     MRAppWithClientService(int maps, int reduces, boolean autoComplete) {
-      super(maps, reduces, autoComplete);
+      super(maps, reduces, autoComplete, "MRAppWithClientService", true);
     }
     @Override
     protected ClientService createClientService(AppContext context) {

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue May 10 09:44:27 2011
@@ -44,7 +44,7 @@ public class TestRecovery {
   @Test
   public void testCrashed() throws Exception {
     int runCount = 0;
-    MRApp app = new MRApp(2, 1, false, ++runCount);
+    MRApp app = new MRApp(2, 1, false, this.getClass().getName(), true, ++runCount);
     Configuration conf = new Configuration();
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
     Job job = app.submit(conf);
@@ -127,7 +127,7 @@ public class TestRecovery {
     
     //rerun
     //in rerun the 1st map will be recovered from previous run
-    app = new MRApp(2, 1, false, ++runCount);
+    app = new MRApp(2, 1, false, this.getClass().getName(), false, ++runCount);
     conf = new Configuration();
     conf.setBoolean(YarnMRJobConfig.RECOVERY_ENABLE, true);
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java Tue May 10 09:44:27 2011
@@ -42,13 +42,66 @@ public class YarnMRJobConfig {
   /** host:port address to which to bind to **/
   public static final String HS_BIND_ADDRESS = HS_PREFIX + "address";
 
+  /** Staging Dir for AppMaster **/
   public static final String HISTORY_STAGING_DIR_KEY =
        "yarn.historyfile.stagingDir";
 
+  /** Intermediate Done Dir for AppMaster **/
+  public static final String HISTORY_INTERMEDIATE_DONE_DIR_KEY =
+       "yarn.historyfile.intermediateDoneDir";
+  
+  /** Done Dir for AppMaster **/
   public static final String HISTORY_DONE_DIR_KEY =
        "yarn.historyfile.doneDir";
+  
+  /** Done Dir for history server. **/
+  public static final String HISTORY_SERVER_DONE_DIR_KEY = 
+       HS_PREFIX + ".historyfile.doneDir";
+  
+  /**
+   * Size of the job list cache.
+   */
+  public static final String HISTORY_SERVER_JOBLIST_CACHE_SIZE_KEY =
+    HS_PREFIX + ".joblist.cache.size";
+     
+  /**
+   * Size of the loaded job cache.
+   */
+  public static final String HISTORY_SERVER_LOADED_JOB_CACHE_SIZE_KEY = 
+    HS_PREFIX + ".loadedjobs.cache.size";
+  
+  /**
+   * Size of the date string cache. Affects the number of directories
+   * which will be scanned to find a job.
+   */
+  public static final String HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY = 
+    HS_PREFIX + ".datestring.cache.size";
+  
+  /**
+   * The time interval in milliseconds for the history server
+   * to wake up and scan for files to be moved.
+   */
+  public static final String HISTORY_SERVER_MOVE_THREAD_INTERVAL = 
+    HS_PREFIX + ".move.thread.interval";
+  
+  /**
+   * The number of threads used to move files.
+   */
+  public static final String HISTORY_SERVER_NUM_MOVE_THREADS = 
+    HS_PREFIX + ".move.threads.count";
+  
+  // Equivalent to 0.20 mapreduce.jobhistory.debug.mode
+  public static final String HISTORY_DEBUG_MODE_KEY = HS_PREFIX + ".debug.mode";
+  
   public static final String HISTORY_MAXAGE =
 	  "yarn.historyfile.maxage";
+  
+  /**
+   * Run interval for the History Cleaner thread.
+   */
+  public static final String HISTORY_CLEANER_RUN_INTERVAL = 
+    HS_PREFIX + ".cleaner.run.interval";
+  
   public static final String HS_WEBAPP_BIND_ADDRESS = HS_PREFIX +
       "address.webapp";
   public static final String DEFAULT_HS_WEBAPP_BIND_ADDRESS =

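Note on the change above: the new keys separate the staging, intermediate-done and server-done locations and expose the history server's cache and mover settings. A sketch of setting them on a Configuration; the values are arbitrary examples, not shipped defaults:

    // Arbitrary example values for the new history-server keys.
    Configuration conf = new Configuration();
    conf.set(YarnMRJobConfig.HISTORY_INTERMEDIATE_DONE_DIR_KEY,
        "file:///tmp/yarn/done_intermediate");     // AM drop-off dir
    conf.set(YarnMRJobConfig.HISTORY_SERVER_DONE_DIR_KEY,
        "file:///tmp/yarn/done");                  // final, server-managed dir
    conf.setInt(YarnMRJobConfig.HISTORY_SERVER_JOBLIST_CACHE_SIZE_KEY, 20000);
    conf.setLong(YarnMRJobConfig.HISTORY_SERVER_MOVE_THREAD_INTERVAL,
        3 * 60 * 1000L);                           // scan every three minutes
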
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java?rev=1101385&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java Tue May 10 09:44:27 2011
@@ -0,0 +1,226 @@
+package org.apache.hadoop.mapreduce.v2.jobhistory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+public class FileNameIndexUtils {
+
+  static final String UNDERSCORE_ESCAPE = "%5F";
+  static final int JOB_NAME_TRIM_LENGTH = 50;
+  
+  //This has to be an underscore currently, until the escape sequence uses DELIMITER.
+  static final String DELIMITER = "_";
+  
+  private static final int JOB_ID_INDEX = 0;
+  private static final int SUBMIT_TIME_INDEX = 1;
+  private static final int USER_INDEX = 2;
+  private static final int JOB_NAME_INDEX = 3;
+  private static final int FINISH_TIME_INDEX = 4;
+  private static final int NUM_MAPS_INDEX = 5;
+  private static final int NUM_REDUCES_INDEX = 6;
+  private static final int MAX_INDEX = NUM_REDUCES_INDEX;
+
+  /**
+   * Constructs the job history file name from the JobIndexInfo.
+   * 
+   * @param indexInfo the index info.
+   * @return the done job history filename.
+   */
+  public static String getDoneFileName(JobIndexInfo indexInfo) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    //JobId
+    sb.append(escapeUnderscores(TypeConverter.fromYarn(indexInfo.getJobId()).toString()));
+    sb.append(DELIMITER);
+    
+    //SubmitTime
+    sb.append(indexInfo.getSubmitTime());
+    sb.append(DELIMITER);
+    
+    //UserName
+    sb.append(escapeUnderscores(getUserName(indexInfo)));
+    sb.append(DELIMITER);
+    
+    //JobName
+    sb.append(escapeUnderscores(trimJobName(getJobName(indexInfo))));
+    sb.append(DELIMITER);
+    
+    //FinishTime
+    sb.append(indexInfo.getFinishTime());
+    sb.append(DELIMITER);
+    
+    //NumMaps
+    sb.append(indexInfo.getNumMaps());
+    sb.append(DELIMITER);
+    
+    //NumReduces
+    sb.append(indexInfo.getNumReduces());
+    
+    sb.append(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION);
+    return encodeJobHistoryFileName(sb.toString());
+  }
+  
+  /**
+   * Parses the provided job history file name into a JobIndexInfo object.
+   * 
+   * @param jhFileName the job history filename.
+   * @return a JobIndexInfo object built from the filename.
+   */
+  public static JobIndexInfo getIndexInfo(String jhFileName) throws IOException {
+    String fileName = jhFileName.substring(0, jhFileName.indexOf(JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
+    JobIndexInfo indexInfo = new JobIndexInfo();
+    
+    String[] jobDetails = fileName.split(DELIMITER);
+    if (jobDetails.length != MAX_INDEX + 1) {
+      throw new IOException("Failed to parse file: [" + jhFileName + "]. Expected " + (MAX_INDEX + 1) + " parts.");
+    }
+    
+    JobID oldJobId = JobID.forName(decodeJobHistoryFileName(jobDetails[JOB_ID_INDEX]));
+    JobId jobId = TypeConverter.toYarn(oldJobId);
+    indexInfo.setJobId(jobId);
+    //TODO Catch NumberFormatException - Do not fail if only a few fields are missing.
+    indexInfo.setSubmitTime(Long.parseLong(decodeJobHistoryFileName(jobDetails[SUBMIT_TIME_INDEX])));
+    
+    indexInfo.setUser(decodeJobHistoryFileName(jobDetails[USER_INDEX]));
+    
+    indexInfo.setJobName(decodeJobHistoryFileName(jobDetails[JOB_NAME_INDEX]));
+    
+    indexInfo.setFinishTime(Long.parseLong(decodeJobHistoryFileName(jobDetails[FINISH_TIME_INDEX])));
+    
+    indexInfo.setNumMaps(Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_MAPS_INDEX])));
+    
+    indexInfo.setNumReduces(Integer.parseInt(decodeJobHistoryFileName(jobDetails[NUM_REDUCES_INDEX])));
+    
+    return indexInfo;
+  }
+
+  
+  /**
+   * Helper function to encode the URL of the filename of the job-history 
+   * log file.
+   * 
+   * @param logFileName file name of the job-history file
+   * @return URL encoded filename
+   * @throws IOException
+   */
+  public static String encodeJobHistoryFileName(String logFileName)
+  throws IOException {
+    String replacementUnderscoreEscape = null;
+
+    if (logFileName.contains(UNDERSCORE_ESCAPE)) {
+      replacementUnderscoreEscape = nonOccursString(logFileName);
+
+      logFileName = replaceStringInstances
+        (logFileName, UNDERSCORE_ESCAPE, replacementUnderscoreEscape);
+    }
+
+    String encodedFileName = null;
+    try {
+      encodedFileName = URLEncoder.encode(logFileName, "UTF-8");
+    } catch (UnsupportedEncodingException uee) {
+      IOException ioe = new IOException();
+      ioe.initCause(uee);
+      ioe.setStackTrace(uee.getStackTrace());
+      throw ioe;
+    }
+    
+    if (replacementUnderscoreEscape != null) {
+      encodedFileName = replaceStringInstances
+        (encodedFileName, replacementUnderscoreEscape, UNDERSCORE_ESCAPE);
+    }
+
+    return encodedFileName;
+  }
+  
+  /**
+   * Helper function to decode the URL of the filename of the job-history 
+   * log file.
+   * 
+   * @param logFileName file name of the job-history file
+   * @return URL decoded filename
+   * @throws IOException
+   */
+  public static String decodeJobHistoryFileName(String logFileName)
+  throws IOException {
+    String decodedFileName = null;
+    try {
+      decodedFileName = URLDecoder.decode(logFileName, "UTF-8");
+    } catch (UnsupportedEncodingException uee) {
+      IOException ioe = new IOException();
+      ioe.initCause(uee);
+      ioe.setStackTrace(uee.getStackTrace());
+      throw ioe;
+    }
+    return decodedFileName;
+  }
+  
+  static String nonOccursString(String logFileName) {
+    int adHocIndex = 0;
+
+    String unfoundString = "q" + adHocIndex;
+
+    while (logFileName.contains(unfoundString)) {
+      unfoundString = "q" + ++adHocIndex;
+    }
+
+    return unfoundString + "q";
+  }
+  
+  private static String getUserName(JobIndexInfo indexInfo) {
+    return getNonEmptyString(indexInfo.getUser());
+  }
+  
+  private static String getJobName(JobIndexInfo indexInfo) {
+    return getNonEmptyString(indexInfo.getJobName());
+  }
+
+  //TODO Maybe handle default values for longs and integers here?
+  
+  private static String getNonEmptyString(String in) {
+    if (in == null || in.length() == 0) {
+      in = "NA";
+    }
+    return in;
+  }
+  
+  private static String escapeUnderscores(String escapee) {
+    return replaceStringInstances(escapee, "_", UNDERSCORE_ESCAPE);
+  }
+  
+  // I tolerate this code because I expect a low number of
+  // occurrences in a relatively short string
+  private static String replaceStringInstances
+      (String logFileName, String old, String replacement) {
+    int index = logFileName.indexOf(old);
+
+    while (index >= 0) { // >= 0 so an occurrence at the start of the string is also replaced
+      logFileName = (logFileName.substring(0, index)
+                     + replacement
+                     + replaceStringInstances
+                         (logFileName.substring(index + old.length()),
+                          old, replacement));
+
+      index = logFileName.indexOf(old);
+    }
+
+    return logFileName;
+  }
+  
+  /**
+   * Trims the job-name if required
+   */
+  private static String trimJobName(String jobName) {
+    if (jobName.length() > JOB_NAME_TRIM_LENGTH) {
+      jobName = jobName.substring(0, JOB_NAME_TRIM_LENGTH);
+    }
+    return jobName;
+  }
+}

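Note on the added class above: FileNameIndexUtils makes the intermediate file name itself the index. getDoneFileName() serializes a JobIndexInfo into an underscore-delimited, URL-encoded name, and getIndexInfo() parses it back. A round-trip sketch (statements as they would appear inside a method that throws IOException); the id, times and counts are made up, and the JobIndexInfo constructor order is assumed to follow MetaInfo in JobHistoryEventHandler above:

    // Assumed constructor order:
    // (submitTime, finishTime, user, jobName, jobId, numMaps, numReduces).
    JobId jobId = TypeConverter.toYarn(
        JobID.forName("job_1305019767994_0001")); // made-up id
    JobIndexInfo info = new JobIndexInfo(1305019767994L, 1305019800000L,
        "testuser", "word count", jobId, 10, 2);
    String name = FileNameIndexUtils.getDoneFileName(info);
    // URLEncoder turns the space into '+', and underscores in the escaped
    // fields become %5F, e.g.
    // ..._1305019767994_testuser_word+count_1305019800000_10_2.jhist
    JobIndexInfo parsed = FileNameIndexUtils.getIndexInfo(name);
    assert parsed.getSubmitTime() == info.getSubmitTime()
        && parsed.getNumMaps() == info.getNumMaps();
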
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1101385&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Tue May 10 09:44:27 2011
@@ -0,0 +1,393 @@
+package org.apache.hadoop.mapreduce.v2.jobhistory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+
+public class JobHistoryUtils {
+  
+  private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class);
+  
+  public static final FsPermission HISTORY_DIR_PERMISSION =
+    FsPermission.createImmutable((short) 0750); // rwxr-x---
+  
+  public static final FsPermission HISTORY_FILE_PERMISSION =
+    FsPermission.createImmutable((short) 0740); // rwxr-----
+  
+  /**
+   * Suffix for configuration files.
+   */
+  public static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
+  
+  /**
+   * Suffix for done files.
+   */
+  public static final String DONE_FILE_NAME_SUFFIX = ".done";
+  
+  /**
+   * Job History File extension.
+   */
+  public static final String JOB_HISTORY_FILE_EXTENSION = ".jhist";
+  
+  public static final int VERSION = 4;
+
+  public static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
+  
+  public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + File.separator +  "\\d{2}" + "\\" + File.separator + "\\d{2}";
+  public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX);
+  private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
+  
+  /**
+   * Version substring to use while storing history files.
+   */
+  public static final String LOG_VERSION_STRING = "version-" + VERSION;
+
+  private static final PathFilter CONF_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().endsWith(CONF_FILE_NAME_SUFFIX);
+    }
+  };
+  
+  private static final PathFilter JOB_HISTORY_FILE_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().endsWith(JOB_HISTORY_FILE_EXTENSION);
+    }
+  };
+
+
+  /**
+   * Gets a PathFilter which would match configuration files.
+   * @return the PathFilter for configuration files.
+   */
+  public static PathFilter getConfFileFilter() {
+    return CONF_FILTER;
+  }
+  
+  /**
+   * Gets a PathFilter which would match job history file names.
+   * @return the PathFilter for job history files.
+   */
+  public static PathFilter getHistoryFileFilter() {
+    return JOB_HISTORY_FILE_FILTER;
+  }
+  
+  /**
+   * Returns the current done directory.
+   * @param doneDirPrefix the prefix for the done directory.
+   * @return A string representation of the done directory.
+   */
+  public static String getCurrentDoneDir(String doneDirPrefix) {
+    return doneDirPrefix + File.separator + LOG_VERSION_STRING + File.separator;
+  }
+
+  /**
+   * Gets the configured directory prefix for in-progress history files.
+   * @param conf
+   * @return A string representation of the prefix.
+   */
+  public static String getConfiguredHistoryLogDirPrefix(Configuration conf) {
+    String defaultLogDir = conf.get(
+        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY) + "/history/staging";
+    String logDir = conf.get(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
+      defaultLogDir);
+    return logDir;
+  }
+  
+  /**
+   * Gets the configured directory prefix for intermediate done history files.
+   * @param conf
+   * @return A string representation of the prefix.
+   */
+  public static String getConfiguredHistoryIntermediateDoneDirPrefix(Configuration conf) {
+    String defaultDoneDir = conf.get(
+        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY) + "/history/done_intermediate";
+    String  doneDirPrefix =
+      conf.get(YarnMRJobConfig.HISTORY_INTERMEDIATE_DONE_DIR_KEY,
+          defaultDoneDir);
+    return doneDirPrefix;
+  }
+  
+  /**
+   * Gets the configured directory prefix for done history files.
+   * @param conf the configuration.
+   * @return A string representation of the prefix.
+   */
+  public static String getConfiguredHistoryServerDoneDirPrefix(Configuration conf) {
+    String defaultDoneDir = conf.get(
+        YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY) + "/history/done";
+    String  doneDirPrefix =
+      conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
+          defaultDoneDir);
+    return doneDirPrefix; 
+  }
+
+  /**
+   * Gets the job history file path for in-progress (non-done) history files.
+   */
+  public static Path getStagingJobHistoryFile(Path dir, JobId jobId, int attempt) {
+    return getStagingJobHistoryFile(dir, TypeConverter.fromYarn(jobId).toString(), attempt);
+  }
+  
+  /**
+   * Gets the job history file path for in-progress (non-done) history files.
+   */
+  public static Path getStagingJobHistoryFile(Path dir, String jobId, int attempt) {
+    return new Path(dir, jobId + "_" + 
+        attempt + JOB_HISTORY_FILE_EXTENSION);
+  }
+  
+  /**
+   * Gets the intermediate done configuration file name for a job.
+   * @param jobId the jobId.
+   * @return the conf file name.
+   */
+  public static String getIntermediateConfFileName(JobId jobId) {
+    return TypeConverter.fromYarn(jobId).toString() + CONF_FILE_NAME_SUFFIX;
+  }
+  
+  public static String getIntermediateDoneFileName(JobId jobId) {
+    return TypeConverter.fromYarn(jobId).toString() + DONE_FILE_NAME_SUFFIX;
+  }
+  
+  /**
+   * Gets the conf file path for jobs in progress.
+   * 
+   * @param logDir the log directory prefix.
+   * @param jobId the jobId.
+   * @param attempt attempt number for this job.
+   * @return the path to the staging conf file, or null if logDir is null.
+   */
+  public static Path getStagingConfFile(Path logDir, JobId jobId, int attempt) {
+    Path jobFilePath = null;
+    if (logDir != null) {
+      jobFilePath = new Path(logDir, TypeConverter.fromYarn(jobId).toString()
+          + "_" + attempt + CONF_FILE_NAME_SUFFIX);
+    }
+    return jobFilePath;
+  }
+  
+  /**
+   * Gets the serial number part of the path based on the jobId and serialNumber format.
+   * @param id the jobId.
+   * @param serialNumberFormat the format string for the serial number.
+   * @return the serial number directory component of the path.
+   */
+  public static String serialNumberDirectoryComponent(JobId id, String serialNumberFormat) {
+    return String.format(serialNumberFormat,
+        Integer.valueOf(jobSerialNumber(id))).substring(0,
+        SERIAL_NUMBER_DIRECTORY_DIGITS);
+  }
+  
+  /**
+   * Extracts the timestamp component from the path.
+   * @param path the path to search.
+   * @return the timestamp component, or null if none is found.
+   */
+  public static String getTimestampPartFromPath(String path) {
+    Matcher matcher = TIMESTAMP_DIR_PATTERN.matcher(path);
+    if (matcher.find()) {
+      // String.intern() returns the canonical copy; its result must be used.
+      return matcher.group().intern();
+    } else {
+      return null;
+    }
+  }
+  
+  /**
+   * Gets the history subdirectory based on the jobId, timestamp and serial number format.
+   * @param id the jobId.
+   * @param timestampComponent the timestamp directory component.
+   * @param serialNumberFormat the format string for the serial number.
+   * @return the history log subdirectory path.
+   */
+  public static String historyLogSubdirectory(JobId id, String timestampComponent, String serialNumberFormat) {
+    String result = LOG_VERSION_STRING;
+    String serialNumberDirectory = serialNumberDirectoryComponent(id, serialNumberFormat);
+    
+    result = result 
+      + File.separator + timestampComponent
+      + File.separator + serialNumberDirectory
+      + File.separator;
+    
+    return result;
+  }
+  
+  /**
+   * Gets the timestamp component based on millisecond time.
+   * @param millisecondTime the time in milliseconds.
+   * @param debugMode if true, use the hour/minute fields in place of
+   *        month/day so that directories roll over quickly while testing.
+   * @return the timestamp directory component.
+   */
+  public static String timestampDirectoryComponent(long millisecondTime, boolean debugMode) {
+    Calendar timestamp = Calendar.getInstance();
+    timestamp.setTimeInMillis(millisecondTime);
+    String dateString = String.format(
+        TIMESTAMP_DIR_FORMAT,
+        timestamp.get(Calendar.YEAR),
+        // months are 0-based in Calendar, but people will expect January
+        // to be month #1; the same +1 shifts the hour in debug mode.
+        timestamp.get(debugMode ? Calendar.HOUR : Calendar.MONTH) + 1,
+        timestamp.get(debugMode ? Calendar.MINUTE : Calendar.DAY_OF_MONTH));
+    dateString = dateString.intern();
+    return dateString;
+  }
+  
+  public static String doneSubdirsBeforeSerialTail() {
+    // Version Info
+    String result = ("/" + LOG_VERSION_STRING);
+
+    // date
+    result = result + "/*/*/*"; // YYYY/MM/DD
+    return result;
+  }
+  
+  /**
+   * Computes a serial number used as part of directory naming for the given jobId.
+   * @param id the jobId.
+   * @return the serial number for the job.
+   */
+  public static int jobSerialNumber(JobId id) {
+    return id.getId();
+  }
+  
+  public static List<FileStatus> localGlobber(FileContext fc, Path root, String tail)
+      throws IOException {
+    return localGlobber(fc, root, tail, null);
+  }
+
+  public static List<FileStatus> localGlobber(FileContext fc, Path root, String tail,
+      PathFilter filter) throws IOException {
+    return localGlobber(fc, root, tail, filter, null);
+  }
+
+  // hasFlatFiles is just used to return a second value if you want
+  // one. I would have used MutableBoxedBoolean if such had been provided.
+  public static List<FileStatus> localGlobber(FileContext fc, Path root, String tail,
+      PathFilter filter, AtomicBoolean hasFlatFiles) throws IOException {
+    if (tail.equals("")) {
+      return (listFilteredStatus(fc, root, filter));
+    }
+
+    if (tail.startsWith("/*")) {
+      Path[] subdirs = filteredStat2Paths(
+          remoteIterToList(fc.listStatus(root)), true, hasFlatFiles);
+
+      if (subdirs.length == 0) {
+        return new LinkedList<FileStatus>();
+      }
+
+      // Recurse into each matched subdirectory with the rest of the tail.
+      String newTail = tail.substring(2);
+
+      List<List<FileStatus>> subsubdirs = new LinkedList<List<FileStatus>>();
+      for (int i = 0; i < subdirs.length; ++i) {
+        subsubdirs.add(localGlobber(fc, subdirs[i], newTail, filter, null));
+      }
+
+      List<FileStatus> result = new LinkedList<FileStatus>();
+
+      for (int i = 0; i < subsubdirs.size(); ++i) {
+        result.addAll(subsubdirs.get(i));
+      }
+
+      return result;
+    }
+
+    if (tail.startsWith("/")) {
+      int split = tail.indexOf('/', 1);
+
+      if (split < 0) {
+        return listFilteredStatus(fc, new Path(root, tail.substring(1)), filter);
+      } else {
+        String thisSegment = tail.substring(1, split);
+        String newTail = tail.substring(split);
+        return localGlobber(fc, new Path(root, thisSegment), newTail, filter,
+            hasFlatFiles);
+      }
+    }
+
+    throw new IOException("localGlobber: bad tail: " + tail);
+  }
+
+  private static List<FileStatus> listFilteredStatus(FileContext fc, Path root,
+      PathFilter filter) throws IOException {
+    List<FileStatus> fsList = remoteIterToList(fc.listStatus(root));
+    if (filter == null) {
+      return fsList;
+    } else {
+      List<FileStatus> filteredList = new LinkedList<FileStatus>();
+      for (FileStatus fs : fsList) {
+        if (filter.accept(fs.getPath())) {
+          filteredList.add(fs);
+        }
+      }
+      return filteredList;
+    }
+  }
+
+  private static List<FileStatus> remoteIterToList(
+      RemoteIterator<FileStatus> rIter) throws IOException {
+    List<FileStatus> fsList = new LinkedList<FileStatus>();
+    if (rIter == null)
+      return fsList;
+    while (rIter.hasNext()) {
+      fsList.add(rIter.next());
+    }
+    return fsList;
+  }
+  
+  // hasMismatches is just used to return a second value if you want
+  // one. I would have used MutableBoxedBoolean if such had been provided.
+  private static Path[] filteredStat2Paths(List<FileStatus> stats, boolean dirs,
+      AtomicBoolean hasMismatches) {
+    int resultCount = 0;
+
+    if (hasMismatches == null) {
+      hasMismatches = new AtomicBoolean(false);
+    }
+
+    for (int i = 0; i < stats.size(); ++i) {
+      if (stats.get(i).isDirectory() == dirs) {
+        stats.set(resultCount++, stats.get(i));
+      } else {
+        hasMismatches.set(true);
+      }
+    }
+
+    Path[] result = new Path[resultCount];
+    for (int i = 0; i < resultCount; i++) {
+      result[i] = stats.get(i).getPath();
+    }
+
+    return result;
+  }
+}
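
A quick orientation on how the path helpers above compose. The sketch below is
illustrative only: the directory prefixes and the job id string are made-up
values, not anything this commit configures.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;

    public class PathSketch {
      public static void main(String[] args) {
        // Versioned done-directory root for a (hypothetical) prefix.
        String doneRoot = JobHistoryUtils.getCurrentDoneDir("/mapred/history/done");
        // -> /mapred/history/done/version-4/

        // Date component for "now"; pass true to roll over by hour/minute
        // instead of month/day when testing.
        String datePart = JobHistoryUtils.timestampDirectoryComponent(
            System.currentTimeMillis(), false);
        // -> e.g. 2011/05/10

        // Staging-side history file for attempt 1 of a (hypothetical) job.
        Path staging = JobHistoryUtils.getStagingJobHistoryFile(
            new Path("/mapred/history/staging"), "job_201105100944_0001", 1);
        // -> /mapred/history/staging/job_201105100944_0001_1.jhist

        System.out.println(doneRoot + datePart);
        System.out.println(staging);
      }
    }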

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobIndexInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobIndexInfo.java?rev=1101385&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobIndexInfo.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobIndexInfo.java Tue May 10 09:44:27 2011
@@ -0,0 +1,83 @@
+package org.apache.hadoop.mapreduce.v2.jobhistory;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+/**
+ * Maintains information that may be used by the job history indexing
+ * system.
+ */
+public class JobIndexInfo {
+  private long submitTime;
+  private long finishTime;
+  private String user;
+  private String jobName;
+  private JobId jobId;
+  private int numMaps;
+  private int numReduces;
+  
+  public JobIndexInfo() {
+  }
+  
+  public JobIndexInfo(long submitTime, long finishTime, String user,
+      String jobName, JobId jobId, int numMaps, int numReduces) {
+    this.submitTime = submitTime;
+    this.finishTime = finishTime;
+    this.user = user;
+    this.jobName = jobName;
+    this.jobId = jobId;
+    this.numMaps = numMaps;
+    this.numReduces = numReduces;
+  }
+  
+  public long getSubmitTime() {
+    return submitTime;
+  }
+  public void setSubmitTime(long submitTime) {
+    this.submitTime = submitTime;
+  }
+  public long getFinishTime() {
+    return finishTime;
+  }
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+  public String getUser() {
+    return user;
+  }
+  public void setUser(String user) {
+    this.user = user;
+  }
+  public String getJobName() {
+    return jobName;
+  }
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+  public JobId getJobId() {
+    return jobId;
+  }
+  public void setJobId(JobId jobId) {
+    this.jobId = jobId;
+  }
+  public int getNumMaps() {
+    return numMaps;
+  }
+  public void setNumMaps(int numMaps) {
+    this.numMaps = numMaps;
+  }
+  public int getNumReduces() {
+    return numReduces;
+  }
+  public void setNumReduces(int numReduces) {
+    this.numReduces = numReduces;
+  }
+
+  @Override
+  public String toString() {
+    return "JobIndexInfo [submitTime=" + submitTime + ", finishTime="
+        + finishTime + ", user=" + user + ", jobName=" + jobName + ", jobId="
+        + jobId + ", numMaps=" + numMaps + ", numReduces=" + numReduces + "]";
+  }
+  
+  
+}
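
The bean is plain enough that one construction shows its shape; all values
below are invented, and the jobId is left null only to keep the snippet
standalone (building one normally goes through the YARN record factory).

    import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;

    public class IndexInfoSketch {
      public static void main(String[] args) {
        JobIndexInfo info = new JobIndexInfo(1305020667000L, 1305020767000L,
            "someuser", "word count", null, 10, 2);
        System.out.println(info);
        // JobIndexInfo [submitTime=1305020667000, finishTime=1305020767000,
        //   user=someuser, jobName=word count, jobId=null, numMaps=10,
        //   numReduces=2]
      }
    }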

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ClientFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ClientFactory.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ClientFactory.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ClientFactory.java Tue May 10 09:44:27 2011
@@ -45,7 +45,8 @@ public abstract class ClientFactory {
     try {
       return factory.newInstance().createClient(conf);
     } catch (Exception e) {
-      throw new IOException("Could not create ClientProtocol", e);
+      throw new IOException("Could not create ClientProtocol using factory: "
+          + factory.getName(), e);
     }
   }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Tue May 10 09:44:27 2011
@@ -45,7 +45,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.util.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.YarnException;
@@ -71,37 +71,12 @@ public class CompletedJob implements org
   private TaskAttemptCompletionEvent[] completionEvents;
   private JobInfo jobInfo;
 
-  public CompletedJob(Configuration conf, JobId jobId) throws IOException {
-    this(conf, jobId, true);
-  }
-
-  public CompletedJob(Configuration conf, JobId jobId, boolean loadTasks) throws IOException {
+  public CompletedJob(Configuration conf, JobId jobId, Path historyFile, boolean loadTasks) throws IOException {
     this.conf = conf;
     this.jobId = jobId;
-    //TODO fix
-    /*
-    String  doneLocation =
-      conf.get(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION,
-      "file:///tmp/yarn/done/status");
-    String user =
-      conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
-    String statusstoredir =
-      doneLocation + "/" + user + "/" + TypeConverter.fromYarn(jobID).toString();
-    Path statusFile = new Path(statusstoredir, "jobstats");
-    try {
-      FileContext fc = FileContext.getFileContext(statusFile.toUri(), conf);
-      FSDataInputStream in = fc.open(statusFile);
-      JobHistoryParser parser = new JobHistoryParser(in);
-      jobStats = parser.parse();
-    } catch (IOException e) {
-      LOG.info("Could not open job status store file from dfs " +
-        TypeConverter.fromYarn(jobID).toString());
-      throw new IOException(e);
-    }
-    */
     
     //TODO: load the data lazily. for now load the full data upfront
-    loadFullHistoryData(loadTasks);
+    loadFullHistoryData(loadTasks, historyFile);
 
     counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
     diagnostics.add(jobInfo.getErrorInfo());
@@ -159,7 +134,7 @@ public class CompletedJob implements org
   }
 
   //History data is leisurely loaded when task level data is requested
-  private synchronized void loadFullHistoryData(boolean loadTasks) {
+  private synchronized void loadFullHistoryData(boolean loadTasks, Path historyFileAbsolute) {
     if (jobInfo != null) {
       return; //data already loaded
     }
@@ -169,11 +144,21 @@ public class CompletedJob implements org
     }
     String jobName = TypeConverter.fromYarn(jobId).toString();
     
-    String  jobhistoryDir = JobHistoryUtils.getConfiguredHistoryDoneDirPrefix(conf);
+    String  jobhistoryDir = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
       
     
     String currentJobHistoryDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
     
+    if (historyFileAbsolute != null) {
+      try {
+        JobHistoryParser parser = new JobHistoryParser(historyFileAbsolute.getFileSystem(conf), historyFileAbsolute);
+        jobInfo = parser.parse();
+      } catch (IOException e) {
+        throw new YarnException("Could not load history file " + historyFileAbsolute,
+            e);
+      }
+    } else {
+      // No explicit file given: locate it under the done directory.
     FSDataInputStream in = null;
     Path historyFile = null;
     try {
@@ -192,6 +177,7 @@ public class CompletedJob implements org
       throw new YarnException("Could not load history file " + historyFile,
           e);
     }
+    }
     
     if (loadTasks) {
     // populate the tasks
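
For reference, a hedged sketch of the new constructor in use. The history file
path is invented, and TypeConverter.toYarn is assumed here to accept a
mapreduce JobID; in the history server the JobId and file would normally come
from the job index rather than being built inline.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.TypeConverter;
    import org.apache.hadoop.mapreduce.v2.api.records.JobId;
    import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;

    public class CompletedJobSketch {
      public static void main(String[] args) throws java.io.IOException {
        Configuration conf = new Configuration();
        JobId jobId = TypeConverter.toYarn(JobID.forName("job_201105100944_0001"));
        // An explicit file skips the done-directory scan and is parsed
        // directly; loadTasks=true also populates task-level data up front.
        Path historyFile = new Path("/mapred/history/done/version-4/2011/05/10/"
            + "000000/job_201105100944_0001.jhist");
        CompletedJob job = new CompletedJob(conf, jobId, historyFile, true);
      }
    }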

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java?rev=1101385&r1=1101384&r2=1101385&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java Tue May 10 09:44:27 2011
@@ -18,33 +18,24 @@
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.mapreduce.v2.hs.JobHistory.HistoryCleaner;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 public class HistoryCleanerService extends AbstractService {
 
   private static final Log LOG = LogFactory.getLog(HistoryClientService.class);
   
-  static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L;
-  private FileContext doneDirFc;
-  private HistoryCleaner historyCleanerThread = null;
+  static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; //1 week
+  static final long DEFAULT_RUN_INTERVAL = 1 * 24 * 60 * 60 * 1000L; // 1 day
+  
+  private ScheduledThreadPoolExecutor scheduledExecutor = null;
 
   private Configuration conf;
 
@@ -54,89 +45,32 @@ public class HistoryCleanerService exten
   }
 
   public void start() {
-    long maxAgeOfHistoryFiles = conf.getLong(
-        YarnMRJobConfig.HISTORY_MAXAGE, DEFAULT_HISTORY_MAX_AGE);
-    historyCleanerThread  = new HistoryCleaner(maxAgeOfHistoryFiles);
-    historyCleanerThread.start();
-    super.start();
+//    long maxAgeOfHistoryFiles = conf.getLong(
+//        YarnMRJobConfig.HISTORY_MAXAGE, DEFAULT_HISTORY_MAX_AGE);
+//    scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+//    long runInterval = conf.getLong(YarnMRJobConfig.HISTORY_CLEANER_RUN_INTERVAL, DEFAULT_RUN_INTERVAL);
+//    HistoryCleaner c;
+//    scheduledExecutor.scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles), 30*1000l, runInterval, TimeUnit.MILLISECONDS);
+//    super.start();
   }
 
   /** Shut down JobHistory after stopping the History cleaner */
   @Override
   public void stop() {
-    LOG.info("Interrupting History Cleaner");
-    historyCleanerThread.interrupt();
-    try {
-      historyCleanerThread.join();
-    } catch (InterruptedException e) {
-      LOG.info("Error with shutting down history cleaner thread");
-    }
-  }
-  /**
-   * Delete history files older than a specified time duration.
-   */
-  class HistoryCleaner extends Thread {
-    static final long ONE_DAY_IN_MS = 7 * 24 * 60 * 60 * 1000L;
-    private long cleanupFrequency;
-    private long maxAgeOfHistoryFiles;
-
-    public HistoryCleaner(long maxAge) {
-      setName("Thread for cleaning up History files");
-      setDaemon(true);
-      this.maxAgeOfHistoryFiles = maxAge;
-      cleanupFrequency = Math.min(ONE_DAY_IN_MS, maxAgeOfHistoryFiles);
-      LOG.info("Job History Cleaner Thread started." +
-          " MaxAge is " + 
-          maxAge + " ms(" + ((float)maxAge)/(ONE_DAY_IN_MS) + " days)," +
-          " Cleanup Frequency is " +
-          + cleanupFrequency + " ms (" +
-          ((float)cleanupFrequency)/ONE_DAY_IN_MS + " days)");
-    }
-
-    @Override
-    public void run(){
-  
-      while (true) {
-        try {
-          doCleanup(); 
-          Thread.sleep(cleanupFrequency);
-        }
-        catch (InterruptedException e) {
-          LOG.info("History Cleaner thread exiting");
-          return;
-        }
-        catch (Throwable t) {
-          LOG.warn("History cleaner thread threw an exception", t);
-        }
-      }
+//    LOG.info("Interrupting History Cleaner");
+//    scheduledExecutor.shutdown();
+//    boolean interrupted = false;
+//    long currentTime = System.currentTimeMillis();
+//    while (!scheduledExecutor.isShutdown() && System.currentTimeMillis() > currentTime + 1000l && !interrupted) {
+//      try {
+//        Thread.sleep(20);
+//      } catch (InterruptedException e) {
+//        interrupted = true;
+//      }
+//    }
+//    if (!scheduledExecutor.isShutdown()) {
+//      LOG.warn("HistoryCleanerService shutdown may not have succeeded");
+//    }
     }
 
-    private void doCleanup() {
-      long now = System.currentTimeMillis();
-      try {
-        String defaultDoneDir = conf.get(
-            YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
-        String  jobhistoryDir =
-          conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY, defaultDoneDir);
-        Path done = FileContext.getFileContext(conf).makeQualified(
-            new Path(jobhistoryDir));
-        doneDirFc = FileContext.getFileContext(done.toUri(), conf);
-        RemoteIterator<LocatedFileStatus> historyFiles =
-          doneDirFc.util().listFiles(done, true);
-        if (historyFiles != null) {
-          FileStatus f;
-          while (historyFiles.hasNext()) {
-            f = historyFiles.next();
-            if (now - f.getModificationTime() > maxAgeOfHistoryFiles) {
-              doneDirFc.delete(f.getPath(), true); 
-              LOG.info("Deleting old history file : " + f.getPath());
-            }
-          }
-        }
-      } catch (IOException ie) {
-        LOG.info("Error cleaning up history directory" + 
-            StringUtils.stringifyException(ie));
-      }
-    }
-  }
 }
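
The commented-out bodies above sketch the cleaner's move from a hand-rolled
daemon thread to a ScheduledThreadPoolExecutor. A minimal, self-contained
version of that pattern follows; the Runnable is a stand-in for the real
HistoryCleaner, and the intervals mirror the defaults above.

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class CleanerScheduleSketch {
      public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        Runnable cleaner = new Runnable() {
          public void run() {
            // Stand-in for HistoryCleaner: scan the done dir and delete
            // files older than the configured max age.
            System.out.println("cleanup pass at " + System.currentTimeMillis());
          }
        };
        long runInterval = 24 * 60 * 60 * 1000L; // DEFAULT_RUN_INTERVAL: 1 day
        // First pass after 30s, then one pass per interval.
        executor.scheduleAtFixedRate(cleaner, 30 * 1000L, runInterval,
            TimeUnit.MILLISECONDS);

        // Orderly shutdown with a bounded wait, replacing interrupt()/join().
        executor.shutdown();
        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
          System.err.println("cleaner shutdown may not have completed");
        }
      }
    }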