You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:26:50 UTC

svn commit: r1076969 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Mar  4 03:26:49 2011
New Revision: 1076969

URL: http://svn.apache.org/viewvc?rev=1076969&view=rev
Log:
commit 74ce0696bde3778d2579e0bbeeed1012a9a4bf1b
Author: Yahoo\! <lt...@yahoo-inc.com>
Date:   Tue Aug 11 10:29:11 2009 -0700

    Applying patch 2923008.mr814.patch

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1076969&r1=1076968&r2=1076969&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar  4 03:26:49 2011
@@ -26,6 +26,15 @@
   </description>
 </property>
 
+<property>
+  <name>mapred.job.tracker.history.completed.location</name>
+  <value></value>
+  <description> The completed job history files are stored at this single well 
+  known location. If nothing is specified, the files are stored at 
+  ${hadoop.job.history.location}/done.
+  </description>
+</property>
+
 <!-- i/o properties -->
 
 <property>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1076969&r1=1076968&r2=1076969&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Mar  4 03:26:49 2011
@@ -30,14 +30,19 @@ import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -101,6 +106,8 @@ public class JobHistory {
     FsPermission.createImmutable((short) 0750); // rwxr-x---
   final static FsPermission HISTORY_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0740); // rwxr-----
+  private static FileSystem LOGDIR_FS; // log dir filesystem
+  private static FileSystem DONEDIR_FS; // Done dir filesystem
   private static JobConf jtConf;
   private static Path DONE = null; // folder for completed jobs
   /**
@@ -125,11 +132,23 @@ public class JobHistory {
       Path historyFilename; // path of job history file
       Path confFilename; // path of job's conf
     }
-    
+   
+    private ThreadPoolExecutor executor = null;
+    private final Configuration conf;
+
    // cache from job-key to files associated with it.
     private Map<JobID, FilesHolder> fileCache = 
       new ConcurrentHashMap<JobID, FilesHolder>();
 
+    JobHistoryFilesManager(Configuration conf) throws IOException {
+      this.conf = conf;
+    }
+
+    void start() {
+      executor = new ThreadPoolExecutor(1, 3, 1, 
+          TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
+    }
+
     private FilesHolder getFileHolder(JobID id) {
       FilesHolder holder = fileCache.get(id);
      if (holder == null) {
@@ -172,6 +191,33 @@ public class JobHistory {
     void purgeJob(JobID id) {
       fileCache.remove(id);
     }
+
+    void moveToDone(final JobID id, final List<Path> paths) {
+      executor.execute(new Runnable() {
+
+        public void run() {
+          //move the files to DONE folder
+          try {
+            for (Path path : paths) {
+              //check if path exists, in case of retries it may not exist
+              if (LOGDIR_FS.exists(path)) {
+                LOG.info("Moving " + path.toString() + " to " + 
+                    DONE.toString()); 
+                DONEDIR_FS.moveFromLocalFile(path, DONE);
+                DONEDIR_FS.setPermission(new Path(DONE, path.getName()), 
+                    new FsPermission(HISTORY_FILE_PERMISSION));
+              }
+            }
+
+            //purge the job from the cache
+            fileManager.purgeJob(id);
+          } catch (Throwable e) {
+            LOG.error("Unable to move history file to DONE folder.", e);
+          }
+        }
+
+      });
+    }
   }
   /**
    * Record types are identifiers for each line of log in history files. 
@@ -222,14 +268,13 @@ public class JobHistory {
         "file:///" + new File(
         System.getProperty("hadoop.log.dir")).getAbsolutePath()
         + File.separator + "history");
-      DONE = new Path(LOG_DIR, "done");
       JOBTRACKER_UNIQUE_STRING = hostname + "_" + 
                                     String.valueOf(jobTrackerStartTime) + "_";
       jobtrackerHostname = hostname;
       Path logDir = new Path(LOG_DIR);
-      FileSystem fs = logDir.getFileSystem(conf);
-      if (!fs.exists(logDir)){
-        if (!fs.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
+      LOGDIR_FS = logDir.getFileSystem(conf);
+      if (!LOGDIR_FS.exists(logDir)){
+        if (!LOGDIR_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
           throw new IOException("Mkdirs failed to create " + logDir.toString());
         }
       }
@@ -241,11 +286,8 @@ public class JobHistory {
                      3 * 1024 * 1024);
       jtConf = conf;
 
-      // create the done folder with appropriate permission
-      fs.mkdirs(DONE, HISTORY_DIR_PERMISSION);
-
       // initialize the file manager
-      fileManager = new JobHistoryFilesManager();
+      fileManager = new JobHistoryFilesManager(conf);
     } catch(IOException e) {
         LOG.error("Failed to initialize JobHistory log file", e); 
         disableHistory = true;
@@ -253,6 +295,38 @@ public class JobHistory {
     return !(disableHistory);
   }
 
+  static boolean initDone(JobConf conf, FileSystem fs){
+    try {
+      //if completed job history location is set, use that
+      String doneLocation = conf.
+                       get("mapred.job.tracker.history.completed.location");
+      if (doneLocation != null) {
+        DONE = fs.makeQualified(new Path(doneLocation));
+        DONEDIR_FS = fs;
+      } else {
+        DONE = new Path(LOG_DIR, "done");
+        DONEDIR_FS = LOGDIR_FS;
+      }
+
+      //If not already present create the done folder with appropriate 
+      //permission
+      if (!DONEDIR_FS.exists(DONE)) {
+        LOG.info("Creating DONE folder at "+ DONE);
+        if (! DONEDIR_FS.mkdirs(DONE, 
+            new FsPermission(HISTORY_DIR_PERMISSION))) {
+          throw new IOException("Mkdirs failed to create " + DONE.toString());
+        }
+      }
+
+      fileManager.start();
+    } catch(IOException e) {
+        LOG.error("Failed to initialize JobHistory log file", e); 
+        disableHistory = true;
+    }
+    return !(disableHistory);
+  }
+
+
   /**
    * Manages job-history's meta information such as version etc.
    * Helps in logging version information to the job-history and recover
@@ -724,20 +798,26 @@ public class JobHistory {
     public static synchronized String getJobHistoryFileName(JobConf jobConf, 
                                                             JobID id) 
     throws IOException {                    
-      return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR));
+      return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR), LOGDIR_FS);
+    }
+
+    static synchronized String getDoneJobHistoryFileName(JobConf jobConf, 
+        JobID id) throws IOException {
+      if (DONE == null) {
+        return null;
+      }
+      return getJobHistoryFileName(jobConf, id, DONE, DONEDIR_FS);
     }
 
     /**
      * @param dir The directory where to search.
      */
