You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2009/04/08 14:47:45 UTC

svn commit: r763223 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/ipc/ src/test/org/apache/hadoop/mapred/

Author: sharad
Date: Wed Apr  8 12:47:44 2009
New Revision: 763223

URL: http://svn.apache.org/viewvc?rev=763223&view=rev
Log:
HADOOP-5394. JobTracker might schedule two attempts of the same task with the same attempt id across restarts. Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
    hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr  8 12:47:44 2009
@@ -1210,6 +1210,9 @@
     HADOOP-5585. Clear FileSystem statistics between tasks when jvm-reuse
     is enabled. (omalley)
 
+    HADOOP-5394. JobTracker might schedule 2 attempts of the same task 
+    with the same attempt id across restarts. (Amar Kamat via sharad)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Wed Apr  8 12:47:44 2009
@@ -122,7 +122,7 @@
     FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, 
     ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
     SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, 
-    TRACKER_NAME, STATE_STRING, VERSION, RESTART_COUNT
+    TRACKER_NAME, STATE_STRING, VERSION
   }
 
   /**
@@ -1167,9 +1167,15 @@
      * @param submitTime job's submit time
      * @param launchTime job's launch time
      * @param restartCount number of times the job got restarted
+     * @deprecated Use {@link #logJobInfo(JobID, long, long)} instead.
      */
     public static void logJobInfo(JobID jobid, long submitTime, long launchTime,
                                   int restartCount){
+      logJobInfo(jobid, submitTime, launchTime);
+    }
+
+    public static void logJobInfo(JobID jobid, long submitTime, long launchTime)
+    {
       if (!disableHistory){
         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
@@ -1177,11 +1183,10 @@
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,
                          new Keys[] {Keys.JOBID, Keys.SUBMIT_TIME, 
-                                     Keys.LAUNCH_TIME, Keys.RESTART_COUNT},
+                                     Keys.LAUNCH_TIME},
                          new String[] {jobid.toString(), 
                                        String.valueOf(submitTime), 
-                                       String.valueOf(launchTime),
-                                       String.valueOf(restartCount)});
+                                       String.valueOf(launchTime)});
         }
       }
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed Apr  8 12:47:44 2009
@@ -164,7 +164,7 @@
   long finishTime;
   
   // Indicates how many times the job got restarted
-  private int restartCount = 0;
+  private final int restartCount;
 
   private JobConf conf;
   AtomicBoolean tasksInited = new AtomicBoolean(false);
@@ -214,6 +214,7 @@
     this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
     this.anyCacheLevel = this.maxLevel+1;
     this.jobtracker = null;
+    this.restartCount = 0;
   }
   
   /**
@@ -222,6 +223,12 @@
    */
   public JobInProgress(JobID jobid, JobTracker jobtracker, 
                        JobConf default_conf) throws IOException {
+    this(jobid, jobtracker, default_conf, 0);
+  }
+  
+  public JobInProgress(JobID jobid, JobTracker jobtracker, 
+                       JobConf default_conf, int rCount) throws IOException {
+    this.restartCount = rCount;
     this.jobId = jobid;
     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
         + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
@@ -595,19 +602,17 @@
   }
 
   // Update the job start/launch time (upon restart) and log to history
