You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:26:50 UTC
svn commit: r1076969 - in
/hadoop/common/branches/branch-0.20-security-patches/src: mapred/
mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 03:26:49 2011
New Revision: 1076969
URL: http://svn.apache.org/viewvc?rev=1076969&view=rev
Log:
commit 74ce0696bde3778d2579e0bbeeed1012a9a4bf1b
Author: Yahoo\! <lt...@yahoo-inc.com>
Date: Tue Aug 11 10:29:11 2009 -0700
Applying patch 2923008.mr814.patch
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1076969&r1=1076968&r2=1076969&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 03:26:49 2011
@@ -26,6 +26,15 @@
</description>
</property>
+<property>
+ <name>mapred.job.tracker.history.completed.location</name>
+ <value></value>
+ <description> The completed job history files are stored at this single well
+ known location. If nothing is specified, the files are stored at
+ ${hadoop.job.history.location}/done.
+ </description>
+</property>
+
<!-- i/o properties -->
<property>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1076969&r1=1076968&r2=1076969&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Mar 4 03:26:49 2011
@@ -30,14 +30,19 @@ import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -101,6 +106,8 @@ public class JobHistory {
FsPermission.createImmutable((short) 0750); // rwxr-x---
final static FsPermission HISTORY_FILE_PERMISSION =
FsPermission.createImmutable((short) 0740); // rwxr-----
+ private static FileSystem LOGDIR_FS; // log dir filesystem
+ private static FileSystem DONEDIR_FS; // Done dir filesystem
private static JobConf jtConf;
private static Path DONE = null; // folder for completed jobs
/**
@@ -125,11 +132,23 @@ public class JobHistory {
Path historyFilename; // path of job history file
Path confFilename; // path of job's conf
}
-
+
+ private ThreadPoolExecutor executor = null;
+ private final Configuration conf;
+
// cache from job-key to files associated with it.
private Map<JobID, FilesHolder> fileCache =
new ConcurrentHashMap<JobID, FilesHolder>();
+ JobHistoryFilesManager(Configuration conf) throws IOException {
+ this.conf = conf;
+ }
+
+ void start() {
+ executor = new ThreadPoolExecutor(1, 3, 1,
+ TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
+ }
+
private FilesHolder getFileHolder(JobID id) {
FilesHolder holder = fileCache.get(id);
if (holder == null) {
@@ -172,6 +191,33 @@ public class JobHistory {
void purgeJob(JobID id) {
fileCache.remove(id);
}
+
+ void moveToDone(final JobID id, final List<Path> paths) {
+ executor.execute(new Runnable() {
+
+ public void run() {
+ //move the files to DONE folder
+ try {
+ for (Path path : paths) {
+ //check if path exists, in case of retries it may not exist
+ if (LOGDIR_FS.exists(path)) {
+ LOG.info("Moving " + path.toString() + " to " +
+ DONE.toString());
+ DONEDIR_FS.moveFromLocalFile(path, DONE);
+ DONEDIR_FS.setPermission(new Path(DONE, path.getName()),
+ new FsPermission(HISTORY_FILE_PERMISSION));
+ }
+ }
+
+ //purge the job from the cache
+ fileManager.purgeJob(id);
+ } catch (Throwable e) {
+ LOG.error("Unable to move history file to DONE folder.", e);
+ }
+ }
+
+ });
+ }
}
/**
* Record types are identifiers for each line of log in history files.
@@ -222,14 +268,13 @@ public class JobHistory {
"file:///" + new File(
System.getProperty("hadoop.log.dir")).getAbsolutePath()
+ File.separator + "history");
- DONE = new Path(LOG_DIR, "done");
JOBTRACKER_UNIQUE_STRING = hostname + "_" +
String.valueOf(jobTrackerStartTime) + "_";
jobtrackerHostname = hostname;
Path logDir = new Path(LOG_DIR);
- FileSystem fs = logDir.getFileSystem(conf);
- if (!fs.exists(logDir)){
- if (!fs.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
+ LOGDIR_FS = logDir.getFileSystem(conf);
+ if (!LOGDIR_FS.exists(logDir)){
+ if (!LOGDIR_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
throw new IOException("Mkdirs failed to create " + logDir.toString());
}
}
@@ -241,11 +286,8 @@ public class JobHistory {
3 * 1024 * 1024);
jtConf = conf;
- // create the done folder with appropriate permission
- fs.mkdirs(DONE, HISTORY_DIR_PERMISSION);
-
// initialize the file manager
- fileManager = new JobHistoryFilesManager();
+ fileManager = new JobHistoryFilesManager(conf);
} catch(IOException e) {
LOG.error("Failed to initialize JobHistory log file", e);
disableHistory = true;
@@ -253,6 +295,38 @@ public class JobHistory {
return !(disableHistory);
}
+ static boolean initDone(JobConf conf, FileSystem fs){
+ try {
+ //if completed job history location is set, use that
+ String doneLocation = conf.
+ get("mapred.job.tracker.history.completed.location");
+ if (doneLocation != null) {
+ DONE = fs.makeQualified(new Path(doneLocation));
+ DONEDIR_FS = fs;
+ } else {
+ DONE = new Path(LOG_DIR, "done");
+ DONEDIR_FS = LOGDIR_FS;
+ }
+
+ //If not already present create the done folder with appropriate
+ //permission
+ if (!DONEDIR_FS.exists(DONE)) {
+ LOG.info("Creating DONE folder at "+ DONE);
+ if (! DONEDIR_FS.mkdirs(DONE,
+ new FsPermission(HISTORY_DIR_PERMISSION))) {
+ throw new IOException("Mkdirs failed to create " + DONE.toString());
+ }
+ }
+
+ fileManager.start();
+ } catch(IOException e) {
+ LOG.error("Failed to initialize JobHistory log file", e);
+ disableHistory = true;
+ }
+ return !(disableHistory);
+ }
+
+
/**
* Manages job-history's meta information such as version etc.
* Helps in logging version information to the job-history and recover
@@ -724,20 +798,26 @@ public class JobHistory {
public static synchronized String getJobHistoryFileName(JobConf jobConf,
JobID id)
throws IOException {
- return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR));
+ return getJobHistoryFileName(jobConf, id, new Path(LOG_DIR), LOGDIR_FS);
+ }
+
+ static synchronized String getDoneJobHistoryFileName(JobConf jobConf,
+ JobID id) throws IOException {
+ if (DONE == null) {
+ return null;
+ }
+ return getJobHistoryFileName(jobConf, id, DONE, DONEDIR_FS);
}
/**
* @param dir The directory where to search.
*/
- static synchronized String getJobHistoryFileName(JobConf jobConf,
- JobID id,
- Path dir)
+ private static synchronized String getJobHistoryFileName(JobConf jobConf,
+ JobID id, Path dir, FileSystem fs)
throws IOException {
String user = getUserName(jobConf);
String jobName = trimJobName(getJobName(jobConf));
- FileSystem fs = new Path(LOG_DIR).getFileSystem(jobConf);
if (LOG_DIR == null) {
return null;
}
@@ -805,9 +885,8 @@ public class JobHistory {
throws IOException {
Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName);
if (logPath != null) {
- FileSystem fs = logPath.getFileSystem(conf);
LOG.info("Deleting job history file " + logPath.getName());
- fs.delete(logPath, false);
+ LOGDIR_FS.delete(logPath, false);
}
// do the same for the user file too
logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName,
@@ -835,25 +914,24 @@ public class JobHistory {
Path logFilePath)
throws IOException {
Path ret;
- FileSystem fs = logFilePath.getFileSystem(conf);
String logFileName = logFilePath.getName();
String tmpFilename = getSecondaryJobHistoryFile(logFileName);
Path logDir = logFilePath.getParent();
Path tmpFilePath = new Path(logDir, tmpFilename);
- if (fs.exists(logFilePath)) {
+ if (LOGDIR_FS.exists(logFilePath)) {
LOG.info(logFileName + " exists!");
- if (fs.exists(tmpFilePath)) {
+ if (LOGDIR_FS.exists(tmpFilePath)) {
LOG.info("Deleting " + tmpFilename
+ " and using " + logFileName + " for recovery.");
- fs.delete(tmpFilePath, false);
+ LOGDIR_FS.delete(tmpFilePath, false);
}
ret = tmpFilePath;
} else {
LOG.info(logFileName + " doesnt exist! Using "
+ tmpFilename + " for recovery.");
- if (fs.exists(tmpFilePath)) {
+ if (LOGDIR_FS.exists(tmpFilePath)) {
LOG.info("Renaming " + tmpFilename + " to " + logFileName);
- fs.rename(tmpFilePath, logFilePath);
+ LOGDIR_FS.rename(tmpFilePath, logFilePath);
ret = tmpFilePath;
} else {
ret = logFilePath;
@@ -863,7 +941,7 @@ public class JobHistory {
// do the same for the user files too
logFilePath = getJobHistoryLogLocationForUser(logFileName, conf);
if (logFilePath != null) {
- fs = logFilePath.getFileSystem(conf);
+ FileSystem fs = logFilePath.getFileSystem(conf);
logDir = logFilePath.getParent();
tmpFilePath = new Path(logDir, tmpFilename);
if (fs.exists(logFilePath)) {
@@ -911,8 +989,7 @@ public class JobHistory {
// rename the tmp file to the master file. Note that this should be
// done only when the file is closed and handles are released.
LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
- FileSystem fs = tmpLogPath.getFileSystem(jtConf);
- fs.rename(tmpLogPath, masterLogPath);
+ LOGDIR_FS.rename(tmpLogPath, masterLogPath);
// update the cache
fileManager.setHistoryFile(id, masterLogPath);
@@ -924,7 +1001,7 @@ public class JobHistory {
JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName,
conf);
if (masterLogPath != null) {
- fs = masterLogPath.getFileSystem(conf);
+ FileSystem fs = masterLogPath.getFileSystem(conf);
if (fs.exists(tmpLogPath)) {
LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName
+ " in user directory");
@@ -966,29 +1043,27 @@ public class JobHistory {
* This *should* be the last call to jobhistory for a given job.
*/
static void markCompleted(JobID id) throws IOException {
+ List<Path> paths = new ArrayList<Path>();
Path path = fileManager.getHistoryFile(id);
if (path == null) {
LOG.info("No file for job-history with " + id + " found in cache!");
return;
+ } else {
+ paths.add(path);
}
- Path newPath = new Path(DONE, path.getName());
- LOG.info("Moving completed job from " + path + " to " + newPath);
- FileSystem fs = path.getFileSystem(jtConf);
- fs.rename(path, newPath);
Path confPath = fileManager.getConfFileWriters(id);
if (confPath == null) {
LOG.info("No file for jobconf with " + id + " found in cache!");
return;
+ } else {
+ paths.add(confPath);
}
- // move the conf too
- newPath = new Path(DONE, confPath.getName());
- LOG.info("Moving configuration of completed job from " + confPath
- + " to " + newPath);
- fs.rename(confPath, newPath);
- // purge the job from the cache
- fileManager.purgeJob(id);
+ //move the job files to done folder and purge the job
+ if (paths.size() > 0) {
+ fileManager.moveToDone(id, paths);
+ }
}
/**
@@ -1056,20 +1131,18 @@ public class JobHistory {
if (LOG_DIR != null) {
// create output stream for logging in hadoop.job.history.location
- fs = new Path(LOG_DIR).getFileSystem(jobConf);
-
if (restarted) {
logFile = recoverJobHistoryFile(jobConf, logFile);
logFileName = logFile.getName();
}
int defaultBufferSize =
- fs.getConf().getInt("io.file.buffer.size", 4096);
- out = fs.create(logFile,
+ LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
+ out = LOGDIR_FS.create(logFile,
new FsPermission(HISTORY_FILE_PERMISSION),
true,
defaultBufferSize,
- fs.getDefaultReplication(),
+ LOGDIR_FS.getDefaultReplication(),
jobHistoryBlockSize, null);
writer = new PrintWriter(out);
fileManager.addWriter(jobId, writer);
@@ -1147,16 +1220,15 @@ public class JobHistory {
FSDataOutputStream jobFileOut = null;
try {
if (LOG_DIR != null) {
- fs = new Path(LOG_DIR).getFileSystem(jobConf);
int defaultBufferSize =
- fs.getConf().getInt("io.file.buffer.size", 4096);
- if (!fs.exists(jobFilePath)) {
- jobFileOut = fs.create(jobFilePath,
+ LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
+ if (!LOGDIR_FS.exists(jobFilePath)) {
+ jobFileOut = LOGDIR_FS.create(jobFilePath,
new FsPermission(HISTORY_FILE_PERMISSION),
true,
defaultBufferSize,
- fs.getDefaultReplication(),
- fs.getDefaultBlockSize(), null);
+ LOGDIR_FS.getDefaultReplication(),
+ LOGDIR_FS.getDefaultBlockSize(), null);
jobConf.writeXml(jobFileOut);
jobFileOut.close();
}
@@ -1943,13 +2015,12 @@ public class JobHistory {
lastRan = now;
isRunning = true;
try {
- FileSystem fs = DONE.getFileSystem(jtConf);
- FileStatus[] historyFiles = fs.listStatus(DONE);
+ FileStatus[] historyFiles = DONEDIR_FS.listStatus(DONE);
// delete if older than 30 days
if (historyFiles != null) {
for (FileStatus f : historyFiles) {
if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) {
- fs.delete(f.getPath(), true);
+ DONEDIR_FS.delete(f.getPath(), true);
LOG.info("Deleting old history file : " + f.getPath());
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076969&r1=1076968&r2=1076969&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:26:49 2011
@@ -1866,14 +1866,7 @@ public class JobTracker implements MRCon
// initialize history parameters.
boolean historyInitialized = JobHistory.init(conf, this.localMachine,
this.startTime);
- String historyLogDir = null;
- FileSystem historyFS = null;
- if (historyInitialized) {
- historyLogDir = JobHistory.getCompletedJobHistoryLocation().toString();
- infoServer.setAttribute("historyLogDir", historyLogDir);
- historyFS = new Path(historyLogDir).getFileSystem(conf);
- infoServer.setAttribute("fileSys", historyFS);
- }
+
infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
infoServer.start();
@@ -1970,13 +1963,13 @@ public class JobTracker implements MRCon
jobConf.deleteLocalFiles(SUBDIR);
}
- // Initialize history again if it is not initialized
- // because history was on dfs and namenode was in safemode.
- if (!historyInitialized) {
- JobHistory.init(conf, this.localMachine, this.startTime);
- historyLogDir = conf.get("hadoop.job.history.location");
+ // Initialize history DONE folder
+ if (historyInitialized) {
+ JobHistory.initDone(conf, fs);
+ String historyLogDir =
+ JobHistory.getCompletedJobHistoryLocation().toString();
infoServer.setAttribute("historyLogDir", historyLogDir);
- historyFS = new Path(historyLogDir).getFileSystem(conf);
+ FileSystem historyFS = new Path(historyLogDir).getFileSystem(conf);
infoServer.setAttribute("fileSys", historyFS);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=1076969&r1=1076968&r2=1076969&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java Fri Mar 4 03:26:49 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobHistory.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -464,8 +465,7 @@ public class TestJobHistory extends Test
// Get the history file name
Path dir = JobHistory.getCompletedJobHistoryLocation();
- String logFileName =
- JobHistory.JobInfo.getJobHistoryFileName(conf, id, dir);
+ String logFileName = getDoneFile(conf, id, dir);
// Framework history log file location
Path logFile = new Path(dir, logFileName);
@@ -763,8 +763,7 @@ public class TestJobHistory extends Test
JobID id = job.getID();
Path doneDir = JobHistory.getCompletedJobHistoryLocation();
// Get the history file name
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
- doneDir);
+ String logFileName = getDoneFile(conf, id, doneDir);
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
@@ -792,6 +791,92 @@ public class TestJobHistory extends Test
validateTaskAttemptLevelKeyValues(mr, job, jobInfo);
}
+ public void testDoneFolderOnHDFS() throws IOException {
+ MiniMRCluster mr = null;
+ try {
+ JobConf conf = new JobConf();
+ // keep for less time
+ conf.setLong("mapred.jobtracker.retirejob.check", 1000);
+ conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
+
+ //set the done folder location
+ String doneFolder = "history_done";
+ conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+
+ MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null);
+ mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
+ 3, null, null, conf);
+
+ // run the TCs
+ conf = mr.createJobConf();
+
+ FileSystem fs = FileSystem.get(conf);
+ // clean up
+ fs.delete(new Path("succeed"), true);
+
+ Path inDir = new Path("succeed/input");
+ Path outDir = new Path("succeed/output");
+
+ //Disable speculative execution
+ conf.setSpeculativeExecution(false);
+
+ // Make sure that the job is not removed from memory until we do finish
+ // the validation of history file content
+ conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10);
+
+ // Run a job that will succeed and validate its history file
+ RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
+
+ Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+ assertEquals("History DONE folder not correct",
+ doneFolder, doneDir.getName());
+ JobID id = job.getID();
+ String logFileName = getDoneFile(conf, id, doneDir);
+
+ // Framework history log file location
+ Path logFile = new Path(doneDir, logFileName);
+ FileSystem fileSys = logFile.getFileSystem(conf);
+
+ // Check if the history file exists
+ assertTrue("History file does not exist", fileSys.exists(logFile));
+
+ // check if the corresponding conf file exists
+ Path confFile = getPathForConf(logFile, doneDir);
+ assertTrue("Config for completed jobs doesnt exist",
+ fileSys.exists(confFile));
+
+ // check if the file exists in a done folder
+ assertTrue("Completed job config doesnt exist in the done folder",
+ doneDir.getName().equals(confFile.getParent().getName()));
+
+ // check if the file exists in a done folder
+ assertTrue("Completed jobs doesnt exist in the done folder",
+ doneDir.getName().equals(logFile.getParent().getName()));
+
+
+ // check if the job file is removed from the history location
+ Path runningJobsHistoryFolder = logFile.getParent().getParent();
+ Path runningJobHistoryFilename =
+ new Path(runningJobsHistoryFolder, logFile.getName());
+ Path runningJobConfFilename =
+ new Path(runningJobsHistoryFolder, confFile.getName());
+ assertFalse("History file not deleted from the running folder",
+ fileSys.exists(runningJobHistoryFilename));
+ assertFalse("Config for completed jobs not deleted from running folder",
+ fileSys.exists(runningJobConfFilename));
+
+ validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
+ validateJobHistoryFileContent(mr, job, conf);
+
+ // get the job conf filename
+ } finally {
+ if (mr != null) {
+ cleanupLocalFiles(mr);
+ mr.shutdown();
+ }
+ }
+ }
+
/** Run a job that will be succeeded and validate its history file format
* and its content.
*/
@@ -802,6 +887,11 @@ public class TestJobHistory extends Test
// keep for less time
conf.setLong("mapred.jobtracker.retirejob.check", 1000);
conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
+
+ //set the done folder location
+ String doneFolder = TEST_ROOT_DIR + "history_done";
+ conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+
mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
// run the TCs
@@ -825,9 +915,11 @@ public class TestJobHistory extends Test
RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+ assertEquals("History DONE folder not correct",
+ doneFolder, doneDir.toString());
JobID id = job.getID();
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
- doneDir);
+ String logFileName = getDoneFile(conf, id, doneDir);
+
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
FileSystem fileSys = logFile.getFileSystem(conf);
@@ -842,11 +934,11 @@ public class TestJobHistory extends Test
// check if the file exists in a done folder
assertTrue("Completed job config doesnt exist in the done folder",
- "done".equals(confFile.getParent().getName()));
+ doneDir.getName().equals(confFile.getParent().getName()));
// check if the file exists in a done folder
assertTrue("Completed jobs doesnt exist in the done folder",
- "done".equals(logFile.getParent().getName()));
+ doneDir.getName().equals(logFile.getParent().getName()));
// check if the job file is removed from the history location
@@ -880,6 +972,17 @@ public class TestJobHistory extends Test
}
}
+ //Returns the file in the done folder
+ //Waits for some time for the file to be moved to the done folder
+ private static String getDoneFile(JobConf conf, JobID id,
+ Path doneDir) throws IOException {
+ String name = null;
+ for (int i = 0; name == null && i < 20; i++) {
+ name = JobHistory.JobInfo.getDoneJobHistoryFileName(conf, id);
+ UtilsForTests.waitFor(1000);
+ }
+ return name;
+ }
// Returns the output path where user history log file is written to with
// default configuration setting for hadoop.job.history.user.location
private static Path getLogLocationInOutputPath(String logFileName,
@@ -900,8 +1003,7 @@ public class TestJobHistory extends Test
throws IOException {
// Get the history file name
Path doneDir = JobHistory.getCompletedJobHistoryLocation();
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
- doneDir);
+ String logFileName = getDoneFile(conf, id, doneDir);
// User history log file location
Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
@@ -1012,8 +1114,7 @@ public class TestJobHistory extends Test
// Get the history file name
Path doneDir = JobHistory.getCompletedJobHistoryLocation();
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
- doneDir);
+ String logFileName = getDoneFile(conf, id, doneDir);
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=1076969&r1=1076968&r2=1076969&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Fri Mar 4 03:26:49 2011
@@ -320,48 +320,7 @@ public class TestJobTrackerRestart exten
assertTrue("Cluster status is insane",
checkClusterStatusOnCompletion(status, prevStatus));
}
-
- /**
- * Checks if the history files are as expected
- * @param id job id
- * @param conf job conf
- */
- private void testJobHistoryFiles(JobID id, JobConf conf)
- throws IOException {
- // Get the history files for users
- Path dir = JobHistory.getCompletedJobHistoryLocation();
- String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id,
- dir);
- String tempLogFileName =
- JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName);
-
- // I. User files
- Path logFile =
- JobHistory.JobInfo.getJobHistoryLogLocationForUser(logFileName, conf);
- FileSystem fileSys = logFile.getFileSystem(conf);
-
- // Check if the history file exists
- assertTrue("User log file does not exist", fileSys.exists(logFile));
-
- // Check if the temporary file is deleted
- Path tempLogFile =
- JobHistory.JobInfo.getJobHistoryLogLocationForUser(tempLogFileName,
- conf);
- assertFalse("User temporary log file exists", fileSys.exists(tempLogFile));
-
- // II. Framework files
- // Get the history file
- logFile = new Path(dir, logFileName);
- fileSys = logFile.getFileSystem(conf);
-
- // Check if the history file exists
- assertTrue("Log file does not exist", fileSys.exists(logFile));
-
- // Check if the temporary file is deleted
- tempLogFile = JobHistory.JobInfo.getJobHistoryLogLocation(tempLogFileName);
- assertFalse("Temporary log file exists", fileSys.exists(tempLogFile));
- }
-
+
/**
* Matches specified number of task reports.
* @param source the reports to be matched