You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/03/02 15:36:31 UTC
svn commit: r749318 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/JobTracker.java
src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
Author: yhemanth
Date: Mon Mar 2 14:36:30 2009
New Revision: 749318
URL: http://svn.apache.org/viewvc?rev=749318&view=rev
Log:
HADOOP-4638. Fixes job recovery to not crash the job tracker for problems with a single job file. Contributed by Amar Kamat.
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=749318&r1=749317&r2=749318&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Mar 2 14:36:30 2009
@@ -933,6 +933,9 @@
HADOOP-5146. Fixes a race condition that causes LocalDirAllocator to miss
files. (Devaraj Das via yhemanth)
+ HADOOP-4638. Fixes job recovery to not crash the job tracker for problems
+ with a single job file. (Amar Kamat via yhemanth)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=749318&r1=749317&r2=749318&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon Mar 2 14:36:30 2009
@@ -805,7 +805,14 @@
if (Values.PREP.name().equals(jobStatus)) {
hasUpdates = true;
LOG.info("Calling init from RM for job " + jip.getJobID().toString());
- jip.initTasks();
+ try {
+ jip.initTasks();
+ } catch (IOException ioe) {
+ LOG.error("Job initialization failed : \n"
+ + StringUtils.stringifyException(ioe));
+ jip.fail(); // fail the job
+ throw ioe;
+ }
}
}
@@ -1080,22 +1087,35 @@
public void recover() throws IOException {
// I. Init the jobs and cache the recovered job history filenames
Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
- for (JobID id : jobsToRecover) {
+ Iterator<JobID> idIter = jobsToRecover.iterator();
+ while (idIter.hasNext()) {
+ JobID id = idIter.next();
+ LOG.info("Trying to recover job " + id);
// 1. Create the job object
JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
- // 2. Get the log file and the file path
- String logFileName =
- JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
- Path jobHistoryFilePath =
- JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
-
- // 3. Recover the history file. This involved
- // - deleting file.recover if file exists
- // - renaming file.recover to file if file doesnt exist
- // This makes sure that the (master) file exists
- JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(),
- jobHistoryFilePath);
+ String logFileName;
+ Path jobHistoryFilePath;
+ try {
+ // 2. Get the log file and the file path
+ logFileName =
+ JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
+ jobHistoryFilePath =
+ JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+
+ // 3. Recover the history file. This involves
+ // - deleting file.recover if file exists
+ // - renaming file.recover to file if file doesn't exist
+ // This makes sure that the (master) file exists
+ JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(),
+ jobHistoryFilePath);
+ } catch (IOException ioe) {
+ LOG.warn("Failed to recover job " + id + " history filename."
+ + " Ignoring.", ioe);
+ // TODO : remove job details from the system directory
+ idIter.remove();
+ continue;
+ }
// 4. Cache the history file name as it costs one dfs access
jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
@@ -1124,13 +1144,14 @@
JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(),
listener, fs);
} catch (IOException e) {
- LOG.info("JobTracker failed to recover job " + pJob + "."
- + " Ignoring it.", e);
- continue;
+ LOG.info("JobTracker failed to recover job " + pJob.getJobID() + "."
+ + " Ignoring it.", e);
}
// 3. Close the listener
listener.close();
+
+ LOG.info("Restart count for job " + id + " is " + pJob.numRestarts());
// 4. Update the recovery metric
totalEventsRecovered += listener.getNumEventsRecovered();
@@ -1138,9 +1159,14 @@
// 5. Cleanup history
// Delete the master log file as an indication that the new file
// should be used in future
- synchronized (pJob) {
- JobHistory.JobInfo.checkpointRecovery(logFileName,
- pJob.getJobConf());
+ try {
+ synchronized (pJob) {
+ JobHistory.JobInfo.checkpointRecovery(logFileName,
+ pJob.getJobConf());
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Failed to delete log file (" + logFileName + ") for job "
+ + id + ". Ignoring it.", ioe);
}
// 6. Inform the jobtracker as to how much of the data is recovered.
@@ -2733,8 +2759,7 @@
* adding a job. This is the core job submission logic
* @param jobId The id for the job submitted which needs to be added
*/
- private synchronized JobStatus addJob(JobID jobId, JobInProgress job)
- throws IOException {
+ private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
totalSubmissions++;
synchronized (jobs) {
@@ -3152,6 +3177,11 @@
JobInProgress job = getJob(taskId.getJobID());
if (job == null) {
+ // if job is not there in the cleanup list ... add it
+ synchronized (trackerToJobsToCleanup) {
+ Set<JobID> jobs = trackerToJobsToCleanup.get(trackerName);
+ jobs.add(taskId.getJobID());
+ }
continue;
}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=749318&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Mon Mar 2 14:36:30 2009
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
+
+/**
+ * Test whether the {@link RecoveryManager} is able to tolerate job-recovery
+ * failures and the jobtracker is able to tolerate {@link RecoveryManager}
+ * failure.
+ */
+public class TestRecoveryManager extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(TestRecoveryManager.class);
+ private static final Path TEST_DIR =
+ new Path(System.getProperty("test.build.data", "/tmp"),
+ "test-recovery-manager");
+
+ /**
+ * Tests the {@link JobTracker} against the exceptions thrown in
+ * {@link JobTracker.RecoveryManager}. It does the following :
+ * - submits a job and waits until its map progress reaches 50%
+ * - kills the jobtracker
+ * - restarts the jobtracker with max-tasks-per-job < total tasks in the job
+ * - checks if the jobtracker starts normally (no explicit assertion is made)
+ */
+ public void testJobTracker() throws Exception {
+ LOG.info("Testing jobtracker restart with faulty job");
+ String signalFile = new Path(TEST_DIR, "signal").toString();
+ JobConf conf = new JobConf();
+
+ FileSystem fs = FileSystem.get(new Configuration());
+ fs.delete(TEST_DIR, true); // clean up the test directory
+
+ conf.set("mapred.jobtracker.job.history.block.size", "1024");
+ conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
+
+ MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+
+ JobConf job = mr.createJobConf();
+
+ UtilsForTests.configureWaitingJobConf(job,
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output"), 20, 0,
+ "test-recovery-manager", signalFile, signalFile);
+
+ // submit the job (it becomes faulty once max-tasks-per-job is lowered below)
+ RunningJob rJob = (new JobClient(job)).submitJob(job);
+ LOG.info("Submitted job " + rJob.getID());
+
+ while (rJob.mapProgress() < 0.5f) {
+ LOG.info("Waiting for job " + rJob.getID() + " to be 50% done");
+ UtilsForTests.waitFor(100);
+ }
+
+ // kill the jobtracker
+ LOG.info("Stopping jobtracker");
+ mr.stopJobTracker();
+
+ // enable recovery on restart and set max-tasks-per-job to 10
+ mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
+ true);
+ mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 10);
+
+ // start the jobtracker
+ LOG.info("Starting jobtracker");
+ mr.startJobTracker();
+
+ mr.shutdown(); // NOTE(review): not in a finally block; skipped if anything above throws
+ }
+
+ /**
+ * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown
+ * during recovery. It does the following :
+ * - submits a job with HIGH priority and x tasks
+ * - allows it to complete 50%
+ * - submits another job with normal priority and y tasks
+ * - kills the jobtracker
+ * - restarts the jobtracker with max-tasks-per-job such that
+ * y < max-tasks-per-job < x
+ * - checks if the jobtracker starts normally and job#2 is recovered while
+ * job#1 is failed.
+ */
+ public void testRecoveryManager() throws Exception {
+ LOG.info("Testing recovery-manager");
+ String signalFile = new Path(TEST_DIR, "signal").toString();
+
+ // clean up the test directory
+ FileSystem fs = FileSystem.get(new Configuration());
+ fs.delete(TEST_DIR, true);
+
+ JobConf conf = new JobConf();
+ conf.set("mapred.jobtracker.job.history.block.size", "1024");
+ conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
+
+ MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+ JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+ JobConf job1 = mr.createJobConf();
+ // give job1 HIGH priority
+ job1.setJobPriority(JobPriority.HIGH);
+
+ UtilsForTests.configureWaitingJobConf(job1,
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output"), 30, 0,
+ "test-recovery-manager", signalFile, signalFile);
+
+ // submit the first (faulty) job
+ JobClient jc = new JobClient(job1);
+ RunningJob rJob1 = jc.submitJob(job1);
+ LOG.info("Submitted first job " + rJob1.getID());
+
+ while (rJob1.mapProgress() < 0.5f) {
+ LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+ UtilsForTests.waitFor(100);
+ }
+
+ // now submit job2, with its own output directory and signal file
+ JobConf job2 = mr.createJobConf();
+
+ String signalFile1 = new Path(TEST_DIR, "signal1").toString();
+ UtilsForTests.configureWaitingJobConf(job2,
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 20, 0,
+ "test-recovery-manager", signalFile1, signalFile1);
+
+ // submit the second job
+ RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
+ LOG.info("Submitted job " + rJob2.getID());
+
+ // wait for job2 to be inited
+ JobInProgress jip = jobtracker.getJob(rJob2.getID());
+
+ while (!jip.inited()) {
+ LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
+ UtilsForTests.waitFor(100);
+ }
+
+ // kill the jobtracker
+ LOG.info("Stopping jobtracker");
+ mr.stopJobTracker();
+
+ // enable recovery and set max-tasks-per-job to 25 (between the 20 and 30 passed above)
+ mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
+ true);
+ mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);
+
+ // start the jobtracker
+ LOG.info("Starting jobtracker");
+ mr.startJobTracker();
+
+ jobtracker = mr.getJobTrackerRunner().getJobTracker(); // re-read the reference after the restart
+
+ // expect both jobs to be listed after restart (per the javadoc: job2 recovered, job1 failed)
+ assertEquals("Recovery manager failed to tolerate job failures",
+ 2, jobtracker.getAllJobs().length);
+
+ mr.shutdown(); // NOTE(review): not in a finally block; skipped if anything above throws or the assertion fails
+ }
+}
\ No newline at end of file