-  synchronized void updateJobInfo(long startTime, long launchTime, int count) {
+  synchronized void updateJobInfo(long startTime, long launchTime) {
     // log and change to the job's start/launch time
     this.startTime = startTime;
     this.launchTime = launchTime;
-    // change to the job's restart count
-    this.restartCount = count;
-    JobHistory.JobInfo.logJobInfo(jobId, startTime, launchTime, count);
+    JobHistory.JobInfo.logJobInfo(jobId, startTime, launchTime);
   }
 
   /**
    * Get the number of times the job has restarted
    */
-  int numRestarts() {
+  int getNumRestarts() {
     return restartCount;
   }
   

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Apr  8 12:47:44 2009
@@ -46,6 +46,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -123,6 +125,10 @@
   final static FsPermission SYSTEM_DIR_PERMISSION =
     FsPermission.createImmutable((short) 0733); // rwx-wx-wx
 
+  // system files should have 700 permission
+  final static FsPermission SYSTEM_FILE_PERMISSION =
+    FsPermission.createImmutable((short) 0700); // rwx------
+
   /**
    * A client tried to submit a job before the Job Tracker was ready.
    */
@@ -672,6 +678,8 @@
     Set<JobID> jobsToRecover; // set of jobs to be recovered
     
     private int totalEventsRecovered = 0;
+    private int restartCount = 0;
+    private boolean shouldRecover = false;
 
     Set<String> recoveredTrackers = 
       Collections.synchronizedSet(new HashSet<String>());
@@ -850,7 +858,7 @@
     }
 
     public boolean shouldRecover() {
-      return jobsToRecover.size() != 0;
+      return shouldRecover;
     }
 
     public boolean shouldSchedule() {
@@ -888,18 +896,16 @@
     
     // checks if the job dir has the required files
     public void checkAndAddJob(FileStatus status) throws IOException {
-      String jobName = status.getPath().getName();
-      if (isJobNameValid(jobName)) {
+      String fileName = status.getPath().getName();
+      if (isJobNameValid(fileName)) {
         if (JobClient.isJobDirValid(status.getPath(), fs)) {
-          recoveryManager.addJobForRecovery(JobID.forName(jobName));
+          recoveryManager.addJobForRecovery(JobID.forName(fileName));
+          shouldRecover = true; // enable actual recovery if num-files > 1
         } else {
-          LOG.info("Found an incomplete job directory " + jobName + "." 
+          LOG.info("Found an incomplete job directory " + fileName + "." 
                    + " Deleting it!!");
           fs.delete(status.getPath(), true);
         }
-      } else {
-        LOG.info("Deleting " + status.getPath());
-        fs.delete(status.getPath(), true);
       }
     }
     
@@ -918,8 +924,7 @@
       // Set the start/launch time only if there are recovered tasks
       // Increment the job's restart count
       jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME), 
-                        job.getLong(JobHistory.Keys.LAUNCH_TIME),
-                        job.getInt(Keys.RESTART_COUNT) + 1);
+                        job.getLong(JobHistory.Keys.LAUNCH_TIME));
 
       // Save the new job status
       JobStatus newStatus = (JobStatus)jip.getStatus().clone();
@@ -1119,7 +1124,84 @@
      expireLaunchingTasks.removeTask(attemptId);
     }
   
+    Path getRestartCountFile() { // the jobtracker.info file in the system dir
+      return new Path(getSystemDir(), "jobtracker.info");
+    }
+
+    Path getTempRestartCountFile() { // tmp copy used while rewriting the info file
+      return new Path(getSystemDir(), "jobtracker.info.recover");
+    }
+
+    /**
+     * Initialize the recovery process by maintaining the jobtracker's restart
+     * count in the jobtracker.info file of the system directory. When no info
+     * file exists, one is created holding a count of 0. Otherwise the stored
+     * count is read, incremented (kept in restartCount) and written back via
+     * the temporary jobtracker.info.recover file, so attempt-ids do not clash.
+     *
+     * If neither jobtracker.info nor jobtracker.info.recover exists, recovery
+     * is disabled and a fresh count of 0 is written.
+     *
+     * @throws IOException on a failure to read or write the system files
+     */
+    void updateRestartCount() throws IOException {
+      Path restartFile = getRestartCountFile();
+      Path tmpRestartFile = getTempRestartCountFile();
+      FileSystem fs = restartFile.getFileSystem(conf);
+      FsPermission filePerm = new FsPermission(SYSTEM_FILE_PERMISSION);
+
+      // reconcile the main and tmp info files possibly left by an earlier run
+      if (fs.exists(restartFile)) {
+        fs.delete(tmpRestartFile, false); // main file wins; drop any stale tmp file
+      } else if (fs.exists(tmpRestartFile)) {
+        // only the tmp file exists (presumably an update stopped after the delete)
+        fs.rename(tmpRestartFile, restartFile); // promote .recover to the main file
+      } else {
+        // Neither file exists: either this is the very first start or the files
+        // needed for recovery went missing. In both cases start over from a
+        // count of 0.
+
+        // disable recovery; recover() then only clears jobsToRecover
+        shouldRecover = false;
+
+        // write a fresh jobtracker.info file holding a count of 0
+        FSDataOutputStream out = FileSystem.create(fs, restartFile, filePerm);
+        out.writeInt(0);
+        out.close();
+        return;
+      }
+
+      FSDataInputStream in = fs.open(restartFile);
+      // read the old count (NOTE(review): 'in' is not closed if readInt throws)
+      restartCount = in.readInt();
+      ++restartCount; // increment the restart count
+      in.close();
+
+      // Write back the new count: write tmp file, delete main, rename tmp
+      //TODO This is similar to jobhistory recovery, maybe this common code
+      //      can be factored out.
+      
+      // write to the tmp file (NOTE(review): 'out' is not closed if writeInt throws)
+      FSDataOutputStream out = FileSystem.create(fs, tmpRestartFile, filePerm);
+      out.writeInt(restartCount);
+      out.close();
+
+      // delete the main file; an interruption from here on leaves only the tmp file
+      fs.delete(restartFile, false);
+      
+      // promote the .recover file to the main file
+      fs.rename(tmpRestartFile, restartFile);
+    }
+
     public void recover() {
+      if (!shouldRecover()) {
+        // clean up jobs structure
+        jobsToRecover.clear();
+        return;
+      }
+
+      LOG.info("Restart count of the jobtracker : " + restartCount);
+
       // I. Init the jobs and cache the recovered job history filenames
       Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
       Iterator<JobID> idIter = jobsToRecover.iterator();
@@ -1128,7 +1210,8 @@
         LOG.info("Trying to recover details of job " + id);
         try {
           // 1. Create the job object
-          JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
+          JobInProgress job = 
+            new JobInProgress(id, JobTracker.this, conf, restartCount);
 
           // 2. Check if the user has appropriate access
           // Get the user group info for the job's owner
@@ -1209,8 +1292,6 @@
         // 3. Close the listener
         listener.close();
         
-        LOG.info("Restart count for job " + id + " is " + pJob.numRestarts());
-
         // 4. Update the recovery metric
         totalEventsRecovered += listener.getNumEventsRecovered();
 
@@ -1529,7 +1610,6 @@
         }
         // Make sure that the backup data is preserved
         FileStatus[] systemDirData = fs.listStatus(this.systemDir);
-        LOG.info("Cleaning up the system directory");
         // Check if the history is enabled .. as we cant have persistence with 
         // history disabled
         if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
@@ -1550,6 +1630,7 @@
             break; // if there is something to recover else clean the sys dir
           }
         }
