
svn commit: r802645 - in /hadoop/mapreduce/trunk: ./ src/java/ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/

Author: sharad
Date: Mon Aug 10 04:22:12 2009
New Revision: 802645

URL: http://svn.apache.org/viewvc?rev=802645&view=rev
Log:
MAPREDUCE-814. Provide a way to configure completed job history files to be on HDFS.
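
For illustration only (not part of this patch): a minimal sketch of how a deployment
might point the new key at an HDFS directory. The key name comes from the change
below; the class name, the HDFS URI, and setting it programmatically on a JobConf
rather than in the JobTracker's mapred-site.xml are assumptions made for the example.

import org.apache.hadoop.mapred.JobConf;

public class CompletedHistoryConfigExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // New key introduced by MAPREDUCE-814; the HDFS path is a made-up example.
    conf.set("mapred.job.tracker.history.completed.location",
             "hdfs://namenode:8020/mapred/history/done");
    System.out.println("Completed job history location: "
        + conf.get("mapred.job.tracker.history.completed.location"));
  }
}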

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=802645&r1=802644&r2=802645&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Aug 10 04:22:12 2009
@@ -50,6 +50,9 @@
     MAPREDUCE-705. User-configurable quote and delimiter characters for Sqoop
     records and record reparsing. (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-814. Provide a way to configure completed job history files 
+    to be on HDFS. (sharad)
+
   IMPROVEMENTS
 
     MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop.

Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=802645&r1=802644&r2=802645&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Mon Aug 10 04:22:12 2009
@@ -27,6 +27,15 @@
 </property>
 
 <property>
+  <name>mapred.job.tracker.history.completed.location</name>
+  <value></value>
+  <description> The completed job history files are stored at this single well 
+  known location. If nothing is specified, the files are stored at 
+  ${hadoop.job.history.location}/done.
+  </description>
+</property>
+
+<property>
   <name>mapred.committer.job.setup.cleanup.needed</name>
   <value>true</value>
   <description> true, if job needs job-setup and job-cleanup.
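
To make the defaulting rule in the description above concrete, here is a small sketch
of how the key is resolved, mirroring the initDone() hunk in JobHistory.java further
down. The class and method names are illustrative, not the actual internals.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DoneDirResolver {
  // Mirrors JobHistory.initDone(): an explicit location is qualified against
  // the JobTracker's FileSystem (e.g. HDFS); otherwise fall back to
  // <hadoop.job.history.location>/done.
  static Path resolveDoneDir(Configuration conf, FileSystem jobTrackerFs,
                             Path historyLogDir) {
    String doneLocation =
        conf.get("mapred.job.tracker.history.completed.location");
    if (doneLocation != null) {
      return jobTrackerFs.makeQualified(new Path(doneLocation));
    }
    return new Path(historyLogDir, "done");
  }
}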

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=802645&r1=802644&r2=802645&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java Mon Aug 10 04:22:12 2009
@@ -30,14 +30,19 @@
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -102,7 +107,8 @@
     FsPermission.createImmutable((short) 0750); // rwxr-x---
   final static FsPermission HISTORY_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0740); // rwxr-----
-  private static FileSystem JT_FS; // jobtracker's filesystem
+  private static FileSystem LOGDIR_FS; // log dir filesystem
+  private static FileSystem DONEDIR_FS; // Done dir filesystem
   private static Path DONE = null; // folder for completed jobs
   
   /**
@@ -118,11 +124,23 @@
       Path historyFilename; // path of job history file
       Path confFilename; // path of job's conf
     }
-    
+
+    private ThreadPoolExecutor executor = null;
+    private final Configuration conf;
+
     // cache from job-key to files associated with it.
     private Map<JobID, FilesHolder> fileCache = 
       new ConcurrentHashMap<JobID, FilesHolder>();
 
+    JobHistoryFilesManager(Configuration conf) throws IOException {
+      this.conf = conf;
+    }
+
+    void start() {
+      executor = new ThreadPoolExecutor(1, 3, 1, 
+          TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
+    }
+
     private FilesHolder getFileHolder(JobID id) {
       FilesHolder holder = fileCache.get(id);
       if (holder == null) {
@@ -165,6 +183,33 @@
     void purgeJob(JobID id) {
       fileCache.remove(id);
     }
+
+    void moveToDone(final JobID id, final List<Path> paths) {
+      executor.execute(new Runnable() {
+
+        public void run() {
+          //move the files to DONE folder
+          try {
+            for (Path path : paths) {
+              //check if path exists, in case of retries it may not exist
+              if (LOGDIR_FS.exists(path)) {
+                LOG.info("Moving " + path.toString() + " to " + 
+                    DONE.toString()); 
+                DONEDIR_FS.moveFromLocalFile(path, DONE);
+                DONEDIR_FS.setPermission(new Path(DONE, path.getName()), 
+                    new FsPermission(HISTORY_FILE_PERMISSION));
+              }
+            }
+
+            //purge the job from the cache
+            fileManager.purgeJob(id);
+          } catch (Throwable e) {
+            LOG.error("Unable to move history file to DONE folder.", e);
+          }
+        }
+
+      });
+    }
   }
   /**
    * Record types are identifiers for each line of log in history files. 
@@ -229,18 +274,18 @@
         "file:///" + new File(
         System.getProperty("hadoop.log.dir")).getAbsolutePath()
         + File.separator + "history");
-      DONE = new Path(LOG_DIR, "done");
       JOBTRACKER_UNIQUE_STRING = hostname + "_" + 
                                     String.valueOf(jobTrackerStartTime) + "_";
       jobtrackerHostname = hostname;
       Path logDir = new Path(LOG_DIR);
       if (fs == null) {
-        JT_FS = logDir.getFileSystem(conf);
+        LOGDIR_FS = logDir.getFileSystem(conf);
       } else {
-        JT_FS = fs;
+        LOGDIR_FS = fs;
       }
-      if (!JT_FS.exists(logDir)){
-        if (!JT_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
+      if (!LOGDIR_FS.exists(logDir)){
+        if (!LOGDIR_FS.mkdirs(logDir, 
+            new FsPermission(HISTORY_DIR_PERMISSION))) {
           throw new IOException("Mkdirs failed to create " + logDir.toString());
         }
       }
@@ -250,12 +295,40 @@
       jobHistoryBlockSize = 
         conf.getLong("mapred.jobtracker.job.history.block.size", 
                      3 * 1024 * 1024);
-      
-      // create the done folder with appropriate permission
-      JT_FS.mkdirs(DONE, HISTORY_DIR_PERMISSION);
 
       // initialize the file manager
-      fileManager = new JobHistoryFilesManager();
+      fileManager = new JobHistoryFilesManager(conf);
+    } catch(IOException e) {
+        LOG.error("Failed to initialize JobHistory log file", e); 
+        disableHistory = true;
+    }
+    return !(disableHistory);
+  }
+
+  static boolean initDone(JobConf conf, FileSystem fs){
+    try {
+      //if completed job history location is set, use that
+      String doneLocation = conf.
+                       get("mapred.job.tracker.history.completed.location");
+      if (doneLocation != null) {
+        DONE = fs.makeQualified(new Path(doneLocation));
+        DONEDIR_FS = fs;
+      } else {
+        DONE = new Path(LOG_DIR, "done");
+        DONEDIR_FS = LOGDIR_FS;
+      }
+
+      //If not already present create the done folder with appropriate 
+      //permission
+      if (!DONEDIR_FS.exists(DONE)) {
+        LOG.info("Creating DONE folder at "+ DONE);
+        if (! DONEDIR_FS.mkdirs(DONE, 
+            new FsPermission(HISTORY_DIR_PERMISSION))) {
+          throw new IOException("Mkdirs failed to create " + DONE.toString());
+        }
+      }
+
+      fileManager.start();
     } catch(IOException e) {
         LOG.error("Failed to initialize JobHistory log file", e); 
         disableHistory = true;
@@ -739,15 +812,22 @@
       if (LOG_DIR == null) {
         return null;
       }
-      return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR));
+      return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR), LOGDIR_FS);
+    }
+
+    static synchronized String getDoneJobHistoryFileName(JobConf jobConf, 
+        JobID id) throws IOException {
+      if (DONE == null) {
+        return null;
+      }
+      return getJobHistoryFileName(jobConf, id, DONE, DONEDIR_FS);
     }
 
     /**
      * @param dir The directory where to search.
      */
-    static synchronized String getJobHistoryFileName(JobConf jobConf, 
-                                                            JobID id, 
-                                                            Path dir) 
+    private static synchronized String getJobHistoryFileName(JobConf jobConf, 
+                                          JobID id, Path dir, FileSystem fs) 
     throws IOException {
       String user = getUserName(jobConf);
       String jobName = trimJobName(getJobName(jobConf));
@@ -776,7 +856,7 @@
         }
       };
       
-      FileStatus[] statuses = JT_FS.listStatus(dir, filter);
+      FileStatus[] statuses = fs.listStatus(dir, filter);
       String filename = null;
       if (statuses.length == 0) {
         LOG.info("Nothing to recover for job " + id);
@@ -816,7 +896,7 @@
       Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName);
       if (logPath != null) {
         LOG.info("Deleting job history file " + logPath.getName());
-        JT_FS.delete(logPath, false);
+        LOGDIR_FS.delete(logPath, false);
       }
       // do the same for the user file too
       logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName, 
@@ -848,20 +928,20 @@
       String tmpFilename = getSecondaryJobHistoryFile(logFileName);
       Path logDir = logFilePath.getParent();
       Path tmpFilePath = new Path(logDir, tmpFilename);
-      if (JT_FS.exists(logFilePath)) {
+      if (LOGDIR_FS.exists(logFilePath)) {
         LOG.info(logFileName + " exists!");
-        if (JT_FS.exists(tmpFilePath)) {
+        if (LOGDIR_FS.exists(tmpFilePath)) {
           LOG.info("Deleting " + tmpFilename 
                    + "  and using " + logFileName + " for recovery.");
-          JT_FS.delete(tmpFilePath, false);
+          LOGDIR_FS.delete(tmpFilePath, false);
         }
         ret = tmpFilePath;
       } else {
         LOG.info(logFileName + " doesnt exist! Using " 
                  + tmpFilename + " for recovery.");
-        if (JT_FS.exists(tmpFilePath)) {
+        if (LOGDIR_FS.exists(tmpFilePath)) {
           LOG.info("Renaming " + tmpFilename + " to " + logFileName);
-          JT_FS.rename(tmpFilePath, logFilePath);
+          LOGDIR_FS.rename(tmpFilePath, logFilePath);
           ret = tmpFilePath;
         } else {
           ret = logFilePath;
@@ -919,7 +999,7 @@
       // rename the tmp file to the master file. Note that this should be 
       // done only when the file is closed and handles are released.
       LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
-      JT_FS.rename(tmpLogPath, masterLogPath);
+      LOGDIR_FS.rename(tmpLogPath, masterLogPath);
       // update the cache
       fileManager.setHistoryFile(id, masterLogPath);
       
@@ -960,28 +1040,25 @@
      * This *should* be the last call to jobhistory for a given job.
      */
      static void markCompleted(JobID id) throws IOException {
+       List<Path> paths = new ArrayList<Path>();
        Path path = fileManager.getHistoryFile(id);
        if (path == null) {
          LOG.info("No file for job-history with " + id + " found in cache!");
-         return;
+       } else {
+         paths.add(path);
        }
-       Path newPath = new Path(DONE, path.getName());
-       LOG.info("Moving completed job from " + path + " to " + newPath);
-       JT_FS.rename(path, newPath);
 
        Path confPath = fileManager.getConfFileWriters(id);
        if (confPath == null) {
          LOG.info("No file for jobconf with " + id + " found in cache!");
-         return;
+       } else {
+         paths.add(confPath);
        }
-       // move the conf too
-       newPath = new Path(DONE, confPath.getName());
-       LOG.info("Moving configuration of completed job from " + confPath 
-                + " to " + newPath);
-       JT_FS.rename(confPath, newPath);
 
-       // purge the job from the cache
-       fileManager.purgeJob(id);
+       //move the job files to done folder and purge the job
+       if (paths.size() > 0) {
+         fileManager.moveToDone(id, paths);
+       }
      }
 
      /**
@@ -1056,12 +1133,12 @@
             }
             
             int defaultBufferSize = 
-              JT_FS.getConf().getInt("io.file.buffer.size", 4096);
-            out = JT_FS.create(logFile, 
+              LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
+            out = LOGDIR_FS.create(logFile, 
                             new FsPermission(HISTORY_FILE_PERMISSION),
                             EnumSet.of(CreateFlag.OVERWRITE), 
                             defaultBufferSize, 
-                            JT_FS.getDefaultReplication(), 
+                            LOGDIR_FS.getDefaultReplication(), 
                             jobHistoryBlockSize, null);
             writer = new PrintWriter(out);
             fileManager.addWriter(jobId, writer);
@@ -1140,14 +1217,14 @@
       try {
         if (LOG_DIR != null) {
           int defaultBufferSize = 
-              JT_FS.getConf().getInt("io.file.buffer.size", 4096);
-          if (!JT_FS.exists(jobFilePath)) {
-            jobFileOut = JT_FS.create(jobFilePath, 
+              LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
+          if (!LOGDIR_FS.exists(jobFilePath)) {
+            jobFileOut = LOGDIR_FS.create(jobFilePath, 
                                    new FsPermission(HISTORY_FILE_PERMISSION),
                                    EnumSet.of(CreateFlag.OVERWRITE), 
                                    defaultBufferSize, 
-                                   JT_FS.getDefaultReplication(), 
-                                   JT_FS.getDefaultBlockSize(), null);
+                                   LOGDIR_FS.getDefaultReplication(), 
+                                   LOGDIR_FS.getDefaultBlockSize(), null);
             jobConf.writeXml(jobFileOut);
             jobFileOut.close();
           }
@@ -1963,12 +2040,12 @@
       lastRan = now;  
       isRunning = true; 
       try {
-        FileStatus[] historyFiles = JT_FS.listStatus(DONE);
+        FileStatus[] historyFiles = DONEDIR_FS.listStatus(DONE);
         // delete if older than 30 days
         if (historyFiles != null) {
           for (FileStatus f : historyFiles) {
             if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) {
-              JT_FS.delete(f.getPath(), true); 
+              DONEDIR_FS.delete(f.getPath(), true); 
               LOG.info("Deleting old history file : " + f.getPath());
             }
           }
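
The main structural change above is that finished history files are no longer moved
to DONE inline; markCompleted() now hands the paths to a small thread pool. Below is
a self-contained toy version of that hand-off pattern, using the same pool parameters
as the patch (1 core thread, up to 3, one hour keep-alive). The real task also checks
existence, calls DONEDIR_FS.moveFromLocalFile() and resets permissions, which is only
simulated here.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AsyncDoneMover {
  // Same executor shape as JobHistoryFilesManager.start() in the patch.
  private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
      1, 3, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());

  void moveToDone(final String jobId, final List<String> paths) {
    executor.execute(new Runnable() {
      public void run() {
        for (String path : paths) {
          // Real code: exists(), moveFromLocalFile(), setPermission().
          System.out.println("Moving " + path + " of " + jobId + " to DONE");
        }
      }
    });
  }

  public static void main(String[] args) throws InterruptedException {
    AsyncDoneMover mover = new AsyncDoneMover();
    // Hypothetical job id and file names, for demonstration only.
    mover.moveToDone("job_200908100422_0001",
        Arrays.asList("job-history-file", "job-conf.xml"));
    mover.executor.shutdown();
    mover.executor.awaitTermination(10, TimeUnit.SECONDS);
  }
}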

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=802645&r1=802644&r2=802645&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Aug 10 04:22:12 2009
@@ -1873,14 +1873,7 @@
     // initialize history parameters.
     boolean historyInitialized = JobHistory.init(conf, this.localMachine,
                                                  this.startTime);
-    String historyLogDir = null;
-    FileSystem historyFS = null;
-    if (historyInitialized) {
-      historyLogDir = JobHistory.getCompletedJobHistoryLocation().toString();
-      infoServer.setAttribute("historyLogDir", historyLogDir);
-      historyFS = new Path(historyLogDir).getFileSystem(conf);
-      infoServer.setAttribute("fileSys", historyFS);
-    }
+    
     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
     
@@ -1969,13 +1962,13 @@
     // Same with 'localDir' except it's always on the local disk.
     jobConf.deleteLocalFiles(SUBDIR);
 
-    // Initialize history again if it is not initialized
-    // because history was on dfs and namenode was in safemode.
-    if (!historyInitialized) {
-      JobHistory.init(conf, this.localMachine, this.startTime); 
-      historyLogDir = conf.get("hadoop.job.history.location");
+    // Initialize history DONE folder
+    if (historyInitialized) {
+      JobHistory.initDone(conf, fs);
+      String historyLogDir = 
+        JobHistory.getCompletedJobHistoryLocation().toString();
       infoServer.setAttribute("historyLogDir", historyLogDir);
-      historyFS = new Path(historyLogDir).getFileSystem(conf);
+      FileSystem historyFS = new Path(historyLogDir).getFileSystem(conf);
       infoServer.setAttribute("fileSys", historyFS);
     }
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=802645&r1=802644&r2=802645&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Mon Aug 10 04:22:12 2009
@@ -36,6 +36,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobHistory.*;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.commons.logging.Log;
@@ -474,8 +475,7 @@
 
     // Get the history file name
     Path dir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = 
-      JobHistory.JobInfo.getJobHistoryFileName(conf, id, dir);
+    String logFileName = getDoneFile(conf, id, dir);
 
     // Framework history log file location
     Path logFile = new Path(dir, logFileName);
@@ -772,8 +772,7 @@
     JobID id = job.getID();
     Path doneDir = JobHistory.getCompletedJobHistoryLocation();
     // Get the history file name
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
-                                                                  doneDir);
+    String logFileName = getDoneFile(conf, id, doneDir);
 
     // Framework history log file location
     Path logFile = new Path(doneDir, logFileName);
@@ -801,6 +800,92 @@
     validateTaskAttemptLevelKeyValues(mr, job, jobInfo);
   }
 
+  public void testDoneFolderOnHDFS() throws IOException {
+    MiniMRCluster mr = null;
+    try {
+      JobConf conf = new JobConf();
+      // keep for less time
+      conf.setLong("mapred.jobtracker.retirejob.check", 1000);
+      conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
+
+      //set the done folder location
+      String doneFolder = "history_done";
+      conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+
+      MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null);
+      mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
+          3, null, null, conf);
+
+      // run the TCs
+      conf = mr.createJobConf();
+
+      FileSystem fs = FileSystem.get(conf);
+      // clean up
+      fs.delete(new Path("succeed"), true);
+
+      Path inDir = new Path("succeed/input");
+      Path outDir = new Path("succeed/output");
+
+      //Disable speculative execution
+      conf.setSpeculativeExecution(false);
+
+      // Make sure that the job is not removed from memory until we do finish
+      // the validation of history file content
+      conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10);
+
+      // Run a job that will be succeeded and validate its history file
+      RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
+      
+      Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+      assertEquals("History DONE folder not correct", 
+          doneFolder, doneDir.getName());
+      JobID id = job.getID();
+      String logFileName = getDoneFile(conf, id, doneDir);
+
+      // Framework history log file location
+      Path logFile = new Path(doneDir, logFileName);
+      FileSystem fileSys = logFile.getFileSystem(conf);
+   
+      // Check if the history file exists
+      assertTrue("History file does not exist", fileSys.exists(logFile));
+
+      // check if the corresponding conf file exists
+      Path confFile = getPathForConf(logFile, doneDir);
+      assertTrue("Config for completed jobs doesnt exist", 
+                 fileSys.exists(confFile));
+
+      // check if the file exists in a done folder
+      assertTrue("Completed job config doesnt exist in the done folder", 
+                 doneDir.getName().equals(confFile.getParent().getName()));
+
+      // check if the file exists in a done folder
+      assertTrue("Completed jobs doesnt exist in the done folder", 
+                 doneDir.getName().equals(logFile.getParent().getName()));
+      
+
+      // check if the job file is removed from the history location 
+      Path runningJobsHistoryFolder = logFile.getParent().getParent();
+      Path runningJobHistoryFilename = 
+        new Path(runningJobsHistoryFolder, logFile.getName());
+      Path runningJobConfFilename = 
+        new Path(runningJobsHistoryFolder, confFile.getName());
+      assertFalse("History file not deleted from the running folder", 
+                  fileSys.exists(runningJobHistoryFilename));
+      assertFalse("Config for completed jobs not deleted from running folder", 
+                  fileSys.exists(runningJobConfFilename));
+
+      validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
+      validateJobHistoryFileContent(mr, job, conf);
+
+      // get the job conf filename
+    } finally {
+      if (mr != null) {
+        cleanupLocalFiles(mr);
+        mr.shutdown();
+      }
+    }
+  }
+
   /** Run a job that will be succeeded and validate its history file format
    *  and its content.
    */
@@ -811,6 +896,11 @@
       // keep for less time
       conf.setLong("mapred.jobtracker.retirejob.check", 1000);
       conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
+
+      //set the done folder location
+      String doneFolder = TEST_ROOT_DIR + "history_done";
+      conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+      
       mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
 
       // run the TCs
@@ -834,9 +924,11 @@
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
       
       Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+      assertEquals("History DONE folder not correct", 
+          doneFolder, doneDir.toString());
       JobID id = job.getID();
-      String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
-                                                                    doneDir);
+      String logFileName = getDoneFile(conf, id, doneDir);
+
       // Framework history log file location
       Path logFile = new Path(doneDir, logFileName);
       FileSystem fileSys = logFile.getFileSystem(conf);
@@ -851,11 +943,11 @@
 
       // check if the file exists in a done folder
       assertTrue("Completed job config doesnt exist in the done folder", 
-                 "done".equals(confFile.getParent().getName()));
+                 doneDir.getName().equals(confFile.getParent().getName()));
 
       // check if the file exists in a done folder
       assertTrue("Completed jobs doesnt exist in the done folder", 
-                 "done".equals(logFile.getParent().getName()));
+                 doneDir.getName().equals(logFile.getParent().getName()));
       
 
       // check if the job file is removed from the history location 
@@ -889,6 +981,17 @@
     }
   }
 
+  //Returns the file in the done folder
+  //Waits for sometime to get the file moved to done
+  private static String getDoneFile(JobConf conf, JobID id, 
+      Path doneDir) throws IOException {
+    String name = null;
+    for (int i = 0; name == null && i < 20; i++) {
+      name = JobHistory.JobInfo.getDoneJobHistoryFileName(conf, id);
+      UtilsForTests.waitFor(1000);
+    }
+    return name;
+  }
   // Returns the output path where user history log file is written to with
   // default configuration setting for hadoop.job.history.user.location
   private static Path getLogLocationInOutputPath(String logFileName,
@@ -909,8 +1012,7 @@
           throws IOException  {
     // Get the history file name
     Path doneDir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
-                                                                  doneDir);
+    String logFileName = getDoneFile(conf, id, doneDir);
 
     // User history log file location
     Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
@@ -1021,8 +1123,7 @@
 
     // Get the history file name
     Path doneDir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
-                                                                  doneDir);
+    String logFileName = getDoneFile(conf, id, doneDir);
 
     // Framework history log file location
     Path logFile = new Path(doneDir, logFileName);
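
Since the move to DONE now happens on a background thread, the test's new
getDoneFile() helper above polls for the file instead of looking it up once. A
generic version of that retry pattern (the names here are illustrative, not the
test's):

import java.util.concurrent.Callable;

public class PollUntilPresent {
  // Probe repeatedly with a short sleep, giving the asynchronous move time
  // to finish; returns null if the value never shows up.
  static <T> T poll(Callable<T> probe, int maxAttempts, long sleepMillis)
      throws Exception {
    T result = null;
    for (int i = 0; result == null && i < maxAttempts; i++) {
      result = probe.call();
      if (result == null) {
        Thread.sleep(sleepMillis);
      }
    }
    return result;
  }

  public static void main(String[] args) throws Exception {
    final long start = System.currentTimeMillis();
    // Stand-in probe: "appears" after about three seconds.
    String found = poll(new Callable<String>() {
      public String call() {
        return System.currentTimeMillis() - start > 3000 ? "done-file" : null;
      }
    }, 20, 1000);
    System.out.println("Found: " + found);
  }
}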

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=802645&r1=802644&r2=802645&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java Mon Aug 10 04:22:12 2009
@@ -308,48 +308,7 @@
     assertTrue("Cluster status is insane", 
                checkClusterStatusOnCompletion(status, prevStatus));
   }
-  
-  /**
-   * Checks if the history files are as expected
-   * @param id job id
-   * @param conf job conf
-   */
-  private void testJobHistoryFiles(JobID id, JobConf conf) 
-  throws IOException  {
-    // Get the history files for users
-    Path dir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
-                                                                  dir);
-    String tempLogFileName = 
-      JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName);
-    
-    // I. User files
-    Path logFile = 
-      JobHistory.JobInfo.getJobHistoryLogLocationForUser(logFileName, conf);
-    FileSystem fileSys = logFile.getFileSystem(conf);
-    
-    // Check if the history file exists
-    assertTrue("User log file does not exist", fileSys.exists(logFile));
-    
-    // Check if the temporary file is deleted
-    Path tempLogFile = 
-      JobHistory.JobInfo.getJobHistoryLogLocationForUser(tempLogFileName, 
-                                                         conf);
-    assertFalse("User temporary log file exists", fileSys.exists(tempLogFile));
-    
-    // II. Framework files
-    // Get the history file
-    logFile = new Path(dir, logFileName);
-    fileSys = logFile.getFileSystem(conf);
-    
-    // Check if the history file exists
-    assertTrue("Log file does not exist", fileSys.exists(logFile));
-    
-    // Check if the temporary file is deleted
-    tempLogFile = JobHistory.JobInfo.getJobHistoryLogLocation(tempLogFileName);
-    assertFalse("Temporary log file exists", fileSys.exists(tempLogFile));
-  }
-  
+
   /**
    * Matches specified number of task reports.
    * @param source the reports to be matched