You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2009/04/08 14:47:45 UTC
svn commit: r763223 - in /hadoop/core/trunk: ./
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/hdfs/
src/test/org/apache/hadoop/ipc/ src/test/org/apache/hadoop/mapred/
Author: sharad
Date: Wed Apr 8 12:47:44 2009
New Revision: 763223
URL: http://svn.apache.org/viewvc?rev=763223&view=rev
Log:
HADOOP-5394. JobTracker might schedule 2 attempts of the same task with the same attempt id across restarts. Contributed by Amar Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 8 12:47:44 2009
@@ -1210,6 +1210,9 @@
HADOOP-5585. Clear FileSystem statistics between tasks when jvm-reuse
is enabled. (omalley)
+ HADOOP-5394. JobTracker might schedule 2 attempts of the same task
+ with the same attempt id across restarts. (Amar Kamat via sharad)
+
Release 0.19.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Wed Apr 8 12:47:44 2009
@@ -122,7 +122,7 @@
FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
- TRACKER_NAME, STATE_STRING, VERSION, RESTART_COUNT
+ TRACKER_NAME, STATE_STRING, VERSION
}
/**
@@ -1167,9 +1167,15 @@
* @param submitTime job's submit time
* @param launchTime job's launch time
* @param restartCount number of times the job got restarted
+ * @deprecated Use {@link #logJobInfo(JobID, long, long)} instead.
*/
+ @Deprecated
public static void logJobInfo(JobID jobid, long submitTime, long launchTime,
int restartCount){
+ // restartCount is ignored: the restart count is no longer written to the
+ // job history (the RESTART_COUNT key was removed from Keys).
+ logJobInfo(jobid, submitTime, launchTime);
+ }
+
+ public static void logJobInfo(JobID jobid, long submitTime, long launchTime)
+ {
if (!disableHistory){
String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
@@ -1177,11 +1183,10 @@
if (null != writer){
JobHistory.log(writer, RecordTypes.Job,
new Keys[] {Keys.JOBID, Keys.SUBMIT_TIME,
- Keys.LAUNCH_TIME, Keys.RESTART_COUNT},
+ Keys.LAUNCH_TIME},
new String[] {jobid.toString(),
String.valueOf(submitTime),
- String.valueOf(launchTime),
- String.valueOf(restartCount)});
+ String.valueOf(launchTime)});
}
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed Apr 8 12:47:44 2009
@@ -164,7 +164,7 @@
long finishTime;
// Indicates how many times the job got restarted
- private int restartCount = 0;
+ private final int restartCount;
private JobConf conf;
AtomicBoolean tasksInited = new AtomicBoolean(false);
@@ -214,6 +214,7 @@
this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
this.anyCacheLevel = this.maxLevel+1;
this.jobtracker = null;
+ this.restartCount = 0;
}
/**
@@ -222,6 +223,12 @@
*/
public JobInProgress(JobID jobid, JobTracker jobtracker,
JobConf default_conf) throws IOException {
+ this(jobid, jobtracker, default_conf, 0);
+ }
+
+ public JobInProgress(JobID jobid, JobTracker jobtracker,
+ JobConf default_conf, int rCount) throws IOException {
+ this.restartCount = rCount;
this.jobId = jobid;
String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
+ jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
@@ -595,19 +602,17 @@
}
// Update the job start/launch time (upon restart) and log to history
- synchronized void updateJobInfo(long startTime, long launchTime, int count) {
+ synchronized void updateJobInfo(long startTime, long launchTime) {
// log and change to the job's start/launch time
this.startTime = startTime;
this.launchTime = launchTime;
- // change to the job's restart count
- this.restartCount = count;
- JobHistory.JobInfo.logJobInfo(jobId, startTime, launchTime, count);
+ JobHistory.JobInfo.logJobInfo(jobId, startTime, launchTime);
}
/**
* Get the number of times the job has restarted
*/
- int numRestarts() {
+ int getNumRestarts() {
return restartCount;
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Apr 8 12:47:44 2009
@@ -46,6 +46,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -123,6 +125,10 @@
final static FsPermission SYSTEM_DIR_PERMISSION =
FsPermission.createImmutable((short) 0733); // rwx-wx-wx
+ // system files should have 700 permission
+ final static FsPermission SYSTEM_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0700); // rwx------
+
/**
* A client tried to submit a job before the Job Tracker was ready.
*/
@@ -672,6 +678,8 @@
Set<JobID> jobsToRecover; // set of jobs to be recovered
private int totalEventsRecovered = 0;
+ private int restartCount = 0;
+ private boolean shouldRecover = false;
Set<String> recoveredTrackers =
Collections.synchronizedSet(new HashSet<String>());
@@ -850,7 +858,7 @@
}
public boolean shouldRecover() {
- return jobsToRecover.size() != 0;
+ return shouldRecover;
}
public boolean shouldSchedule() {
@@ -888,18 +896,16 @@
// checks if the job dir has the required files
public void checkAndAddJob(FileStatus status) throws IOException {
- String jobName = status.getPath().getName();
- if (isJobNameValid(jobName)) {
+ String fileName = status.getPath().getName();
+ if (isJobNameValid(fileName)) {
if (JobClient.isJobDirValid(status.getPath(), fs)) {
- recoveryManager.addJobForRecovery(JobID.forName(jobName));
+ recoveryManager.addJobForRecovery(JobID.forName(fileName));
+ shouldRecover = true; // enable actual recovery if num-files > 1
} else {
- LOG.info("Found an incomplete job directory " + jobName + "."
+ LOG.info("Found an incomplete job directory " + fileName + "."
+ " Deleting it!!");
fs.delete(status.getPath(), true);
}
- } else {
- LOG.info("Deleting " + status.getPath());
- fs.delete(status.getPath(), true);
}
}
@@ -918,8 +924,7 @@
// Set the start/launch time only if there are recovered tasks
// Increment the job's restart count
jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME),
- job.getLong(JobHistory.Keys.LAUNCH_TIME),
- job.getInt(Keys.RESTART_COUNT) + 1);
+ job.getLong(JobHistory.Keys.LAUNCH_TIME));
// Save the new job status
JobStatus newStatus = (JobStatus)jip.getStatus().clone();
@@ -1119,7 +1124,84 @@
expireLaunchingTasks.removeTask(attemptId);
}
+ Path getRestartCountFile() {
+ return new Path(getSystemDir(), "jobtracker.info");
+ }
+
+ Path getTempRestartCountFile() {
+ return new Path(getSystemDir(), "jobtracker.info.recover");
+ }
+
+ /**
+ * Initialize the recovery process. It creates (or updates) the
+ * jobtracker.info file in the jobtracker's system directory, which holds
+ * the jobtracker's restart count. On the very first start the jobtracker
+ * writes '0' in it. On every subsequent restart it replaces the stored
+ * count with (old count + 1). The whole purpose of this api is to carry
+ * the restart count across restarts so that attempt-ids do not clash.
+ *
+ * The new count is first written to jobtracker.info.recover, then the old
+ * jobtracker.info is deleted and the temp file is renamed in its place.
+ * If the jobtracker dies in between, the next start uses whichever of the
+ * two files survived.
+ *
+ * If neither file exists (e.g. jobtracker.info went missing), the
+ * jobtracker disables recovery, writes a fresh count of '0' and continues.
+ *
+ * @throws IOException if the system files cannot be read or written, or
+ * if a surviving temp file cannot be promoted to the main file.
+ */
+ void updateRestartCount() throws IOException {
+ Path restartFile = getRestartCountFile();
+ Path tmpRestartFile = getTempRestartCountFile();
+ FileSystem fs = restartFile.getFileSystem(conf);
+ FsPermission filePerm = new FsPermission(SYSTEM_FILE_PERMISSION);
+
+ if (fs.exists(restartFile)) {
+ // the main file is intact; any leftover temp file is stale
+ fs.delete(tmpRestartFile, false);
+ } else if (fs.exists(tmpRestartFile)) {
+ // A previous update deleted the main file but died before the rename;
+ // the temp file holds the latest count, so promote it to the main file.
+ if (!fs.rename(tmpRestartFile, restartFile)) {
+ throw new IOException("Failed to rename " + tmpRestartFile + " to "
+ + restartFile);
+ }
+ } else {
+ // Neither file exists: either this is the very first start or the
+ // files needed for recovery went missing. Disable recovery and start
+ // counting from 0.
+ shouldRecover = false;
+ writeRestartCount(fs, restartFile, filePerm, 0);
+ return;
+ }
+
+ // read the old count and increment it
+ FSDataInputStream in = fs.open(restartFile);
+ try {
+ restartCount = in.readInt();
+ } finally {
+ in.close();
+ }
+ ++restartCount;
+
+ // Write back the new restart count and swap it in for the old info file
+ //TODO This is similar to jobhistory recovery, maybe this common code
+ // can be factored out.
+ writeRestartCount(fs, tmpRestartFile, filePerm, restartCount);
+ fs.delete(restartFile, false);
+ if (!fs.rename(tmpRestartFile, restartFile)) {
+ // Not fatal: the temp file keeps the new count and is promoted to the
+ // main file on the next start by the branch above.
+ LOG.warn("Failed to rename " + tmpRestartFile + " to " + restartFile
+ + "; it will be promoted on the next start");
+ }
+ }
+
+ /**
+ * Create {@code file} with permission {@code perm} and write {@code count}
+ * to it, closing the stream even if the write fails.
+ */
+ private void writeRestartCount(FileSystem fs, Path file, FsPermission perm,
+ int count) throws IOException {
+ FSDataOutputStream out = FileSystem.create(fs, file, perm);
+ try {
+ out.writeInt(count);
+ } finally {
+ out.close();
+ }
+ }
+
public void recover() {
+ if (!shouldRecover()) {
+ // clean up jobs structure
+ jobsToRecover.clear();
+ return;
+ }
+
+ LOG.info("Restart count of the jobtracker : " + restartCount);
+
// I. Init the jobs and cache the recovered job history filenames
Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
Iterator<JobID> idIter = jobsToRecover.iterator();
@@ -1128,7 +1210,8 @@
LOG.info("Trying to recover details of job " + id);
try {
// 1. Create the job object
- JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
+ JobInProgress job =
+ new JobInProgress(id, JobTracker.this, conf, restartCount);
// 2. Check if the user has appropriate access
// Get the user group info for the job's owner
@@ -1209,8 +1292,6 @@
// 3. Close the listener
listener.close();
- LOG.info("Restart count for job " + id + " is " + pJob.numRestarts());
-
// 4. Update the recovery metric
totalEventsRecovered += listener.getNumEventsRecovered();
@@ -1529,7 +1610,6 @@
}
// Make sure that the backup data is preserved
FileStatus[] systemDirData = fs.listStatus(this.systemDir);
- LOG.info("Cleaning up the system directory");
// Check if the history is enabled .. as we cant have persistence with
// history disabled
if (conf.getBoolean("mapred.jobtracker.restart.recover", false)
@@ -1550,6 +1630,7 @@
break; // if there is something to recover else clean the sys dir
}
}
+ LOG.info("Cleaning up the system directory");
fs.delete(systemDir, true);
if (FileSystem.mkdirs(fs, systemDir,
new FsPermission(SYSTEM_DIR_PERMISSION))) {
@@ -1566,6 +1647,24 @@
}
Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
}
+
+ // Prepare for recovery. This is done irrespective of the status of restart
+ // flag.
+ try {
+ recoveryManager.updateRestartCount();
+ } catch (IOException ioe) {
+ LOG.warn("Failed to initialize recovery manager. The Recovery manager "
+ + "failed to access the system files in the system dir ("
+ + getSystemDir() + ").");
+ LOG.warn("It might be because the JobTracker failed to read/write system"
+ + " files (" + recoveryManager.getRestartCountFile() + " / "
+ + recoveryManager.getTempRestartCountFile() + ") or the system "
+ + " file " + recoveryManager.getRestartCountFile()
+ + " is missing!");
+ LOG.warn("Bailing out...");
+ throw ioe;
+ }
+
// Same with 'localDir' except it's always on the local disk.
jobConf.deleteLocalFiles(SUBDIR);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Wed Apr 8 12:47:44 2009
@@ -887,7 +887,7 @@
TaskAttemptID taskid = null;
if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
// Make sure that the attempts are unqiue across restarts
- int attemptId = job.numRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
+ int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
taskid = new TaskAttemptID( id, attemptId);
++nextTaskId;
} else {
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java Wed Apr 8 12:47:44 2009
@@ -66,6 +66,29 @@
return NameNode.createNameNode(args, config);
}
+ /**
+ * Start a data-node whose storage directory is "data-{index}" under the
+ * directory named by the "test.build.data" system property.
+ *
+ * Note: this sets "dfs.data.dir" on the passed-in config, so the caller's
+ * Configuration object is modified.
+ *
+ * @param index suffix used to give each data-node its own data directory
+ * @param config configuration used to create the data-node
+ * @return the DataNode returned by DataNode.createDataNode
+ * @throws IOException if DataNode.createDataNode throws it
+ */
+ public DataNode startDataNode(int index, Configuration config)
+ throws IOException {
+ // if test.build.data is unset, dataDir is null and java.io.File resolves
+ // "data-<index>" relative to the current working directory
+ String dataDir = System.getProperty("test.build.data");
+ File dataNodeDir = new File(dataDir, "data-" + index);
+ config.set("dfs.data.dir", dataNodeDir.getPath());
+
+ String[] args = new String[] {};
+ // create the data-node with no extra command-line arguments.
+ // NOTE(review): the earlier comment here referred to the NameNode; whether
+ // DataNode.createDataNode writes bound ports back into config is not
+ // visible here — confirm.
+ return DataNode.createDataNode(args, config);
+ }
+
+ /**
+ * Stop the given data-node by calling its shutdown() method.
+ *
+ * @param dn the data-node to stop; null is ignored so this is safe to call
+ * from a finally block even if startDataNode never succeeded
+ */
+ public void stopDataNode(DataNode dn) {
+ if (dn != null) {
+ dn.shutdown();
+ }
+ }
+
public void stopNameNode(NameNode nn) {
if (nn != null) {
nn.stop();
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java Wed Apr 8 12:47:44 2009
@@ -85,6 +85,7 @@
assertTrue(dfs.exists(filePath));
// This will test TPC to a JobTracker
+ fs = FileSystem.get(sconf);
mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
final int jobTrackerPort = mr.getJobTrackerPort();
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Wed Apr 8 12:47:44 2009
@@ -543,24 +543,28 @@
jobTrackerThread = new Thread(jobTracker);
jobTrackerThread.start();
- while (!jobTracker.isUp()) {
+ while (jobTracker.isActive() && !jobTracker.isUp()) {
try { // let daemons get started
Thread.sleep(1000);
} catch(InterruptedException e) {
}
}
- ClusterStatus status = jobTracker.getJobTracker().getClusterStatus(false);
- while (jobTracker.isActive() && status.getJobTrackerState() == JobTracker.State.INITIALIZING) {
- try {
- LOG.info("JobTracker still initializing. Waiting.");
- Thread.sleep(1000);
- } catch(InterruptedException e) {}
+ // is the jobtracker has started then wait for it to init
+ ClusterStatus status = null;
+ if (jobTracker.isUp()) {
status = jobTracker.getJobTracker().getClusterStatus(false);
+ while (jobTracker.isActive() && status.getJobTrackerState()
+ == JobTracker.State.INITIALIZING) {
+ try {
+ LOG.info("JobTracker still initializing. Waiting.");
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {}
+ status = jobTracker.getJobTracker().getClusterStatus(false);
+ }
}
- if (!jobTracker.isActive()
- || status.getJobTrackerState() != JobTracker.State.RUNNING) {
+ if (!jobTracker.isActive()) {
// return if jobtracker has crashed
return;
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java Wed Apr 8 12:47:44 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import junit.framework.TestCase;
import org.apache.hadoop.hdfs.TestHDFSServerPorts;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.fs.FileSystem;
@@ -112,8 +113,10 @@
*/
public void testJobTrackerPorts() throws Exception {
NameNode nn = null;
+ DataNode dn = null;
try {
nn = hdfs.startNameNode();
+ dn = hdfs.startDataNode(1, hdfs.getConfig());
// start job tracker on the same port as name-node
JobConf conf2 = new JobConf(hdfs.getConfig());
@@ -139,6 +142,7 @@
assertTrue(started); // should start now
} finally {
+ hdfs.stopDataNode(dn);
hdfs.stopNameNode(nn);
}
}
@@ -148,10 +152,12 @@
*/
public void testTaskTrackerPorts() throws Exception {
NameNode nn = null;
+ DataNode dn = null;
JobTracker jt = null;
JTRunner runner = null;
try {
nn = hdfs.startNameNode();
+ dn = hdfs.startDataNode(2, hdfs.getConfig());
JobConf conf2 = new JobConf(hdfs.getConfig());
runner = new JTRunner();
@@ -187,6 +193,7 @@
runner.interrupt();
runner.join();
}
+ hdfs.stopDataNode(dn);
hdfs.stopNameNode(nn);
}
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java Wed Apr 8 12:47:44 2009
@@ -239,7 +239,7 @@
Path sysDir = new Path(jobtracker.getSystemDir());
FileSystem fs = sysDir.getFileSystem(conf);
int size = fs.listStatus(sysDir).length;
- while (size > 0) {
+ while (size > 1) { // ignore the jobtracker.info file
System.out.println("Waiting for the job files in sys directory to be cleaned up");
UtilsForTests.waitFor(100);
size = fs.listStatus(sysDir).length;
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Wed Apr 8 12:47:44 2009
@@ -29,6 +29,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
+import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
+import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -261,4 +263,139 @@
mr.shutdown();
}
+
+ /**
+ * Test if restart count of the jobtracker is correctly managed.
+ * Steps are as follows :
+ * - start the jobtracker and check if the info file gets created.
+ * - stop the jobtracker, delete the jobtracker.info file and check that
+ * recovery is 'off' upon restart.
+ * - submit a job to the jobtracker.
+ * - restart the jobtracker k times and check if the restart count on ith
+ * iteration is i.
+ * - submit a new job and check if its restart count is 0.
+ * - garble the jobtracker.info file and restart the jobtracker, the
+ * jobtracker should crash.
+ * The mini cluster is shut down even if an assertion fails.
+ */
+ public void testRestartCount() throws Exception {
+ LOG.info("Testing restart-count");
+ String signalFile = new Path(TEST_DIR, "signal").toString();
+
+ // clean up
+ FileSystem fs = FileSystem.get(new Configuration());
+ fs.delete(TEST_DIR, true);
+
+ JobConf conf = new JobConf();
+ conf.set("mapred.jobtracker.job.history.block.size", "1024");
+ conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
+ conf.setBoolean("mapred.jobtracker.restart.recover", true);
+ // since there is no need for initing
+ conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
+ TaskScheduler.class);
+
+ MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+ try {
+ JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+ JobClient jc = new JobClient(mr.createJobConf());
+
+ // check if the jobtracker info file exists
+ Path infoFile = jobtracker.recoveryManager.getRestartCountFile();
+ assertTrue("Jobtracker information is missing", fs.exists(infoFile));
+
+ // check if deleting the system files disables the recovery process
+ LOG.info("Stopping jobtracker for testing with system files deleted");
+ mr.stopJobTracker();
+
+ // delete the info file
+ Path rFile = jobtracker.recoveryManager.getRestartCountFile();
+ fs.delete(rFile,false);
+
+ // start the jobtracker
+ LOG.info("Starting jobtracker with system files deleted");
+ mr.startJobTracker();
+
+ UtilsForTests.waitForJobTracker(jc);
+ jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+ // check if the recovery is disabled
+ assertFalse("Recovery is not disabled upon missing system files",
+ jobtracker.recoveryManager.shouldRecover());
+
+ // check if the system dir is sane
+ assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
+ Path tFile = jobtracker.recoveryManager.getTempRestartCountFile();
+ assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
+
+ // submit a job
+ JobConf job = mr.createJobConf();
+
+ UtilsForTests.configureWaitingJobConf(job,
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 2, 0,
+ "test-recovery-manager", signalFile, signalFile);
+
+ // submit the waiting job
+ RunningJob rJob = jc.submitJob(job);
+ LOG.info("Submitted first job " + rJob.getID());
+
+ // kill the jobtracker multiple times and check if the count is correct
+ for (int i = 1; i <= 5; ++i) {
+ LOG.info("Stopping jobtracker for " + i + " time");
+ mr.stopJobTracker();
+
+ // start the jobtracker
+ LOG.info("Starting jobtracker for " + i + " time");
+ mr.startJobTracker();
+
+ UtilsForTests.waitForJobTracker(jc);
+
+ // check if the system dir is sane
+ assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
+ assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
+
+ jobtracker = mr.getJobTrackerRunner().getJobTracker();
+ JobInProgress jip = jobtracker.getJob(rJob.getID());
+
+ // assert if restart count is correct
+ assertEquals("Recovery manager failed to recover restart count",
+ i, jip.getNumRestarts());
+ }
+
+ // kill the old job
+ rJob.killJob();
+
+ // II. Submit a new job and check if the restart count is 0
+ JobConf job1 = mr.createJobConf();
+
+ UtilsForTests.configureWaitingJobConf(job1,
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0,
+ "test-recovery-manager", signalFile, signalFile);
+
+ // make sure that the job id's dont clash
+ jobtracker.getNewJobId();
+
+ // submit a new job
+ rJob = jc.submitJob(job1);
+ LOG.info("Submitted first job after restart " + rJob.getID());
+
+ // assert if restart count is correct
+ JobInProgress jip = jobtracker.getJob(rJob.getID());
+ assertEquals("Restart count for new job is incorrect",
+ 0, jip.getNumRestarts());
+
+ LOG.info("Stopping jobtracker for testing the fs errors");
+ mr.stopJobTracker();
+
+ // check if system.dir problems in recovery kill the jobtracker: replace
+ // the info file with a single byte, which cannot be read back as an int
+ fs.delete(rFile, false);
+ FSDataOutputStream out = fs.create(rFile);
+ out.writeBoolean(true);
+ out.close();
+
+ // start the jobtracker
+ LOG.info("Starting jobtracker with fs errors");
+ mr.startJobTracker();
+ JobTrackerRunner runner = mr.getJobTrackerRunner();
+ assertFalse("JobTracker did not crash upon a garbled restart-count file",
+ runner.isActive());
+ } finally {
+ mr.shutdown();
+ }
+ }
}