+        LOG.info("Cleaning up the system directory");
         fs.delete(systemDir, true);
         if (FileSystem.mkdirs(fs, systemDir, 
             new FsPermission(SYSTEM_DIR_PERMISSION))) {
@@ -1566,6 +1647,24 @@
       }
       Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
     }
+
+    // Prepare for recovery. This is done irrespective of the status of restart
+    // flag.
+    try {
+      recoveryManager.updateRestartCount();
+    } catch (IOException ioe) {
+      LOG.warn("Failed to initialize recovery manager. The Recovery manager "
+               + "failed to access the system files in the system dir (" 
+               + getSystemDir() + ")."); 
+      LOG.warn("It might be because the JobTracker failed to read/write system"
+               + " files (" + recoveryManager.getRestartCountFile() + " / " 
+               + recoveryManager.getTempRestartCountFile() + ") or the system "
+               + " file " + recoveryManager.getRestartCountFile() 
+               + " is missing!");
+      LOG.warn("Bailing out...");
+      throw ioe;
+    }
+    
     // Same with 'localDir' except it's always on the local disk.
     jobConf.deleteLocalFiles(SUBDIR);
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Wed Apr  8 12:47:44 2009
@@ -887,7 +887,7 @@
     TaskAttemptID taskid = null;
     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
       // Make sure that the attempts are unqiue across restarts
-      int attemptId = job.numRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
+      int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
       taskid = new TaskAttemptID( id, attemptId);
       ++nextTaskId;
     } else {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java Wed Apr  8 12:47:44 2009
@@ -66,6 +66,29 @@
     return NameNode.createNameNode(args, config);
   }
 