-    static synchronized String getJobHistoryFileName(JobConf jobConf, 
-                                                            JobID id, 
-                                                            Path dir) 
+    private static synchronized String getJobHistoryFileName(JobConf jobConf, 
+                                          JobID id, Path dir, FileSystem fs) 
     throws IOException {
       String user = getUserName(jobConf);
       String jobName = trimJobName(getJobName(jobConf));
       
-      FileSystem fs = new Path(LOG_DIR).getFileSystem(jobConf);
       if (LOG_DIR == null) {
         return null;
       }
@@ -805,9 +885,8 @@ public class JobHistory {
     throws IOException {
       Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName);
       if (logPath != null) {
-        FileSystem fs = logPath.getFileSystem(conf);
         LOG.info("Deleting job history file " + logPath.getName());
-        fs.delete(logPath, false);
+        LOGDIR_FS.delete(logPath, false);
       }
       // do the same for the user file too
       logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName, 
@@ -835,25 +914,24 @@ public class JobHistory {
                                                           Path logFilePath) 
     throws IOException {
       Path ret;
-      FileSystem fs = logFilePath.getFileSystem(conf);
       String logFileName = logFilePath.getName();
       String tmpFilename = getSecondaryJobHistoryFile(logFileName);
       Path logDir = logFilePath.getParent();
       Path tmpFilePath = new Path(logDir, tmpFilename);
-      if (fs.exists(logFilePath)) {
+      if (LOGDIR_FS.exists(logFilePath)) {
         LOG.info(logFileName + " exists!");
-        if (fs.exists(tmpFilePath)) {
+        if (LOGDIR_FS.exists(tmpFilePath)) {
           LOG.info("Deleting " + tmpFilename 
                    + "  and using " + logFileName + " for recovery.");
-          fs.delete(tmpFilePath, false);
+          LOGDIR_FS.delete(tmpFilePath, false);
         }
         ret = tmpFilePath;
       } else {
         LOG.info(logFileName + " doesnt exist! Using " 
                  + tmpFilename + " for recovery.");
-        if (fs.exists(tmpFilePath)) {
+        if (LOGDIR_FS.exists(tmpFilePath)) {
           LOG.info("Renaming " + tmpFilename + " to " + logFileName);
-          fs.rename(tmpFilePath, logFilePath);
+          LOGDIR_FS.rename(tmpFilePath, logFilePath);
           ret = tmpFilePath;
         } else {
           ret = logFilePath;
@@ -863,7 +941,7 @@ public class JobHistory {
       // do the same for the user files too
       logFilePath = getJobHistoryLogLocationForUser(logFileName, conf);
       if (logFilePath != null) {
-        fs = logFilePath.getFileSystem(conf);
+        FileSystem fs = logFilePath.getFileSystem(conf);
         logDir = logFilePath.getParent();
         tmpFilePath = new Path(logDir, tmpFilename);
         if (fs.exists(logFilePath)) {
@@ -911,8 +989,7 @@ public class JobHistory {
        // rename the tmp file to the master file. Note that this should be 
        // done only when the file is closed and handles are released.
        LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
-       FileSystem fs = tmpLogPath.getFileSystem(jtConf);
-       fs.rename(tmpLogPath, masterLogPath);
+       LOGDIR_FS.rename(tmpLogPath, masterLogPath);
        // update the cache
        fileManager.setHistoryFile(id, masterLogPath);
 
@@ -924,7 +1001,7 @@ public class JobHistory {
         JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName, 
                                                            conf);
       if (masterLogPath != null) {
-        fs = masterLogPath.getFileSystem(conf);
+        FileSystem fs = masterLogPath.getFileSystem(conf);
         if (fs.exists(tmpLogPath)) {
           LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName
                    + " in user directory");
@@ -966,29 +1043,27 @@ public class JobHistory {
      * This *should* be the last call to jobhistory for a given job.
      */
      static void markCompleted(JobID id) throws IOException {
+       List<Path> paths = new ArrayList<Path>();
        Path path = fileManager.getHistoryFile(id);
        if (path == null) {
          LOG.info("No file for job-history with " + id + " found in cache!");
          return;
+       } else {
+         paths.add(path);
        }
-       Path newPath = new Path(DONE, path.getName());
-       LOG.info("Moving completed job from " + path + " to " + newPath);
-       FileSystem fs = path.getFileSystem(jtConf);
-       fs.rename(path, newPath);
 
        Path confPath = fileManager.getConfFileWriters(id);
        if (confPath == null) {
          LOG.info("No file for jobconf with " + id + " found in cache!");
          return;
+       } else {
+         paths.add(confPath);
        }
-       // move the conf too
-       newPath = new Path(DONE, confPath.getName());
-       LOG.info("Moving configuration of completed job from " + confPath 
-                + " to " + newPath);
-       fs.rename(confPath, newPath);
 
-       // purge the job from the cache
-       fileManager.purgeJob(id);
+       //move the job files to done folder and purge the job
+       if (paths.size() > 0) {
+         fileManager.moveToDone(id, paths);
+       }
      }
 
      /**
@@ -1056,20 +1131,18 @@ public class JobHistory {
 
           if (LOG_DIR != null) {
             // create output stream for logging in hadoop.job.history.location
-            fs = new Path(LOG_DIR).getFileSystem(jobConf);
-            
             if (restarted) {
               logFile = recoverJobHistoryFile(jobConf, logFile);
               logFileName = logFile.getName();
             }
             
             int defaultBufferSize = 
-              fs.getConf().getInt("io.file.buffer.size", 4096);
-            out = fs.create(logFile, 
+              LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
+            out = LOGDIR_FS.create(logFile, 
                             new FsPermission(HISTORY_FILE_PERMISSION),
                             true, 
                             defaultBufferSize, 
-                            fs.getDefaultReplication(), 
+                            LOGDIR_FS.getDefaultReplication(), 
                             jobHistoryBlockSize, null);
             writer = new PrintWriter(out);
             fileManager.addWriter(jobId, writer);
@@ -1147,16 +1220,15 @@ public class JobHistory {
       FSDataOutputStream jobFileOut = null;
       try {
         if (LOG_DIR != null) {
-          fs = new Path(LOG_DIR).getFileSystem(jobConf);
           int defaultBufferSize = 
-              fs.getConf().getInt("io.file.buffer.size", 4096);
-          if (!fs.exists(jobFilePath)) {
-            jobFileOut = fs.create(jobFilePath, 
+              LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
+          if (!LOGDIR_FS.exists(jobFilePath)) {
+            jobFileOut = LOGDIR_FS.create(jobFilePath, 
                                    new FsPermission(HISTORY_FILE_PERMISSION),
                                    true, 
                                    defaultBufferSize, 
-                                   fs.getDefaultReplication(), 
-                                   fs.getDefaultBlockSize(), null);
+                                   LOGDIR_FS.getDefaultReplication(), 
+                                   LOGDIR_FS.getDefaultBlockSize(), null);
             jobConf.writeXml(jobFileOut);
             jobFileOut.close();
           }
@@ -1943,13 +2015,12 @@ public class JobHistory {
       lastRan = now;  
       isRunning = true; 
       try {
-        FileSystem fs = DONE.getFileSystem(jtConf);
-        FileStatus[] historyFiles = fs.listStatus(DONE);
+        FileStatus[] historyFiles = DONEDIR_FS.listStatus(DONE);
         // delete if older than 30 days
         if (historyFiles != null) {
           for (FileStatus f : historyFiles) {
             if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) {
-              fs.delete(f.getPath(), true); 
+              DONEDIR_FS.delete(f.getPath(), true); 
               LOG.info("Deleting old history file : " + f.getPath());
             }
           }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076969&r1=1076968&r2=1076969&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:26:49 2011
@@ -1866,14 +1866,7 @@ public class JobTracker implements MRCon
     // initialize history parameters.
     boolean historyInitialized = JobHistory.init(conf, this.localMachine,
                                                  this.startTime);
-    String historyLogDir = null;
-    FileSystem historyFS = null;
-    if (historyInitialized) {
-      historyLogDir = JobHistory.getCompletedJobHistoryLocation().toString();
-      infoServer.setAttribute("historyLogDir", historyLogDir);
-      historyFS = new Path(historyLogDir).getFileSystem(conf);
-      infoServer.setAttribute("fileSys", historyFS);
-    }
+    
     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
     
@@ -1970,13 +1963,13 @@ public class JobTracker implements MRCon
       jobConf.deleteLocalFiles(SUBDIR);
     }
 
-    // Initialize history again if it is not initialized
-    // because history was on dfs and namenode was in safemode.
-    if (!historyInitialized) {
-      JobHistory.init(conf, this.localMachine, this.startTime); 
-      historyLogDir = conf.get("hadoop.job.history.location");
+    // Initialize history DONE folder
+    if (historyInitialized) {
+      JobHistory.initDone(conf, fs);
+      String historyLogDir = 
+        JobHistory.getCompletedJobHistoryLocation().toString();
       infoServer.setAttribute("historyLogDir", historyLogDir);
-      historyFS = new Path(historyLogDir).getFileSystem(conf);
+      FileSystem historyFS = new Path(historyLogDir).getFileSystem(conf);
       infoServer.setAttribute("fileSys", historyFS);
     }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=1076969&r1=1076968&r2=1076969&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java Fri Mar  4 03:26:49 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobHistory.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -464,8 +465,7 @@ public class TestJobHistory extends Test
 
     // Get the history file name
     Path dir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = 
-      JobHistory.JobInfo.getJobHistoryFileName(conf, id, dir);
+    String logFileName = getDoneFile(conf, id, dir);
 
     // Framework history log file location
     Path logFile = new Path(dir, logFileName);
@@ -763,8 +763,7 @@ public class TestJobHistory extends Test
     JobID id = job.getID();
     Path doneDir = JobHistory.getCompletedJobHistoryLocation();
     // Get the history file name
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
-                                                                  doneDir);
+    String logFileName = getDoneFile(conf, id, doneDir);
 
     // Framework history log file location
     Path logFile = new Path(doneDir, logFileName);
@@ -792,6 +791,92 @@ public class TestJobHistory extends Test
     validateTaskAttemptLevelKeyValues(mr, job, jobInfo);
   }
 
+  public void testDoneFolderOnHDFS() throws IOException {
+    MiniMRCluster mr = null;
+    try {
+      JobConf conf = new JobConf();
+      // keep for less time
+      conf.setLong("mapred.jobtracker.retirejob.check", 1000);
+      conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
+
+      //set the done folder location
+      String doneFolder = "history_done";
+      conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+
+      MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null);
+      mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
+          3, null, null, conf);
+
+      // run the TCs
+      conf = mr.createJobConf();
+
+      FileSystem fs = FileSystem.get(conf);
+      // clean up
+      fs.delete(new Path("succeed"), true);
+
+      Path inDir = new Path("succeed/input");
+      Path outDir = new Path("succeed/output");
+
+      //Disable speculative execution
+      conf.setSpeculativeExecution(false);
+
+      // Make sure that the job is not removed from memory until we do finish
+      // the validation of history file content
+      conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10);
+
+      // Run a job that will be succeeded and validate its history file
+      RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
+      
+      Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+      assertEquals("History DONE folder not correct", 
+          doneFolder, doneDir.getName());
+      JobID id = job.getID();
+      String logFileName = getDoneFile(conf, id, doneDir);
+
+      // Framework history log file location
+      Path logFile = new Path(doneDir, logFileName);
+      FileSystem fileSys = logFile.getFileSystem(conf);
+   
+      // Check if the history file exists
+      assertTrue("History file does not exist", fileSys.exists(logFile));
+
+      // check if the corresponding conf file exists
+      Path confFile = getPathForConf(logFile, doneDir);
+      assertTrue("Config for completed jobs doesnt exist", 
+                 fileSys.exists(confFile));
+
+      // check if the file exists in a done folder
+      assertTrue("Completed job config doesnt exist in the done folder", 
+                 doneDir.getName().equals(confFile.getParent().getName()));
+
+      // check if the file exists in a done folder
+      assertTrue("Completed jobs doesnt exist in the done folder", 
+                 doneDir.getName().equals(logFile.getParent().getName()));
+      
+
+      // check if the job file is removed from the history location 
+      Path runningJobsHistoryFolder = logFile.getParent().getParent();
+      Path runningJobHistoryFilename = 
+        new Path(runningJobsHistoryFolder, logFile.getName());
+      Path runningJobConfFilename = 
+        new Path(runningJobsHistoryFolder, confFile.getName());
+      assertFalse("History file not deleted from the running folder", 
+                  fileSys.exists(runningJobHistoryFilename));
+      assertFalse("Config for completed jobs not deleted from running folder", 
+                  fileSys.exists(runningJobConfFilename));
+
+      validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
+      validateJobHistoryFileContent(mr, job, conf);
+
+      // get the job conf filename
+    } finally {
+      if (mr != null) {
+        cleanupLocalFiles(mr);
+        mr.shutdown();
+      }
+    }
+  }
+
   /** Run a job that will be succeeded and validate its history file format
    *  and its content.
    */
@@ -802,6 +887,11 @@ public class TestJobHistory extends Test
       // keep for less time
       conf.setLong("mapred.jobtracker.retirejob.check", 1000);
       conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
+
+      //set the done folder location
+      String doneFolder = TEST_ROOT_DIR + "history_done";
+      conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+      
       mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
 
       // run the TCs
@@ -825,9 +915,11 @@ public class TestJobHistory extends Test
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
       
       Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+      assertEquals("History DONE folder not correct", 
+          doneFolder, doneDir.toString());
       JobID id = job.getID();
-      String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
-                                                                    doneDir);
+      String logFileName = getDoneFile(conf, id, doneDir);
+
       // Framework history log file location
       Path logFile = new Path(doneDir, logFileName);
       FileSystem fileSys = logFile.getFileSystem(conf);
@@ -842,11 +934,11 @@ public class TestJobHistory extends Test
 
       // check if the file exists in a done folder
       assertTrue("Completed job config doesnt exist in the done folder", 
-                 "done".equals(confFile.getParent().getName()));
+                 doneDir.getName().equals(confFile.getParent().getName()));
 
       // check if the file exists in a done folder
       assertTrue("Completed jobs doesnt exist in the done folder", 
-                 "done".equals(logFile.getParent().getName()));
+                 doneDir.getName().equals(logFile.getParent().getName()));
       
 
       // check if the job file is removed from the history location 
@@ -880,6 +972,17 @@ public class TestJobHistory extends Test
     }
   }
 
+  //Returns the file in the done folder
+  //Waits for sometime to get the file moved to done
+  private static String getDoneFile(JobConf conf, JobID id, 
+      Path doneDir) throws IOException {
+    String name = null;
+    for (int i = 0; name == null && i < 20; i++) {
+      name = JobHistory.JobInfo.getDoneJobHistoryFileName(conf, id);
+      UtilsForTests.waitFor(1000);
+    }
+    return name;
+  }
   // Returns the output path where user history log file is written to with
   // default configuration setting for hadoop.job.history.user.location
   private static Path getLogLocationInOutputPath(String logFileName,
@@ -900,8 +1003,7 @@ public class TestJobHistory extends Test
           throws IOException  {
     // Get the history file name
     Path doneDir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
-                                                                  doneDir);
+    String logFileName = getDoneFile(conf, id, doneDir);
 
     // User history log file location
     Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
@@ -1012,8 +1114,7 @@ public class TestJobHistory extends Test
 
     // Get the history file name
     Path doneDir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
-                                                                  doneDir);
+    String logFileName = getDoneFile(conf, id, doneDir);
 
     // Framework history log file location
     Path logFile = new Path(doneDir, logFileName);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=1076969&r1=1076968&r2=1076969&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Fri Mar  4 03:26:49 2011
@@ -320,48 +320,7 @@ public class TestJobTrackerRestart exten
     assertTrue("Cluster status is insane", 
                checkClusterStatusOnCompletion(status, prevStatus));
   }
-  
-  /**
-   * Checks if the history files are as expected
-   * @param id job id
-   * @param conf job conf
-   */
-  private void testJobHistoryFiles(JobID id, JobConf conf) 
-  throws IOException  {
-    // Get the history files for users
-    Path dir = JobHistory.getCompletedJobHistoryLocation();
-    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id, 
-                                                                  dir);
-    String tempLogFileName = 
-      JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName);
-    
-    // I. User files
-    Path logFile = 
-      JobHistory.JobInfo.getJobHistoryLogLocationForUser(logFileName, conf);
-    FileSystem fileSys = logFile.getFileSystem(conf);
-    
-    // Check if the history file exists
-    assertTrue("User log file does not exist", fileSys.exists(logFile));
-    
-    // Check if the temporary file is deleted
-    Path tempLogFile = 
-      JobHistory.JobInfo.getJobHistoryLogLocationForUser(tempLogFileName, 
-                                                         conf);
-    assertFalse("User temporary log file exists", fileSys.exists(tempLogFile));
-    
-    // II. Framework files
-    // Get the history file
-    logFile = new Path(dir, logFileName);
-    fileSys = logFile.getFileSystem(conf);
-    
-    // Check if the history file exists
-    assertTrue("Log file does not exist", fileSys.exists(logFile));
-    
-    // Check if the temporary file is deleted
-    tempLogFile = JobHistory.JobInfo.getJobHistoryLogLocation(tempLogFileName);
-    assertFalse("Temporary log file exists", fileSys.exists(tempLogFile));
-  }
-  
+
   /**
    * Matches specified number of task reports.
    * @param source the reports to be matched