Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/03/02 15:36:31 UTC

svn commit: r749318 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobTracker.java src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Author: yhemanth
Date: Mon Mar  2 14:36:30 2009
New Revision: 749318

URL: http://svn.apache.org/viewvc?rev=749318&view=rev
Log:
HADOOP-4638. Fixes job recovery to not crash the job tracker for problems with a single job file. Contributed by Amar Kamat.
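
In outline, RecoveryManager.recover() now isolates each job's history recovery in its own try/catch: a job whose history file cannot be recovered is logged and dropped from the recovery list, and the restart continues with the remaining jobs. A minimal standalone sketch of that pattern, with the hypothetical RecoverableJob type and recoverHistory() method standing in for the JobInProgress/JobHistory calls in the patch below:

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;

    public class RecoverySketch {

      /** Hypothetical stand-in for a job whose history file must be recovered. */
      interface RecoverableJob {
        String getId();
        void recoverHistory() throws IOException; // may fail on a corrupt job file
      }

      /**
       * Recovers every job it can; a failure for one job is logged and the job
       * is dropped from the list instead of aborting the caller's restart.
       */
      static void recoverAll(List<RecoverableJob> jobsToRecover) {
        Iterator<RecoverableJob> it = jobsToRecover.iterator();
        while (it.hasNext()) {
          RecoverableJob job = it.next();
          try {
            job.recoverHistory();
          } catch (IOException ioe) {
            System.err.println("Failed to recover job " + job.getId()
                               + ". Ignoring it: " + ioe);
            it.remove();   // forget only the faulty job
            continue;      // the caller (the jobtracker) keeps running
          }
          System.out.println("Recovered job " + job.getId());
        }
      }
    }

Job initialization and history checkpointing get the same treatment in the patch: an initTasks() failure now fails only that job (jip.fail()) before the exception is rethrown, and a checkpointRecovery() failure is logged and ignored.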

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=749318&r1=749317&r2=749318&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Mar  2 14:36:30 2009
@@ -933,6 +933,9 @@
     HADOOP-5146. Fixes a race condition that causes LocalDirAllocator to miss
     files.  (Devaraj Das via yhemanth)
 
+    HADOOP-4638. Fixes job recovery to not crash the job tracker for problems
+    with a single job file. (Amar Kamat via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=749318&r1=749317&r2=749318&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon Mar  2 14:36:30 2009
@@ -805,7 +805,14 @@
         if (Values.PREP.name().equals(jobStatus)) {
           hasUpdates = true;
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
-          jip.initTasks();
+          try {
+            jip.initTasks();
+          } catch (IOException ioe) {
+            LOG.error("Job initialization failed : \n" 
+                      + StringUtils.stringifyException(ioe));
+            jip.fail(); // fail the job
+            throw ioe;
+          }
         }
       }
       
@@ -1080,22 +1087,35 @@
     public void recover() throws IOException {
       // I. Init the jobs and cache the recovered job history filenames
       Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
-      for (JobID id : jobsToRecover) {
+      Iterator<JobID> idIter = jobsToRecover.iterator();
+      while (idIter.hasNext()) {
+        JobID id = idIter.next();
+        LOG.info("Trying to recover job " + id);
         // 1. Create the job object
         JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
         
-        // 2. Get the log file and the file path
-        String logFileName = 
-          JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
-        Path jobHistoryFilePath = 
-          JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
-        
-        // 3. Recover the history file. This involved
-        //     - deleting file.recover if file exists
-        //     - renaming file.recover to file if file doesnt exist
-        // This makes sure that the (master) file exists
-        JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
-                                                 jobHistoryFilePath);
+        String logFileName;
+        Path jobHistoryFilePath;
+        try {
+          // 2. Get the log file and the file path
+          logFileName = 
+            JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
+          jobHistoryFilePath = 
+            JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+
+          // 3. Recover the history file. This involves
+          //     - deleting file.recover if file exists
+          //     - renaming file.recover to file if file doesn't exist
+          // This makes sure that the (master) file exists
+          JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
+                                                   jobHistoryFilePath);
+        } catch (IOException ioe) {
+          LOG.warn("Failed to recover job " + id + " history filename." 
+                   + " Ignoring.", ioe);
+          // TODO : remove job details from the system directory
+          idIter.remove();
+          continue;
+        }
 
         // 4. Cache the history file name as it costs one dfs access
         jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
@@ -1124,13 +1144,14 @@
           JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), 
                                         listener, fs);
         } catch (IOException e) {
-          LOG.info("JobTracker failed to recover job " + pJob + "."
-                   + " Ignoring it.", e);
-          continue;
+          LOG.info("JobTracker failed to recover job " + pJob.getJobID() + "."
+                     + " Ignoring it.", e);
         }
 
         // 3. Close the listener
         listener.close();
+        
+        LOG.info("Restart count for job " + id + " is " + pJob.numRestarts());
 
         // 4. Update the recovery metric
         totalEventsRecovered += listener.getNumEventsRecovered();
@@ -1138,9 +1159,14 @@
         // 5. Cleanup history
         // Delete the master log file as an indication that the new file
         // should be used in future
-        synchronized (pJob) {
-          JobHistory.JobInfo.checkpointRecovery(logFileName, 
-              pJob.getJobConf());
+        try {
+          synchronized (pJob) {
+            JobHistory.JobInfo.checkpointRecovery(logFileName, 
+                                                  pJob.getJobConf());
+          }
+        } catch (IOException ioe) {
+          LOG.warn("Failed to delete log file (" + logFileName + ") for job " 
+                   + id + ". Ignoring it.", ioe);
         }
 
         // 6. Inform the jobtracker as to how much of the data is recovered.
@@ -2733,8 +2759,7 @@
    * adding a job. This is the core job submission logic
    * @param jobId The id for the job submitted which needs to be added
    */
-  private synchronized JobStatus addJob(JobID jobId, JobInProgress job) 
-  throws IOException {
+  private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
     totalSubmissions++;
 
     synchronized (jobs) {
@@ -3152,6 +3177,11 @@
       
       JobInProgress job = getJob(taskId.getJobID());
       if (job == null) {
+        // if job is not there in the cleanup list ... add it
+        synchronized (trackerToJobsToCleanup) {
+          Set<JobID> jobs = trackerToJobsToCleanup.get(trackerName);
+          jobs.add(taskId.getJobID());
+        }
         continue;
       }
       

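The last JobTracker.java hunk above covers status reports for tasks whose job is no longer known after the restart (for example, a job the recovery manager just dropped): such jobs are now queued for cleanup on the reporting tasktracker instead of being silently skipped. A rough sketch of that bookkeeping, assuming a per-tracker map like trackerToJobsToCleanup (its declaration is outside this diff, and the null check below exists only to keep the sketch self-contained):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class CleanupQueueSketch {

      // per-tracker set of job ids whose leftover tasks should be cleaned up
      private final Map<String, Set<String>> trackerToJobsToCleanup =
          new HashMap<String, Set<String>>();

      /** Remember that a job reported by this tracker needs its tasks cleaned up. */
      public void markForCleanup(String trackerName, String jobId) {
        synchronized (trackerToJobsToCleanup) {
          Set<String> jobs = trackerToJobsToCleanup.get(trackerName);
          if (jobs == null) {                 // not in the patch; the real field is
            jobs = new HashSet<String>();     // assumed to be pre-populated there
            trackerToJobsToCleanup.put(trackerName, jobs);
          }
          jobs.add(jobId);
        }
      }
    }
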
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=749318&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Mon Mar  2 14:36:30 2009
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
+
+/**
+ * Test whether the {@link RecoveryManager} is able to tolerate job-recovery 
+ * failures and the jobtracker is able to tolerate {@link RecoveryManager}
+ * failure.
+ */
+public class TestRecoveryManager extends TestCase {
+  private static final Log LOG = 
+    LogFactory.getLog(TestRecoveryManager.class);
+  private static final Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data", "/tmp"), 
+             "test-recovery-manager");
+  
+  /**
+   * Tests the {@link JobTracker} against the exceptions thrown in 
+   * {@link JobTracker.RecoveryManager}. It does the following :
+   *  - submits a job
+   *  - kills the jobtracker
+   *  - restarts the jobtracker with max-tasks-per-job < total tasks in the job
+   *  - checks if the jobtracker starts normally
+   */
+  public void testJobTracker() throws Exception {
+    LOG.info("Testing jobtracker restart with faulty job");
+    String signalFile = new Path(TEST_DIR, "signal").toString();
+    JobConf conf = new JobConf();
+    
+    FileSystem fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true); // cleanup
+    
+    conf.set("mapred.jobtracker.job.history.block.size", "1024");
+    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
+    
+    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+    
+    JobConf job = mr.createJobConf();
+    
+    UtilsForTests.configureWaitingJobConf(job, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output"), 20, 0, 
+        "test-recovery-manager", signalFile, signalFile);
+    
+    // submit the faulty job
+    RunningJob rJob = (new JobClient(job)).submitJob(job);
+    LOG.info("Submitted job " + rJob.getID());
+    
+    while (rJob.mapProgress() < 0.5f) {
+      LOG.info("Waiting for job " + rJob.getID() + " to be 50% done");
+      UtilsForTests.waitFor(100);
+    }
+    
+    // kill the jobtracker
+    LOG.info("Stopping jobtracker");
+    mr.stopJobTracker();
+    
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
+                                      true);
+    mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 10);
+    
+    // start the jobtracker
+    LOG.info("Starting jobtracker");
+    mr.startJobTracker();
+    
+    mr.shutdown();
+  }
+  
+  /**
+   * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown 
+   * during recovery. It does the following :
+   *  - submits a job with HIGH priority and x tasks
+   *  - allows it to complete 50%
+   *  - submits another job with normal priority and y tasks
+   *  - kills the jobtracker
+   *  - restarts the jobtracker with max-tasks-per-job such that 
+   *        y < max-tasks-per-job < x
+   *  - checks if the jobtracker starts normally and job#2 is recovered while 
+   *    job#1 is failed.
+   */
+  public void testRecoveryManager() throws Exception {
+    LOG.info("Testing recovery-manager");
+    String signalFile = new Path(TEST_DIR, "signal").toString();
+    
+    // clean up
+    FileSystem fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true);
+    
+    JobConf conf = new JobConf();
+    conf.set("mapred.jobtracker.job.history.block.size", "1024");
+    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
+    
+    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    
+    JobConf job1 = mr.createJobConf();
+    //  set the high priority
+    job1.setJobPriority(JobPriority.HIGH);
+    
+    UtilsForTests.configureWaitingJobConf(job1, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output"), 30, 0, 
+        "test-recovery-manager", signalFile, signalFile);
+    
+    // submit the faulty job
+    JobClient jc = new JobClient(job1);
+    RunningJob rJob1 = jc.submitJob(job1);
+    LOG.info("Submitted first job " + rJob1.getID());
+    
+    while (rJob1.mapProgress() < 0.5f) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+      UtilsForTests.waitFor(100);
+    }
+    
+    // now submit job2
+    JobConf job2 = mr.createJobConf();
+
+    String signalFile1 = new Path(TEST_DIR, "signal1").toString();
+    UtilsForTests.configureWaitingJobConf(job2, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 20, 0, 
+        "test-recovery-manager", signalFile1, signalFile1);
+    
+    // submit the job
+    RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
+    LOG.info("Submitted job " + rJob2.getID());
+    
+    // wait for it to init
+    JobInProgress jip = jobtracker.getJob(rJob2.getID());
+    
+    while (!jip.inited()) {
+      LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
+      UtilsForTests.waitFor(100);
+    }
+    
+    // kill the jobtracker
+    LOG.info("Stopping jobtracker");
+    mr.stopJobTracker();
+    
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
+                                      true);
+    mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);
+    
+    // start the jobtracker
+    LOG.info("Starting jobtracker");
+    mr.startJobTracker();
+    
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    
+    // assert that job2 is recovered by the jobtracker as job1 would fail
+    assertEquals("Recovery manager failed to tolerate job failures",
+                 2, jobtracker.getAllJobs().length);
+    
+    mr.shutdown();
+  }
+}
\ No newline at end of file