+  /**
+   * Start a data-node, setting dfs.data.dir in config to test.build.data/data-{index}.
+   */
+  public DataNode startDataNode(int index, Configuration config) 
+  throws IOException {
+    String dataDir = System.getProperty("test.build.data");
+    File dataNodeDir = new File(dataDir, "data-" + index);
+    config.set("dfs.data.dir", dataNodeDir.getPath());
+
+    String[] args = new String[] {};
+    // start the data-node without any command-line arguments
+    return DataNode.createDataNode(args, config);
+  }
+
+  /**
+   * Stop the given data-node; a null data-node is ignored.
+   */
+  public void stopDataNode(DataNode dn) {
+    if (dn != null) {
+      dn.shutdown();
+    }
+  }
+
   public void stopNameNode(NameNode nn) {
     if (nn != null) {
       nn.stop();

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java Wed Apr  8 12:47:44 2009
@@ -85,6 +85,7 @@
       assertTrue(dfs.exists(filePath));
 
       // This will test TPC to a JobTracker
+      fs = FileSystem.get(sconf);
       mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
       final int jobTrackerPort = mr.getJobTrackerPort();
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Wed Apr  8 12:47:44 2009
@@ -543,24 +543,28 @@
     jobTrackerThread = new Thread(jobTracker);
         
     jobTrackerThread.start();
-    while (!jobTracker.isUp()) {
+    while (jobTracker.isActive() && !jobTracker.isUp()) {
       try {                                     // let daemons get started
         Thread.sleep(1000);
       } catch(InterruptedException e) {
       }
     }
         
-    ClusterStatus status = jobTracker.getJobTracker().getClusterStatus(false);
-    while (jobTracker.isActive() && status.getJobTrackerState() == JobTracker.State.INITIALIZING) {
-      try {
-        LOG.info("JobTracker still initializing. Waiting.");
-        Thread.sleep(1000);
-      } catch(InterruptedException e) {}
+    // if the jobtracker has started, wait for it to finish initializing
+    ClusterStatus status = null; // stays null if the jobtracker never came up
+    if (jobTracker.isUp()) {
       status = jobTracker.getJobTracker().getClusterStatus(false);
+      while (jobTracker.isActive() && status.getJobTrackerState() 
+             == JobTracker.State.INITIALIZING) {
+        try {
+          LOG.info("JobTracker still initializing. Waiting.");
+          Thread.sleep(1000);
+        } catch(InterruptedException e) {} // ignored; the state is re-polled below
+        status = jobTracker.getJobTracker().getClusterStatus(false);
+      }
     }
 
-    if (!jobTracker.isActive() 
-        || status.getJobTrackerState() != JobTracker.State.RUNNING) {
+    if (!jobTracker.isActive()) {
       // return if jobtracker has crashed
       return;
     }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java Wed Apr  8 12:47:44 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import junit.framework.TestCase;
 import org.apache.hadoop.hdfs.TestHDFSServerPorts;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.fs.FileSystem;
 
@@ -112,8 +113,10 @@
    */
   public void testJobTrackerPorts() throws Exception {
     NameNode nn = null;
+    DataNode dn = null;
     try {
       nn = hdfs.startNameNode();
+      dn = hdfs.startDataNode(1, hdfs.getConfig());
 
       // start job tracker on the same port as name-node
       JobConf conf2 = new JobConf(hdfs.getConfig());
@@ -139,6 +142,7 @@
       assertTrue(started); // should start now
 
     } finally {
+      hdfs.stopDataNode(dn);
       hdfs.stopNameNode(nn);
     }
   }
@@ -148,10 +152,12 @@
    */
   public void testTaskTrackerPorts() throws Exception {
     NameNode nn = null;
+    DataNode dn = null;
     JobTracker jt = null;
     JTRunner runner = null;
     try {
       nn = hdfs.startNameNode();
+      dn = hdfs.startDataNode(2, hdfs.getConfig());
 
       JobConf conf2 = new JobConf(hdfs.getConfig());
       runner = new JTRunner();
@@ -187,6 +193,7 @@
         runner.interrupt();
         runner.join();
       }
+      hdfs.stopDataNode(dn);
       hdfs.stopNameNode(nn);
     }
   }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java Wed Apr  8 12:47:44 2009
@@ -239,7 +239,7 @@
         Path sysDir = new Path(jobtracker.getSystemDir());
         FileSystem fs = sysDir.getFileSystem(conf);
         int size = fs.listStatus(sysDir).length;
-        while (size > 0) {
+        while (size > 1) { // ignore the jobtracker.info file
           System.out.println("Waiting for the job files in sys directory to be cleaned up");
           UtilsForTests.waitFor(100);
           size = fs.listStatus(sysDir).length;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=763223&r1=763222&r2=763223&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Wed Apr  8 12:47:44 2009
@@ -29,6 +29,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
+import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
+import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -261,4 +263,139 @@
     
     mr.shutdown();
   }
+  
+  /**
+   * Test that the jobtracker's restart count is correctly managed.
+   * Steps are as follows:
+   *   - start the jobtracker and check that the info file gets created.
+   *   - stop the jobtracker, delete the jobtracker.info file and check that
+   *     recovery is 'off' upon restart.
+   *   - submit a job to the jobtracker.
+   *   - restart the jobtracker 5 times and check that the job's restart
+   *     count after the i-th restart is i.
+   *   - submit a new job and check that its restart count is 0.
+   *   - garble the jobtracker.info file and restart the jobtracker; the
+   *     jobtracker should crash.
+   */
+  public void testRestartCount() throws Exception {
+    LOG.info("Testing restart-count");
+    String signalFile = new Path(TEST_DIR, "signal").toString();
+    
+    // clean up
+    FileSystem fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true);
+    
+    JobConf conf = new JobConf();
+    conf.set("mapred.jobtracker.job.history.block.size", "1024");
+    conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
+    conf.setBoolean("mapred.jobtracker.restart.recover", true);
+    // a custom scheduler is used since the jobs need not be inited
+    conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
+                  TaskScheduler.class);
+    
+    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    JobClient jc = new JobClient(mr.createJobConf());
+
+    // check that the jobtracker info file exists
+    Path infoFile = jobtracker.recoveryManager.getRestartCountFile();
+    assertTrue("Jobtracker infomation is missing", fs.exists(infoFile));
+
+    // check that deleting the system files disables the recovery process
+    LOG.info("Stopping jobtracker for testing with system files deleted");
+    mr.stopJobTracker();
+    
+    // delete the info file
+    Path rFile = jobtracker.recoveryManager.getRestartCountFile();
+    fs.delete(rFile,false);
+    
+    // start the jobtracker (NOTE(review): the log message below says "Stopping")
+    LOG.info("Stopping jobtracker with system files deleted");
+    mr.startJobTracker();
+    
+    UtilsForTests.waitForJobTracker(jc);
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    // check that recovery is disabled
+    assertFalse("Recovery is not disabled upon missing system files", 
+                jobtracker.recoveryManager.shouldRecover());
+
+    // check that the info file was recreated and no tmp file is left behind
+    assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
+    Path tFile = jobtracker.recoveryManager.getTempRestartCountFile();
+    assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
+
+    // configure a job to submit
+    JobConf job = mr.createJobConf();
+    
+    UtilsForTests.configureWaitingJobConf(job, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 2, 0, 
+        "test-recovery-manager", signalFile, signalFile);
+    
+    // submit the job
+    RunningJob rJob = jc.submitJob(job);
+    LOG.info("Submitted first job " + rJob.getID());
+
+    // restart the jobtracker 5 times and check that the count is correct
+    for (int i = 1; i <= 5; ++i) {
+      LOG.info("Stopping jobtracker for " + i + " time");
+      mr.stopJobTracker();
+      
+      // start the jobtracker
+      LOG.info("Starting jobtracker for " + i + " time");
+      mr.startJobTracker();
+      
+      UtilsForTests.waitForJobTracker(jc);
+      
+      // check that the info file exists and no tmp file is left behind
+      assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
+      assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
+      
+      jobtracker = mr.getJobTrackerRunner().getJobTracker();
+      JobInProgress jip = jobtracker.getJob(rJob.getID());
+      
+      // after the i-th restart the recovered job's restart count should be i
+      assertEquals("Recovery manager failed to recover restart count",
+                   i, jip.getNumRestarts());
+    }
+    
+    // kill the old job
+    rJob.killJob();
+
+    // II. Submit a new job and check that its restart count is 0
+    JobConf job1 = mr.createJobConf();
+    
+    UtilsForTests.configureWaitingJobConf(job1, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0, 
+        "test-recovery-manager", signalFile, signalFile);
+    
+    // make sure that the job ids don't clash
+    jobtracker.getNewJobId();
+
+    // submit a new job
+    rJob = jc.submitJob(job1);
+    LOG.info("Submitted first job after restart" + rJob.getID());
+
+    // a job submitted after the restarts should have a restart count of 0
+    JobInProgress jip = jobtracker.getJob(rJob.getID());
+    assertEquals("Restart count for new job is incorrect",
+                 0, jip.getNumRestarts());
+
+    LOG.info("Stopping jobtracker for testing the fs errors");
+    mr.stopJobTracker();
+
+    // garble the info file: system dir problems should kill the jobtracker
+    fs.delete(rFile, false);
+    FSDataOutputStream out = fs.create(rFile);
+    out.writeBoolean(true);
+    out.close();
+
+    // start the jobtracker; it must not stay active (NOTE(review): assert msg is copy-pasted)
+    LOG.info("Starting jobtracker with fs errors");
+    mr.startJobTracker();
+    JobTrackerRunner runner = mr.getJobTrackerRunner();
+    assertFalse("Restart count for new job is incorrect", runner.isActive());
+
+    mr.shutdown();
+  } 
 }