You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/03/10 12:46:27 UTC

svn commit: r752073 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobTracker.java src/test/org/apache/hadoop/mapred/MiniMRCluster.java src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Author: ddas
Date: Tue Mar 10 11:46:27 2009
New Revision: 752073

URL: http://svn.apache.org/viewvc?rev=752073&view=rev
Log:
HADOOP-5392. Fixes a problem to do with JT crashing during recovery when the job files are garbled. Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=752073&r1=752072&r2=752073&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Mar 10 11:46:27 2009
@@ -978,6 +978,9 @@
     JobTracker also for deleting the paths on the job's output fs. (3) Moves the
     references to completedJobStore outside the block where the JobTracker is locked.
     (ddas)
+
+    HADOOP-5392. Fixes a problem to do with JT crashing during recovery when
+    the job files are garbled. (Amar Kamat via ddas)
  
 Release 0.19.1 - Unreleased
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=752073&r1=752072&r2=752073&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Mar 10 11:46:27 2009
@@ -807,11 +807,11 @@
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
           try {
             jip.initTasks();
-          } catch (IOException ioe) {
+          } catch (Throwable t) {
             LOG.error("Job initialization failed : \n" 
-                      + StringUtils.stringifyException(ioe));
+                      + StringUtils.stringifyException(t));
             jip.fail(); // fail the job
-            throw ioe;
+            throw new IOException(t);
           }
         }
       }
@@ -1088,19 +1088,19 @@
      expireLaunchingTasks.removeTask(attemptId);
     }
   
-    public void recover() throws IOException {
+    public void recover() {
       // I. Init the jobs and cache the recovered job history filenames
       Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
       Iterator<JobID> idIter = jobsToRecover.iterator();
       while (idIter.hasNext()) {
         JobID id = idIter.next();
-        LOG.info("Trying to recover job " + id);
-        // 1. Create the job object
-        JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
-        
-        String logFileName;
-        Path jobHistoryFilePath;
+        LOG.info("Trying to recover details of job " + id);
         try {
+          // 1. Create the job object
+          JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
+          String logFileName;
+          Path jobHistoryFilePath;
+
           // 2. Get the log file and the file path
           logFileName = 
             JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
@@ -1113,19 +1113,19 @@
           // This makes sure that the (master) file exists
           JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
                                                    jobHistoryFilePath);
-        } catch (IOException ioe) {
-          LOG.warn("Failed to recover job " + id + " history filename." 
-                   + " Ignoring.", ioe);
+          
+          // 4. Cache the history file name as it costs one dfs access
+          jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+
+          // 5. Submit the job to the jobtracker
+          addJob(id, job);
+        } catch (Throwable t) {
+          LOG.warn("Failed to recover job " + id + " history details." 
+                   + " Ignoring.", t);
           // TODO : remove job details from the system directory
           idIter.remove();
           continue;
         }
-
-        // 4. Cache the history file name as it costs one dfs access
-        jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
-
-        // 5. Sumbit the job to the jobtracker
-        addJob(id, job);
       }
 
       long recoveryStartTime = System.currentTimeMillis();
@@ -1141,7 +1141,14 @@
         Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
         String logFileName = jobHistoryFilePath.getName();
 
-        FileSystem fs = jobHistoryFilePath.getFileSystem(conf);
+        FileSystem fs;
+        try {
+          fs = jobHistoryFilePath.getFileSystem(conf);
+        } catch (IOException ioe) {
+          LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",
+                   ioe);
+          continue;
+        }
 
         // 2. Parse the history file
         // Note that this also involves job update
@@ -1149,9 +1156,9 @@
         try {
           JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), 
                                         listener, fs);
-        } catch (IOException e) {
-          LOG.info("JobTracker failed to recover job " + pJob.getJobID() + "."
-                     + " Ignoring it.", e);
+        } catch (Throwable t) {
+          LOG.info("JobTracker failed to recover job " + pJob.getJobID() 
+                   + " from history. Ignoring it.", t);
         }
 
         // 3. Close the listener
@@ -1170,9 +1177,9 @@
             JobHistory.JobInfo.checkpointRecovery(logFileName, 
                                                   pJob.getJobConf());
           }
-        } catch (IOException ioe) {
+        } catch (Throwable t) {
           LOG.warn("Failed to delete log file (" + logFileName + ") for job " 
-                   + id + ". Ignoring it.", ioe);
+                   + id + ". Ignoring it.", t);
         }
 
         if (pJob.isComplete()) {
@@ -1482,7 +1489,12 @@
             && !JobHistory.isDisableHistory()
             && systemDirData != null) {
           for (FileStatus status : systemDirData) {
-            recoveryManager.checkAndAddJob(status);
+            try {
+              recoveryManager.checkAndAddJob(status);
+            } catch (Throwable t) {
+              LOG.warn("Failed to add the job " + status.getPath().getName(), 
+                       t);
+            }
           }
           
           // Check if there are jobs to be recovered
@@ -1598,7 +1610,11 @@
     taskScheduler.start();
     
     //  Start the recovery after starting the scheduler
-    recoveryManager.recover();
+    try {
+      recoveryManager.recover();
+    } catch (Throwable t) {
+      LOG.warn("Recovery manager crashed! Ignoring.", t);
+    }
     
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
@@ -3178,6 +3194,10 @@
         // if job is not there in the cleanup list ... add it
         synchronized (trackerToJobsToCleanup) {
           Set<JobID> jobs = trackerToJobsToCleanup.get(trackerName);
+          if (jobs == null) {
+            jobs = new HashSet<JobID>();
+            trackerToJobsToCleanup.put(trackerName, jobs);
+          }
           jobs.add(taskId.getJobID());
         }
         continue;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=752073&r1=752072&r2=752073&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Mar 10 11:46:27 2009
@@ -61,6 +61,7 @@
    */
   class JobTrackerRunner implements Runnable {
     private JobTracker tracker = null;
+    private volatile boolean isActive = true;
     
     JobConf jc = null;
         
@@ -72,6 +73,10 @@
       return (tracker != null);
     }
         
+    public boolean isActive() {
+      return isActive;
+    }
+
     public int getJobTrackerPort() {
       return tracker.getTrackerPort();
     }
@@ -97,6 +102,7 @@
         tracker.offerService();
       } catch (Throwable e) {
         LOG.error("Job tracker crashed", e);
+        isActive = false;
       }
     }
         
