Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/03/10 12:46:27 UTC
svn commit: r752073 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/JobTracker.java
src/test/org/apache/hadoop/mapred/MiniMRCluster.java
src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
Author: ddas
Date: Tue Mar 10 11:46:27 2009
New Revision: 752073
URL: http://svn.apache.org/viewvc?rev=752073&view=rev
Log:
HADOOP-5392. Fixes a problem where the JobTracker crashes during recovery when the job files are garbled. Contributed by Amar Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
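The patch applies one defensive pattern throughout the JobTracker changes below: every per-job recovery step runs inside its own try/catch (Throwable), and a job whose files turn out to be garbled is logged, removed from the recovery set, and skipped, rather than crashing the whole JobTracker. A minimal, self-contained sketch of that pattern (Job and its exception here are illustrative stand-ins, not Hadoop classes):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class RecoverySketch {

  // Illustrative stand-in for a job whose on-disk files may be corrupt.
  static class Job {
    final String id;
    Job(String id) { this.id = id; }
    void initFromFiles() {
      if (id.endsWith("garbled")) {
        // Corrupt files can surface as unchecked errors, not just IOException.
        throw new IllegalStateException("job files are garbled for " + id);
      }
    }
  }

  public static void main(String[] args) {
    List<Job> jobsToRecover = new ArrayList<Job>();
    jobsToRecover.add(new Job("job_1"));
    jobsToRecover.add(new Job("job_2_garbled"));
    jobsToRecover.add(new Job("job_3"));

    Iterator<Job> idIter = jobsToRecover.iterator();
    while (idIter.hasNext()) {
      Job job = idIter.next();
      try {
        job.initFromFiles();
      } catch (Throwable t) {              // widest net: recovery must survive
        System.err.println("Failed to recover job " + job.id + ". Ignoring.");
        idIter.remove();                   // drop the bad job and move on
        continue;
      }
      System.out.println("Recovered job " + job.id);
    }
  }
}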
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=752073&r1=752072&r2=752073&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Mar 10 11:46:27 2009
@@ -978,6 +978,9 @@
JobTracker also for deleting the paths on the job's output fs. (3) Moves the
references to completedJobStore outside the block where the JobTracker is locked.
(ddas)
+
+ HADOOP-5392. Fixes a problem where the JobTracker crashes during recovery
+ when the job files are garbled. (Amar Kamat via ddas)
Release 0.19.1 - Unreleased
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=752073&r1=752072&r2=752073&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Mar 10 11:46:27 2009
@@ -807,11 +807,11 @@
LOG.info("Calling init from RM for job " + jip.getJobID().toString());
try {
jip.initTasks();
- } catch (IOException ioe) {
+ } catch (Throwable t) {
LOG.error("Job initialization failed : \n"
- + StringUtils.stringifyException(ioe));
+ + StringUtils.stringifyException(t));
jip.fail(); // fail the job
- throw ioe;
+ throw new IOException(t);
}
}
}
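The hunk above widens the catch in the recovery-side init path from IOException to Throwable: deserializing a corrupt job.split can throw unchecked exceptions (an array-bounds error from a bogus length field, say), not just IOException. The job is failed first, then the cause is rethrown wrapped in an IOException to preserve the checked signature. A sketch of that shape, with a hypothetical initTasks() standing in for JobInProgress.initTasks():

import java.io.IOException;

public class FailThenRethrowSketch {

  // Hypothetical stand-in: "deserializes" a corrupt split and blows up with
  // an unchecked exception rather than an IOException.
  static void initTasks() {
    byte[] corruptSplit = new byte[0];
    int ignored = corruptSplit[0];       // ArrayIndexOutOfBoundsException
  }

  static void failJob() {
    System.err.println("Job initialization failed; failing the job");
  }

  static void initForRecovery() throws IOException {
    try {
      initTasks();
    } catch (Throwable t) {
      failJob();                         // fail the job before propagating
      throw new IOException(t);          // keep the caller's checked contract
    }
  }

  public static void main(String[] args) {
    try {
      initForRecovery();
    } catch (IOException ioe) {
      System.err.println("Caller sees: " + ioe);
    }
  }
}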
@@ -1088,19 +1088,19 @@
expireLaunchingTasks.removeTask(attemptId);
}
- public void recover() throws IOException {
+ public void recover() {
// I. Init the jobs and cache the recovered job history filenames
Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
Iterator<JobID> idIter = jobsToRecover.iterator();
while (idIter.hasNext()) {
JobID id = idIter.next();
- LOG.info("Trying to recover job " + id);
- // 1. Create the job object
- JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
-
- String logFileName;
- Path jobHistoryFilePath;
+ LOG.info("Trying to recover details of job " + id);
try {
+ // 1. Create the job object
+ JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
+ String logFileName;
+ Path jobHistoryFilePath;
+
// 2. Get the log file and the file path
logFileName =
JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
@@ -1113,19 +1113,19 @@
// This makes sure that the (master) file exists
JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(),
jobHistoryFilePath);
- } catch (IOException ioe) {
- LOG.warn("Failed to recover job " + id + " history filename."
- + " Ignoring.", ioe);
+
+ // 4. Cache the history file name as it costs one dfs access
+ jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+
+ // 5. Submit the job to the jobtracker
+ addJob(id, job);
+ } catch (Throwable t) {
+ LOG.warn("Failed to recover job " + id + " history details."
+ + " Ignoring.", t);
// TODO : remove job details from the system directory
idIter.remove();
continue;
}
-
- // 4. Cache the history file name as it costs one dfs access
- jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
-
- // 5. Submit the job to the jobtracker
- addJob(id, job);
}
long recoveryStartTime = System.currentTimeMillis();
@@ -1141,7 +1141,14 @@
Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
String logFileName = jobHistoryFilePath.getName();
- FileSystem fs = jobHistoryFilePath.getFileSystem(conf);
+ FileSystem fs;
+ try {
+ fs = jobHistoryFilePath.getFileSystem(conf);
+ } catch (IOException ioe) {
+ LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",
+ ioe);
+ continue;
+ }
// 2. Parse the history file
// Note that this also involves job update
@@ -1149,9 +1156,9 @@
try {
JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(),
listener, fs);
- } catch (IOException e) {
- LOG.info("JobTracker failed to recover job " + pJob.getJobID() + "."
- + " Ignoring it.", e);
+ } catch (Throwable t) {
+ LOG.info("JobTracker failed to recover job " + pJob.getJobID()
+ + " from history. Ignoring it.", t);
}
// 3. Close the listener
@@ -1170,9 +1177,9 @@
JobHistory.JobInfo.checkpointRecovery(logFileName,
pJob.getJobConf());
}
- } catch (IOException ioe) {
+ } catch (Throwable t) {
LOG.warn("Failed to delete log file (" + logFileName + ") for job "
- + id + ". Ignoring it.", ioe);
+ + id + ". Ignoring it.", t);
}
if (pJob.isComplete()) {
@@ -1482,7 +1489,12 @@
&& !JobHistory.isDisableHistory()
&& systemDirData != null) {
for (FileStatus status : systemDirData) {
- recoveryManager.checkAndAddJob(status);
+ try {
+ recoveryManager.checkAndAddJob(status);
+ } catch (Throwable t) {
+ LOG.warn("Failed to add the job " + status.getPath().getName(),
+ t);
+ }
}
// Check if there are jobs to be recovered
@@ -1598,7 +1610,11 @@
taskScheduler.start();
// Start the recovery after starting the scheduler
- recoveryManager.recover();
+ try {
+ recoveryManager.recover();
+ } catch (Throwable t) {
+ LOG.warn("Recovery manager crashed! Ignoring.", t);
+ }
this.expireTrackersThread = new Thread(this.expireTrackers,
"expireTrackers");
@@ -3178,6 +3194,10 @@
// if job is not there in the cleanup list ... add it
synchronized (trackerToJobsToCleanup) {
Set<JobID> jobs = trackerToJobsToCleanup.get(trackerName);
+ if (jobs == null) {
+ jobs = new HashSet<JobID>();
+ trackerToJobsToCleanup.put(trackerName, jobs);
+ }
jobs.add(taskId.getJobID());
}
continue;
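The last JobTracker hunk fixes a latent NullPointerException: trackerToJobsToCleanup.get() returns null for a tracker that has no entry yet, so the set is now created on first use. A minimal sketch of that get-or-create pattern under a lock, with String and Integer standing in for the Hadoop tracker-name and JobID types:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CleanupMapSketch {
  private final Map<String, Set<Integer>> trackerToJobsToCleanup =
      new HashMap<String, Set<Integer>>();

  void addJobToCleanup(String trackerName, int jobId) {
    synchronized (trackerToJobsToCleanup) {
      Set<Integer> jobs = trackerToJobsToCleanup.get(trackerName);
      if (jobs == null) {                       // first job for this tracker
        jobs = new HashSet<Integer>();
        trackerToJobsToCleanup.put(trackerName, jobs);
      }
      jobs.add(jobId);                          // no NPE on a fresh tracker
    }
  }

  public static void main(String[] args) {
    CleanupMapSketch sketch = new CleanupMapSketch();
    sketch.addJobToCleanup("tracker_1", 42);
    sketch.addJobToCleanup("tracker_1", 43);
    System.out.println(sketch.trackerToJobsToCleanup);
  }
}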
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=752073&r1=752072&r2=752073&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Mar 10 11:46:27 2009
@@ -61,6 +61,7 @@
*/
class JobTrackerRunner implements Runnable {
private JobTracker tracker = null;
+ private volatile boolean isActive = true;
JobConf jc = null;
@@ -72,6 +73,10 @@
return (tracker != null);
}
+ public boolean isActive() {
+ return isActive;
+ }
+
public int getJobTrackerPort() {
return tracker.getTrackerPort();
}
@@ -97,6 +102,7 @@
tracker.offerService();
} catch (Throwable e) {
LOG.error("Job tracker crashed", e);
+ isActive = false;
}
}
@@ -111,6 +117,7 @@
} catch (Throwable e) {
LOG.error("Problem shutting down job tracker", e);
}
+ isActive = false;
}
}
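MiniMRCluster's runner gains a volatile isActive flag so the test thread can observe a crash in the JobTracker thread without extra locking. A sketch of that cross-thread handshake, with a hypothetical worker in place of the JobTracker runner:

public class CrashFlagSketch implements Runnable {
  private volatile boolean isActive = true;

  public boolean isActive() { return isActive; }

  public void run() {
    try {
      throw new IllegalStateException("simulated JobTracker crash");
    } catch (Throwable t) {
      System.err.println("Worker crashed: " + t.getMessage());
      isActive = false;                  // volatile: visible to other threads
    }
  }

  public static void main(String[] args) throws InterruptedException {
    CrashFlagSketch runner = new CrashFlagSketch();
    Thread worker = new Thread(runner, "worker");
    worker.start();
    worker.join();
    System.out.println("worker active? " + runner.isActive());  // false
  }
}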
@@ -552,6 +559,21 @@
}
}
+ ClusterStatus status = jobTracker.getJobTracker().getClusterStatus(false);
+ while (jobTracker.isActive() && status.getJobTrackerState() == JobTracker.State.INITIALIZING) {
+ try {
+ LOG.info("JobTracker still initializing. Waiting.");
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {}
+ status = jobTracker.getJobTracker().getClusterStatus(false);
+ }
+
+ if (!jobTracker.isActive()
+ || status.getJobTrackerState() != JobTracker.State.RUNNING) {
+ // return if jobtracker has crashed
+ return;
+ }
+
// Set the configuration for the task-trackers
this.jobTrackerPort = jobTracker.getJobTrackerPort();
this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();
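The hunk above makes cluster startup block while the tracker reports INITIALIZING, and bail out if the runner has died, instead of handing task trackers a half-started master. A sketch of that wait loop against a hypothetical status source:

public class WaitForRunningSketch {
  enum State { INITIALIZING, RUNNING }

  private volatile State state = State.INITIALIZING;
  private volatile boolean isActive = true;

  boolean waitUntilRunning() {
    while (isActive && state == State.INITIALIZING) {
      try {
        System.out.println("Still initializing. Waiting.");
        Thread.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return isActive && state == State.RUNNING;  // false: don't start trackers
  }

  public static void main(String[] args) {
    final WaitForRunningSketch sketch = new WaitForRunningSketch();
    new Thread(new Runnable() {          // simulate startup finishing later
      public void run() {
        try { Thread.sleep(300); } catch (InterruptedException e) {}
        sketch.state = State.RUNNING;
      }
    }).start();
    System.out.println("running? " + sketch.waitUntilRunning());
  }
}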
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=752073&r1=752072&r2=752073&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Tue Mar 10 11:46:27 2009
@@ -25,9 +25,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
/**
@@ -45,9 +45,11 @@
/**
* Tests the {@link JobTracker} against the exceptions thrown in
* {@link JobTracker.RecoveryManager}. It does the following :
- * - submits a job
+ * - submits 2 jobs
* - kills the jobtracker
- * - restarts the jobtracker with max-tasks-per-job < total tasks in the job
+ * - garbles job.xml for one job, causing it to fail in the constructor,
+ * and job.split for another, causing it to fail in init
+ * - restarts the jobtracker
* - checks if the jobtracker starts normally
*/
public void testJobTracker() throws Exception {
@@ -63,33 +65,75 @@
MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
- JobConf job = mr.createJobConf();
+ JobConf job1 = mr.createJobConf();
+
+ UtilsForTests.configureWaitingJobConf(job1,
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0,
+ "test-recovery-manager", signalFile, signalFile);
+
+ // submit the faulty job
+ RunningJob rJob1 = (new JobClient(job1)).submitJob(job1);
+ LOG.info("Submitted job " + rJob1.getID());
+
+ while (rJob1.mapProgress() < 0.5f) {
+ LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+ UtilsForTests.waitFor(100);
+ }
+
+ JobConf job2 = mr.createJobConf();
- UtilsForTests.configureWaitingJobConf(job,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output"), 20, 0,
+ UtilsForTests.configureWaitingJobConf(job2,
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0,
"test-recovery-manager", signalFile, signalFile);
// submit the faulty job
- RunningJob rJob = (new JobClient(job)).submitJob(job);
- LOG.info("Submitted job " + rJob.getID());
+ RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
+ LOG.info("Submitted job " + rJob2.getID());
- while (rJob.mapProgress() < 0.5f) {
- LOG.info("Waiting for job " + rJob.getID() + " to be 50% done");
+ while (rJob2.mapProgress() < 0.5f) {
+ LOG.info("Waiting for job " + rJob2.getID() + " to be 50% done");
UtilsForTests.waitFor(100);
}
// kill the jobtracker
LOG.info("Stopping jobtracker");
+ String sysDir = mr.getJobTrackerRunner().getJobTracker().getSystemDir();
mr.stopJobTracker();
+ // delete the job.xml of job #1, causing the job to fail in the constructor
+ Path jobFile =
+ new Path(sysDir, rJob1.getID().toString() + Path.SEPARATOR + "job.xml");
+ LOG.info("Deleting job.xml file : " + jobFile.toString());
+ fs.delete(jobFile, false); // delete the job.xml file
+
+ // recreate the job.xml file with a single garbage byte
+ FSDataOutputStream out = fs.create(jobFile);
+ out.write(1);
+ out.close();
+
+ // delete the job.split of job #2, causing the job to fail in initTasks
+ Path jobSplitFile =
+ new Path(sysDir, rJob2.getID().toString() + Path.SEPARATOR + "job.split");
+ LOG.info("Deleting job.split file : " + jobSplitFile.toString());
+ fs.delete(jobSplitFile, false); // delete the job.split file
+
+ // recreate the job.split file with a single garbage byte
+ out = fs.create(jobSplitFile);
+ out.write(1);
+ out.close();
+
// make sure that the jobtracker is in recovery mode
mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
true);
- mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 10);
-
// start the jobtracker
LOG.info("Starting jobtracker");
mr.startJobTracker();
+ ClusterStatus status =
+ mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false);
+
+ // check if the jobtracker came up or not
+ assertEquals("JobTracker crashed!",
+ JobTracker.State.RUNNING, status.getJobTrackerState());
mr.shutdown();
}
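The test simulates garbling by replacing job.xml and job.split with one-byte files while the JobTracker is down, so recovery finds files that exist but cannot be parsed. A sketch of that corruption step against the local filesystem, using a temp file as a stand-in for the paths under the system directory:

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class GarbleFileSketch {
  public static void main(String[] args) throws IOException {
    File jobFile = File.createTempFile("job", ".xml");  // hypothetical path
    jobFile.delete();                    // drop the real contents

    FileOutputStream out = new FileOutputStream(jobFile);
    out.write(1);                        // one garbage byte, not a valid file
    out.close();

    System.out.println(jobFile + " is now " + jobFile.length() + " byte(s)");
  }
}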
@@ -180,4 +224,4 @@
mr.shutdown();
}
-}
\ No newline at end of file
+}