You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/08/10 06:22:13 UTC
svn commit: r802645 - in /hadoop/mapreduce/trunk: ./ src/java/
src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Author: sharad
Date: Mon Aug 10 04:22:12 2009
New Revision: 802645
URL: http://svn.apache.org/viewvc?rev=802645&view=rev
Log:
MAPREDUCE-814. Provide a way to configure completed job history files to be on HDFS.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/mapred-default.xml
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=802645&r1=802644&r2=802645&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Aug 10 04:22:12 2009
@@ -50,6 +50,9 @@
MAPREDUCE-705. User-configurable quote and delimiter characters for Sqoop
records and record reparsing. (Aaron Kimball via tomwhite)
+ MAPREDUCE-814. Provide a way to configure completed job history files
+ to be on HDFS. (sharad)
+
IMPROVEMENTS
MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop.
Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=802645&r1=802644&r2=802645&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Mon Aug 10 04:22:12 2009
@@ -27,6 +27,15 @@
</property>
<property>
+ <name>mapred.job.tracker.history.completed.location</name>
+ <value></value>
+ <description> The completed job history files are stored at this single
+ well-known location. If nothing is specified, the files are stored at
+ ${hadoop.job.history.location}/done.
+ </description>
+</property>
+
+<property>
<name>mapred.committer.job.setup.cleanup.needed</name>
<value>true</value>
<description> true, if job needs job-setup and job-cleanup.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=802645&r1=802644&r2=802645&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java Mon Aug 10 04:22:12 2009
@@ -30,14 +30,19 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -102,7 +107,8 @@
FsPermission.createImmutable((short) 0750); // rwxr-x---
final static FsPermission HISTORY_FILE_PERMISSION =
FsPermission.createImmutable((short) 0740); // rwxr-----
- private static FileSystem JT_FS; // jobtracker's filesystem
+ private static FileSystem LOGDIR_FS; // log dir filesystem
+ private static FileSystem DONEDIR_FS; // Done dir filesystem
private static Path DONE = null; // folder for completed jobs
/**
@@ -118,11 +124,23 @@
Path historyFilename; // path of job history file
Path confFilename; // path of job's conf
}
-
+
+ private ThreadPoolExecutor executor = null;
+ private final Configuration conf;
+
// cache from job-key to files associated with it.
private Map<JobID, FilesHolder> fileCache =
new ConcurrentHashMap<JobID, FilesHolder>();
+ JobHistoryFilesManager(Configuration conf) throws IOException {
+ this.conf = conf;
+ }
+
+ void start() {
+ executor = new ThreadPoolExecutor(1, 3, 1,
+ TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
+ }
+
private FilesHolder getFileHolder(JobID id) {
FilesHolder holder = fileCache.get(id);
if (holder == null) {
@@ -165,6 +183,33 @@
void purgeJob(JobID id) {
fileCache.remove(id);
}
+
+ void moveToDone(final JobID id, final List<Path> paths) {
+ executor.execute(new Runnable() {
+
+ public void run() {
+ //move the files to DONE folder
+ try {
+ for (Path path : paths) {
+ //check if path exists, in case of retries it may not exist
+ if (LOGDIR_FS.exists(path)) {
+ LOG.info("Moving " + path.toString() + " to " +
+ DONE.toString());
+ DONEDIR_FS.moveFromLocalFile(path, DONE);
+ DONEDIR_FS.setPermission(new Path(DONE, path.getName()),
+ new FsPermission(HISTORY_FILE_PERMISSION));
+ }
+ }
+
+ //purge the job from the cache
+ fileManager.purgeJob(id);
+ } catch (Throwable e) {
+ LOG.error("Unable to move history file to DONE folder.", e);
+ }
+ }
+
+ });
+ }
}
/**
* Record types are identifiers for each line of log in history files.
@@ -229,18 +274,18 @@
"file:///" + new File(
System.getProperty("hadoop.log.dir")).getAbsolutePath()
+ File.separator + "history");
- DONE = new Path(LOG_DIR, "done");
JOBTRACKER_UNIQUE_STRING = hostname + "_" +
String.valueOf(jobTrackerStartTime) + "_";
jobtrackerHostname = hostname;
Path logDir = new Path(LOG_DIR);
if (fs == null) {
- JT_FS = logDir.getFileSystem(conf);
+ LOGDIR_FS = logDir.getFileSystem(conf);
} else {
- JT_FS = fs;
+ LOGDIR_FS = fs;
}
- if (!JT_FS.exists(logDir)){
- if (!JT_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
+ if (!LOGDIR_FS.exists(logDir)){
+ if (!LOGDIR_FS.mkdirs(logDir,
+ new FsPermission(HISTORY_DIR_PERMISSION))) {
throw new IOException("Mkdirs failed to create " + logDir.toString());
}
}
@@ -250,12 +295,40 @@
jobHistoryBlockSize =
conf.getLong("mapred.jobtracker.job.history.block.size",
3 * 1024 * 1024);
-
- // create the done folder with appropriate permission
- JT_FS.mkdirs(DONE, HISTORY_DIR_PERMISSION);
// initialize the file manager
- fileManager = new JobHistoryFilesManager();
+ fileManager = new JobHistoryFilesManager(conf);
+ } catch(IOException e) {
+ LOG.error("Failed to initialize JobHistory log file", e);
+ disableHistory = true;
+ }
+ return !(disableHistory);
+ }
+
+ static boolean initDone(JobConf conf, FileSystem fs){
+ try {
+ //if completed job history location is set, use that
+ String doneLocation = conf.
+ get("mapred.job.tracker.history.completed.location");
+ if (doneLocation != null) {
+ DONE = fs.makeQualified(new Path(doneLocation));
+ DONEDIR_FS = fs;
+ } else {
+ DONE = new Path(LOG_DIR, "done");
+ DONEDIR_FS = LOGDIR_FS;
+ }
+
+ //If not already present create the done folder with appropriate
+ //permission
+ if (!DONEDIR_FS.exists(DONE)) {
+ LOG.info("Creating DONE folder at "+ DONE);
+ if (! DONEDIR_FS.mkdirs(DONE,
+ new FsPermission(HISTORY_DIR_PERMISSION))) {
+ throw new IOException("Mkdirs failed to create " + DONE.toString());
+ }
+ }
+
+ fileManager.start();
} catch(IOException e) {
LOG.error("Failed to initialize JobHistory log file", e);
disableHistory = true;
@@ -739,15 +812,22 @@
if (LOG_DIR == null) {
return null;
}
- return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR));
+ return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR), LOGDIR_FS);
+ }
+
+ static synchronized String getDoneJobHistoryFileName(JobConf jobConf,
+ JobID id) throws IOException {
+ if (DONE == null) {
+ return null;
+ }
+ return getJobHistoryFileName(jobConf, id, DONE, DONEDIR_FS);
}
/**
* @param dir The directory where to search.
*/
- static synchronized String getJobHistoryFileName(JobConf jobConf,
- JobID id,
- Path dir)
+ private static synchronized String getJobHistoryFileName(JobConf jobConf,
+ JobID id, Path dir, FileSystem fs)
throws IOException {
String user = getUserName(jobConf);
String jobName = trimJobName(getJobName(jobConf));
@@ -776,7 +856,7 @@
}
};
- FileStatus[] statuses = JT_FS.listStatus(dir, filter);
+ FileStatus[] statuses = fs.listStatus(dir, filter);
String filename = null;
if (statuses.length == 0) {
LOG.info("Nothing to recover for job " + id);
@@ -816,7 +896,7 @@
Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName);
if (logPath != null) {
LOG.info("Deleting job history file " + logPath.getName());
- JT_FS.delete(logPath, false);
+ LOGDIR_FS.delete(logPath, false);
}
// do the same for the user file too
logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName,
@@ -848,20 +928,20 @@
String tmpFilename = getSecondaryJobHistoryFile(logFileName);
Path logDir = logFilePath.getParent();
Path tmpFilePath = new Path(logDir, tmpFilename);
- if (JT_FS.exists(logFilePath)) {
+ if (LOGDIR_FS.exists(logFilePath)) {
LOG.info(logFileName + " exists!");
- if (JT_FS.exists(tmpFilePath)) {
+ if (LOGDIR_FS.exists(tmpFilePath)) {
LOG.info("Deleting " + tmpFilename
+ " and using " + logFileName + " for recovery.");
- JT_FS.delete(tmpFilePath, false);
+ LOGDIR_FS.delete(tmpFilePath, false);
}
ret = tmpFilePath;
} else {
LOG.info(logFileName + " doesnt exist! Using "
+ tmpFilename + " for recovery.");
- if (JT_FS.exists(tmpFilePath)) {
+ if (LOGDIR_FS.exists(tmpFilePath)) {
LOG.info("Renaming " + tmpFilename + " to " + logFileName);
- JT_FS.rename(tmpFilePath, logFilePath);
+ LOGDIR_FS.rename(tmpFilePath, logFilePath);
ret = tmpFilePath;
} else {
ret = logFilePath;
@@ -919,7 +999,7 @@
// rename the tmp file to the master file. Note that this should be
// done only when the file is closed and handles are released.
LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
- JT_FS.rename(tmpLogPath, masterLogPath);
+ LOGDIR_FS.rename(tmpLogPath, masterLogPath);
// update the cache
fileManager.setHistoryFile(id, masterLogPath);
@@ -960,28 +1040,25 @@
* This *should* be the last call to jobhistory for a given job.
*/
static void markCompleted(JobID id) throws IOException {
+ List<Path> paths = new ArrayList<Path>();
Path path = fileManager.getHistoryFile(id);
if (path == null) {
LOG.info("No file for job-history with " + id + " found in cache!");
- return;
+ } else {
+ paths.add(path);
}
- Path newPath = new Path(DONE, path.getName());
- LOG.info("Moving completed job from " + path + " to " + newPath);
- JT_FS.rename(path, newPath);
Path confPath = fileManager.getConfFileWriters(id);
if (confPath == null) {
LOG.info("No file for jobconf with " + id + " found in cache!");
- return;
+ } else {
+ paths.add(confPath);
}
- // move the conf too
- newPath = new Path(DONE, confPath.getName());
- LOG.info("Moving configuration of completed job from " + confPath
- + " to " + newPath);
- JT_FS.rename(confPath, newPath);
- // purge the job from the cache
- fileManager.purgeJob(id);
+ //move the job files to done folder and purge the job
+ if (paths.size() > 0) {
+ fileManager.moveToDone(id, paths);
+ }
}
/**
@@ -1056,12 +1133,12 @@
}
int defaultBufferSize =
- JT_FS.getConf().getInt("io.file.buffer.size", 4096);
- out = JT_FS.create(logFile,
+ LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
+ out = LOGDIR_FS.create(logFile,
new FsPermission(HISTORY_FILE_PERMISSION),
EnumSet.of(CreateFlag.OVERWRITE),
defaultBufferSize,
- JT_FS.getDefaultReplication(),
+ LOGDIR_FS.getDefaultReplication(),
jobHistoryBlockSize, null);
writer = new PrintWriter(out);
fileManager.addWriter(jobId, writer);
@@ -1140,14 +1217,14 @@
try {
if (LOG_DIR != null) {
int defaultBufferSize =
- JT_FS.getConf().getInt("io.file.buffer.size", 4096);
- if (!JT_FS.exists(jobFilePath)) {
- jobFileOut = JT_FS.create(jobFilePath,
+ LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
+ if (!LOGDIR_FS.exists(jobFilePath)) {
+ jobFileOut = LOGDIR_FS.create(jobFilePath,
new FsPermission(HISTORY_FILE_PERMISSION),
EnumSet.of(CreateFlag.OVERWRITE),
defaultBufferSize,
- JT_FS.getDefaultReplication(),
- JT_FS.getDefaultBlockSize(), null);
+ LOGDIR_FS.getDefaultReplication(),
+ LOGDIR_FS.getDefaultBlockSize(), null);
jobConf.writeXml(jobFileOut);
jobFileOut.close();
}
@@ -1963,12 +2040,12 @@
lastRan = now;
isRunning = true;
try {
- FileStatus[] historyFiles = JT_FS.listStatus(DONE);
+ FileStatus[] historyFiles = DONEDIR_FS.listStatus(DONE);
// delete if older than 30 days
if (historyFiles != null) {
for (FileStatus f : historyFiles) {
if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) {
- JT_FS.delete(f.getPath(), true);
+ DONEDIR_FS.delete(f.getPath(), true);
LOG.info("Deleting old history file : " + f.getPath());
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=802645&r1=802644&r2=802645&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Aug 10 04:22:12 2009
@@ -1873,14 +1873,7 @@
// initialize history parameters.
boolean historyInitialized = JobHistory.init(conf, this.localMachine,
this.startTime);
- String historyLogDir = null;
- FileSystem historyFS = null;
- if (historyInitialized) {
- historyLogDir = JobHistory.getCompletedJobHistoryLocation().toString();
- infoServer.setAttribute("historyLogDir", historyLogDir);
- historyFS = new Path(historyLogDir).getFileSystem(conf);
- infoServer.setAttribute("fileSys", historyFS);
- }
+
infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
infoServer.start();
@@ -1969,13 +1962,13 @@
// Same with 'localDir' except it's always on the local disk.
jobConf.deleteLocalFiles(SUBDIR);
- // Initialize history again if it is not initialized
- // because history was on dfs and namenode was in safemode.
- if (!historyInitialized) {
- JobHistory.init(conf, this.localMachine, this.startTime);
- historyLogDir = conf.get("hadoop.job.history.location");
+ // Initialize history DONE folder
+ if (historyInitialized) {
+ JobHistory.initDone(conf, fs);
+ String historyLogDir =
+ JobHistory.getCompletedJobHistoryLocation().toString();
infoServer.setAttribute("historyLogDir", historyLogDir);
- historyFS = new Path(historyLogDir).getFileSystem(conf);
+ FileSystem historyFS = new Path(historyLogDir).getFileSystem(conf);
infoServer.setAttribute("fileSys", historyFS);
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=802645&r1=802644&r2=802645&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Mon Aug 10 04:22:12 2009
@@ -36,6 +36,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobHistory.*;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.commons.logging.Log;
@@ -474,8 +475,7 @@
// Get the history file name
Path dir = JobHistory.getCompletedJobHistoryLocation();
- String logFileName =
- JobHistory.JobInfo.getJobHistoryFileName(conf, id, dir);
+ String logFileName = getDoneFile(conf, id, dir);
// Framework history log file location
Path logFile = new Path(dir, logFileName);
@@ -772,8 +772,7 @@
JobID id = job.getID();
Path doneDir = JobHistory.getCompletedJobHistoryLocation();
// Get the history file name
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
- doneDir);
+ String logFileName = getDoneFile(conf, id, doneDir);
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
@@ -801,6 +800,92 @@
validateTaskAttemptLevelKeyValues(mr, job, jobInfo);
}
+ public void testDoneFolderOnHDFS() throws IOException {
+ MiniMRCluster mr = null;
+ try {
+ JobConf conf = new JobConf();
+ // keep for less time
+ conf.setLong("mapred.jobtracker.retirejob.check", 1000);
+ conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
+
+ //set the done folder location
+ String doneFolder = "history_done";
+ conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+
+ MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null);
+ mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
+ 3, null, null, conf);
+
+ // run the TCs
+ conf = mr.createJobConf();
+
+ FileSystem fs = FileSystem.get(conf);
+ // clean up
+ fs.delete(new Path("succeed"), true);
+
+ Path inDir = new Path("succeed/input");
+ Path outDir = new Path("succeed/output");
+
+ //Disable speculative execution
+ conf.setSpeculativeExecution(false);
+
+ // Make sure that the job is not removed from memory until we do finish
+ // the validation of history file content
+ conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10);
+
+ // Run a job that will be succeeded and validate its history file
+ RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
+
+ Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+ assertEquals("History DONE folder not correct",
+ doneFolder, doneDir.getName());
+ JobID id = job.getID();
+ String logFileName = getDoneFile(conf, id, doneDir);
+
+ // Framework history log file location
+ Path logFile = new Path(doneDir, logFileName);
+ FileSystem fileSys = logFile.getFileSystem(conf);
+
+ // Check if the history file exists
+ assertTrue("History file does not exist", fileSys.exists(logFile));
+
+ // check if the corresponding conf file exists
+ Path confFile = getPathForConf(logFile, doneDir);
+ assertTrue("Config for completed jobs doesnt exist",
+ fileSys.exists(confFile));
+
+ // check if the file exists in a done folder
+ assertTrue("Completed job config doesnt exist in the done folder",
+ doneDir.getName().equals(confFile.getParent().getName()));
+
+ // check if the file exists in a done folder
+ assertTrue("Completed jobs doesnt exist in the done folder",
+ doneDir.getName().equals(logFile.getParent().getName()));
+
+
+ // check if the job file is removed from the history location
+ Path runningJobsHistoryFolder = logFile.getParent().getParent();
+ Path runningJobHistoryFilename =
+ new Path(runningJobsHistoryFolder, logFile.getName());
+ Path runningJobConfFilename =
+ new Path(runningJobsHistoryFolder, confFile.getName());
+ assertFalse("History file not deleted from the running folder",
+ fileSys.exists(runningJobHistoryFilename));
+ assertFalse("Config for completed jobs not deleted from running folder",
+ fileSys.exists(runningJobConfFilename));
+
+ validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
+ validateJobHistoryFileContent(mr, job, conf);
+
+ // get the job conf filename
+ } finally {
+ if (mr != null) {
+ cleanupLocalFiles(mr);
+ mr.shutdown();
+ }
+ }
+ }
+
/** Run a job that will be succeeded and validate its history file format
* and its content.
*/
@@ -811,6 +896,11 @@
// keep for less time
conf.setLong("mapred.jobtracker.retirejob.check", 1000);
conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
+
+ //set the done folder location
+ String doneFolder = TEST_ROOT_DIR + "history_done";
+ conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+
mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
// run the TCs
@@ -834,9 +924,11 @@
RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+ assertEquals("History DONE folder not correct",
+ doneFolder, doneDir.toString());
JobID id = job.getID();
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
- doneDir);
+ String logFileName = getDoneFile(conf, id, doneDir);
+
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
FileSystem fileSys = logFile.getFileSystem(conf);
@@ -851,11 +943,11 @@
// check if the file exists in a done folder
assertTrue("Completed job config doesnt exist in the done folder",
- "done".equals(confFile.getParent().getName()));
+ doneDir.getName().equals(confFile.getParent().getName()));
// check if the file exists in a done folder
assertTrue("Completed jobs doesnt exist in the done folder",
- "done".equals(logFile.getParent().getName()));
+ doneDir.getName().equals(logFile.getParent().getName()));
// check if the job file is removed from the history location
@@ -889,6 +981,17 @@
}
}
+ //Returns the job's history file name in the done folder (null if not found),
+ //polling up to 20 times with UtilsForTests.waitFor(1000) since the move is async.
+ private static String getDoneFile(JobConf conf, JobID id,
+ Path doneDir) throws IOException {
+ String name = null;
+ for (int i = 0; name == null && i < 20; i++) {
+ name = JobHistory.JobInfo.getDoneJobHistoryFileName(conf, id);
+ UtilsForTests.waitFor(1000);
+ }
+ return name;
+ }
// Returns the output path where user history log file is written to with
// default configuration setting for hadoop.job.history.user.location
private static Path getLogLocationInOutputPath(String logFileName,
@@ -909,8 +1012,7 @@
throws IOException {
// Get the history file name
Path doneDir = JobHistory.getCompletedJobHistoryLocation();
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
- doneDir);
+ String logFileName = getDoneFile(conf, id, doneDir);
// User history log file location
Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
@@ -1021,8 +1123,7 @@
// Get the history file name
Path doneDir = JobHistory.getCompletedJobHistoryLocation();
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
- doneDir);
+ String logFileName = getDoneFile(conf, id, doneDir);
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=802645&r1=802644&r2=802645&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java Mon Aug 10 04:22:12 2009
@@ -308,48 +308,7 @@
assertTrue("Cluster status is insane",
checkClusterStatusOnCompletion(status, prevStatus));
}
-
- /**
- * Checks if the history files are as expected
- * @param id job id
- * @param conf job conf
- */
- private void testJobHistoryFiles(JobID id, JobConf conf)
- throws IOException {
- // Get the history files for users
- Path dir = JobHistory.getCompletedJobHistoryLocation();
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
- dir);
- String tempLogFileName =
- JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName);
-
- // I. User files
- Path logFile =
- JobHistory.JobInfo.getJobHistoryLogLocationForUser(logFileName, conf);
- FileSystem fileSys = logFile.getFileSystem(conf);
-
- // Check if the history file exists
- assertTrue("User log file does not exist", fileSys.exists(logFile));
-
- // Check if the temporary file is deleted
- Path tempLogFile =
- JobHistory.JobInfo.getJobHistoryLogLocationForUser(tempLogFileName,
- conf);
- assertFalse("User temporary log file exists", fileSys.exists(tempLogFile));
-
- // II. Framework files
- // Get the history file
- logFile = new Path(dir, logFileName);
- fileSys = logFile.getFileSystem(conf);
-
- // Check if the history file exists
- assertTrue("Log file does not exist", fileSys.exists(logFile));
-
- // Check if the temporary file is deleted
- tempLogFile = JobHistory.JobInfo.getJobHistoryLogLocation(tempLogFileName);
- assertFalse("Temporary log file exists", fileSys.exists(tempLogFile));
- }
-
+
/**
* Matches specified number of task reports.
* @param source the reports to be matched