@@ -111,6 +117,7 @@
       } catch (Throwable e) {
         LOG.error("Problem shutting down job tracker", e);
       }
+      isActive = false;
     }
   }
     
@@ -552,6 +559,21 @@
       }
     }
         
+    ClusterStatus status = jobTracker.getJobTracker().getClusterStatus(false);
+    while (jobTracker.isActive() && status.getJobTrackerState() == JobTracker.State.INITIALIZING) {
+      try {
+        LOG.info("JobTracker still initializing. Waiting.");
+        Thread.sleep(1000);
+      } catch(InterruptedException e) {}
+      status = jobTracker.getJobTracker().getClusterStatus(false);
+    }
+
+    if (!jobTracker.isActive() 
+        || status.getJobTrackerState() != JobTracker.State.RUNNING) {
+      // return if jobtracker has crashed
+      return;
+    }
+ 
     // Set the configuration for the task-trackers
     this.jobTrackerPort = jobTracker.getJobTrackerPort();
     this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=752073&r1=752072&r2=752073&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Tue Mar 10 11:46:27 2009
@@ -25,9 +25,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
 
 /**
@@ -45,9 +45,11 @@
   /**
    * Tests the {@link JobTracker} against the exceptions thrown in 
    * {@link JobTracker.RecoveryManager}. It does the following :
-   *  - submits a job
+   *  - submits 2 jobs
    *  - kills the jobtracker
-   *  - restarts the jobtracker with max-tasks-per-job < total tasks in the job
+   *  - Garble job.xml for one job causing it to fail in constructor 
+   *    and job.split for another causing it to fail in init.
+   *  - restarts the jobtracker
    *  - checks if the jobtraker starts normally
    */
   public void testJobTracker() throws Exception {
@@ -63,33 +65,75 @@
     
     MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
     
-    JobConf job = mr.createJobConf();
+    JobConf job1 = mr.createJobConf();
+    
+    UtilsForTests.configureWaitingJobConf(job1, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0, 
+        "test-recovery-manager", signalFile, signalFile);
+    
+    // submit the faulty job
+    RunningJob rJob1 = (new JobClient(job1)).submitJob(job1);
+    LOG.info("Submitted job " + rJob1.getID());
+    
+    while (rJob1.mapProgress() < 0.5f) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+      UtilsForTests.waitFor(100);
+    }
+    
+    JobConf job2 = mr.createJobConf();
     
-    UtilsForTests.configureWaitingJobConf(job, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output"), 20, 0, 
+    UtilsForTests.configureWaitingJobConf(job2, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0, 
         "test-recovery-manager", signalFile, signalFile);
     
     // submit the faulty job
-    RunningJob rJob = (new JobClient(job)).submitJob(job);
-    LOG.info("Submitted job " + rJob.getID());
+    RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
+    LOG.info("Submitted job " + rJob2.getID());
     
-    while (rJob.mapProgress() < 0.5f) {
-      LOG.info("Waiting for job " + rJob.getID() + " to be 50% done");
+    while (rJob2.mapProgress() < 0.5f) {
+      LOG.info("Waiting for job " + rJob2.getID() + " to be 50% done");
       UtilsForTests.waitFor(100);
     }
     
     // kill the jobtracker
     LOG.info("Stopping jobtracker");
+    String sysDir = mr.getJobTrackerRunner().getJobTracker().getSystemDir();
     mr.stopJobTracker();
     
+    // delete the job.xml of job #1 causing the job to fail in constructor
+    Path jobFile = 
+      new Path(sysDir, rJob1.getID().toString() + Path.SEPARATOR + "job.xml");
+    LOG.info("Deleting job.xml file : " + jobFile.toString());
+    fs.delete(jobFile, false); // delete the job.xml file
+    
+    // recreate the job.xml file with a single garbage byte
+    FSDataOutputStream out = fs.create(jobFile);
+    out.write(1);
+    out.close();
+
+    // delete the job.split of job #2 causing the job to fail in initTasks
+    Path jobSplitFile = 
+      new Path(sysDir, rJob2.getID().toString() + Path.SEPARATOR + "job.split");
+    LOG.info("Deleting job.split file : " + jobSplitFile.toString());
+    fs.delete(jobSplitFile, false); // delete the job.split file
+    
+    // recreate the job.split file with a single garbage byte
+    out = fs.create(jobSplitFile);
+    out.write(1);
+    out.close();
+
     // make sure that the jobtracker is in recovery mode
     mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
                                       true);
-    mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 10);
-    
     // start the jobtracker
     LOG.info("Starting jobtracker");
     mr.startJobTracker();
+    ClusterStatus status = 
+      mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false);
+    
+    // check if the jobtracker came up or not
+    assertEquals("JobTracker crashed!", 
+                 JobTracker.State.RUNNING, status.getJobTrackerState());
     
     mr.shutdown();
   }
@@ -180,4 +224,4 @@
     
     mr.shutdown();
   }
-}
\ No newline at end of file
+}