Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC

svn commit: r885145 [18/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java Sat Nov 28 20:26:01 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred;
 
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -64,17 +65,20 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
-import org.apache.hadoop.mapred.JobHistory.Keys;
-import org.apache.hadoop.mapred.JobHistory.Listener;
-import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapred.JobTrackerStatistics.TaskTrackerStat;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -96,9 +100,8 @@
 import org.apache.hadoop.util.Service;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 
@@ -107,12 +110,11 @@
  *******************************************************/
 public class JobTracker extends Service 
     implements MRConstants, InterTrackerProtocol,
-    JobSubmissionProtocol, TaskTrackerManager,
-    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol {
+    ClientProtocol, TaskTrackerManager,
+    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol, JTConfig {
 
   static{
-    Configuration.addDefaultResource("mapred-default.xml");
-    Configuration.addDefaultResource("mapred-site.xml");
+    ConfigUtil.loadResources();
   }
 
   private long tasktrackerExpiryInterval;
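
ConfigUtil.loadResources() replaces the two explicit addDefaultResource() calls removed above. A minimal sketch of the intent, assuming a hypothetical addDeprecatedKeys() helper; the real class is not part of this excerpt:

    import org.apache.hadoop.conf.Configuration;

    public class ConfigUtil {
      public static void loadResources() {
        // Assumption: register legacy mapred.* keys against their new
        // mapreduce.* names before any configuration resource is read.
        addDeprecatedKeys();
        // Load the same default resources the old static block loaded.
        Configuration.addDefaultResource("mapred-default.xml");
        Configuration.addDefaultResource("mapred-site.xml");
      }

      private static void addDeprecatedKeys() { /* hypothetical helper */ }
    }
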
@@ -129,17 +131,26 @@
   // The maximum number of blacklists for a tracker after which the 
   // tracker could be blacklisted across all jobs
   private int MAX_BLACKLISTS_PER_TRACKER = 4;
+  
  // Approximate number of heartbeats that could arrive at the JobTracker
   // in a second
-  private int NUM_HEARTBEATS_IN_SECOND = 100;
+  private int NUM_HEARTBEATS_IN_SECOND;
+  private final int DEFAULT_NUM_HEARTBEATS_IN_SECOND = 100;
+  private final int MIN_NUM_HEARTBEATS_IN_SECOND = 1;
+  
+  // Scaling factor for heartbeats, used for testing only
+  private float HEARTBEATS_SCALING_FACTOR;
+  private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
+  private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
+  
   public static enum State { INITIALIZING, RUNNING }
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
 
   private DNSToSwitchMapping dnsToSwitchMapping;
-  private NetworkTopology clusterMap = new NetworkTopology();
+  NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
   private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
-  private TaskScheduler taskScheduler;
+  TaskScheduler taskScheduler;
   private final List<JobInProgressListener> jobInProgressListeners =
     new CopyOnWriteArrayList<JobInProgressListener>();
 
@@ -155,6 +166,8 @@
   
   static final Clock DEFAULT_CLOCK = new Clock();
 
+  private JobHistory jobHistory;
+
   /**
    * A client tried to submit a job before the Job Tracker was ready.
    */
@@ -181,6 +194,11 @@
   }
   
   /**
+   * Return the JT's job history handle.
+   * @return the jobhistory handle
+   */
+  JobHistory getJobHistory() { return jobHistory; }
+  /**
    * Start the JobTracker with given configuration.
    * 
    * The conf will be modified to reflect the actual ports on which 
@@ -245,8 +263,8 @@
                                  long clientVersion) throws IOException {
     if (protocol.equals(InterTrackerProtocol.class.getName())) {
       return InterTrackerProtocol.versionID;
-    } else if (protocol.equals(JobSubmissionProtocol.class.getName())){
-      return JobSubmissionProtocol.versionID;
+    } else if (protocol.equals(ClientProtocol.class.getName())){
+      return ClientProtocol.versionID;
     } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
       return RefreshAuthorizationPolicyProtocol.versionID;
     } else if (protocol.equals(AdminOperationsProtocol.class.getName())){
@@ -408,14 +426,7 @@
               if ((now - newProfile.getLastSeen()) >
                   tasktrackerExpiryInterval) {
                 // Remove completely after marking the tasks as 'KILLED'
-                lostTaskTracker(current);
-                // tracker is lost, and if it is blacklisted, remove 
-                // it from the count of blacklisted trackers in the cluster
-                if (isBlacklisted(trackerName)) {
-                  faultyTrackers.numBlacklistedTrackers -= 1;
-                }
-                updateTaskTrackerStatus(trackerName, null);
-                statistics.taskTrackerRemoved(trackerName);
+                removeTracker(current);
                 // remove the mapping from the hosts list
                 String hostname = newProfile.getHost();
                 hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -430,7 +441,20 @@
     }
   }
 
-  synchronized void retireJob(JobID jobid, String historyFile) {
+  private void removeTracker(TaskTracker tracker) {
+    lostTaskTracker(tracker);
+    String trackerName = tracker.getStatus().getTrackerName();
+    // tracker is lost, and if it is blacklisted, remove 
+    // it from the count of blacklisted trackers in the cluster
+    if (isBlacklisted(trackerName)) {
+      faultyTrackers.decrBlackListedTrackers(1);
+    }
+    updateTaskTrackerStatus(trackerName, null);
+    statistics.taskTrackerRemoved(trackerName);
+    getInstrumentation().decTrackers(1);
+  }
+
+  public synchronized void retireJob(JobID jobid, String historyFile) {
     synchronized (jobs) {
       JobInProgress job = jobs.get(jobid);
       if (job != null) {
@@ -450,13 +474,13 @@
         }
         status.setTrackingUrl(trackingUrl);
         // clean up job files from the local disk
-        JobHistory.JobInfo.cleanupJob(job.getProfile().getJobID());
+        job.cleanupLocalizedJobConf(job.getProfile().getJobID());
 
         //this configuration is primarily for testing
         //test cases can set this to false to validate job data structures on 
         //job completion
         boolean retireJob = 
-          conf.getBoolean("mapred.job.tracker.retire.jobs", true);
+          conf.getBoolean(JT_RETIREJOBS, true);
 
         if (retireJob) {
           //purge the job from memory
@@ -637,7 +661,16 @@
       }        
     }
 
-    
+    private void incrBlackListedTrackers(int count) {
+      numBlacklistedTrackers += count;
+      getInstrumentation().addBlackListedTrackers(count);
+    }
+
+    private void decrBlackListedTrackers(int count) {
+      numBlacklistedTrackers -= count;
+      getInstrumentation().decBlackListedTrackers(count);
+    }
+
     private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) {
       FaultInfo fi = getFaultInfo(hostName, true);
       boolean blackListed = fi.isBlacklisted();
@@ -796,7 +829,7 @@
           getInstrumentation().addBlackListedReduceSlots(
               reduceSlots);
         }
-        numBlacklistedTrackers += uniqueHostsMap.remove(hostName);
+        incrBlackListedTrackers(uniqueHostsMap.remove(hostName));
       }
     }
     
@@ -816,7 +849,7 @@
         }
         uniqueHostsMap.put(hostName,
                            numTrackersOnHost);
-        numBlacklistedTrackers -= numTrackersOnHost;
+        decrBlackListedTrackers(numTrackersOnHost);
       }
     }
 
@@ -934,172 +967,11 @@
   // Used to recover the jobs upon restart
   ///////////////////////////////////////////////////////
   class RecoveryManager {
-    Set<JobID> jobsToRecover; // set of jobs to be recovered
-    
-    private int totalEventsRecovered = 0;
+    private Set<JobID> jobsToRecover; // set of jobs to be recovered
+    private int recovered;
     private int restartCount = 0;
     private boolean shouldRecover = false;
 
-    Set<String> recoveredTrackers = 
-      Collections.synchronizedSet(new HashSet<String>());
-    
-    /** A custom listener that replays the events in the order in which the 
-     * events (task attempts) occurred. 
-     */
-    class JobRecoveryListener implements Listener {
-      // The owner job
-      private JobInProgress jip;
-      
-      private JobHistory.JobInfo job; // current job's info object
-      
-      // Maintain the count of the (attempt) events recovered
-      private int numEventsRecovered = 0;
-      
-      // Maintains open transactions
-      private Map<String, String> hangingAttempts = 
-        new HashMap<String, String>();
-      
-      // Whether there are any updates for this job
-      private boolean hasUpdates = false;
-      
-      public JobRecoveryListener(JobInProgress jip) {
-        this.jip = jip;
-        this.job = new JobHistory.JobInfo(jip.getJobID().toString());
-      }
-
-      /**
-       * Process a task. Note that a task might commit a previously pending 
-       * transaction.
-       */
-      private void processTask(String taskId, JobHistory.Task task) {
-        // Any TASK info commits the previous transaction
-        boolean hasHanging = hangingAttempts.remove(taskId) != null;
-        if (hasHanging) {
-          numEventsRecovered += 2;
-        }
-        
-        TaskID id = TaskID.forName(taskId);
-        TaskInProgress tip = getTip(id);
-
-        updateTip(tip, task);
-      }
-
-      /**
-       * Adds a task-attempt in the listener
-       */
-      private void processTaskAttempt(String taskAttemptId, 
-                                      JobHistory.TaskAttempt attempt) {
-        TaskAttemptID id = TaskAttemptID.forName(taskAttemptId);
-        
-        // Check if the transaction for this attempt can be committed
-        String taskStatus = attempt.get(Keys.TASK_STATUS);
-
-        if (taskStatus.length() > 0) {
-          // This means this is an update event
-          if (taskStatus.equals(Values.SUCCESS.name())) {
-            // Mark this attempt as hanging
-            hangingAttempts.put(id.getTaskID().toString(), taskAttemptId);
-            addSuccessfulAttempt(jip, id, attempt);
-          } else {
-            addUnsuccessfulAttempt(jip, id, attempt);
-            numEventsRecovered += 2;
-          }
-        } else {
-          createTaskAttempt(jip, id, attempt);
-        }
-      }
-
-      public void handle(JobHistory.RecordTypes recType, Map<Keys, 
-                         String> values) throws IOException {
-        if (recType == JobHistory.RecordTypes.Job) {
-          // Update the meta-level job information
-          job.handle(values);
-          
-          // Forcefully init the job as we have some updates for it
-          checkAndInit();
-        } else if (recType.equals(JobHistory.RecordTypes.Task)) {
-          String taskId = values.get(Keys.TASKID);
-          
-          // Create a task
-          JobHistory.Task task = new JobHistory.Task();
-          task.handle(values);
-          
-          // Ignore if its a cleanup task
-          if (isCleanup(task)) {
-            return;
-          }
-            
-          // Process the task i.e update the tip state
-          processTask(taskId, task);
-        } else if (recType.equals(JobHistory.RecordTypes.MapAttempt)) {
-          String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-          
-          // Create a task attempt
-          JobHistory.MapAttempt attempt = new JobHistory.MapAttempt();
-          attempt.handle(values);
-          
-          // Ignore if its a cleanup task
-          if (isCleanup(attempt)) {
-            return;
-          }
-          
-          // Process the attempt i.e update the attempt state via job
-          processTaskAttempt(attemptId, attempt);
-        } else if (recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
-          String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-          
-          // Create a task attempt
-          JobHistory.ReduceAttempt attempt = new JobHistory.ReduceAttempt();
-          attempt.handle(values);
-          
-          // Ignore if its a cleanup task
-          if (isCleanup(attempt)) {
-            return;
-          }
-          
-          // Process the attempt i.e update the job state via job
-          processTaskAttempt(attemptId, attempt);
-        }
-      }
-
-      // Check if the task is of type CLEANUP
-      private boolean isCleanup(JobHistory.Task task) {
-        String taskType = task.get(Keys.TASK_TYPE);
-        return Values.CLEANUP.name().equals(taskType);
-      }
-      
-      // Init the job if its ready for init. Also make sure that the scheduler
-      // is updated
-      private void checkAndInit() throws IOException {
-        String jobStatus = this.job.get(Keys.JOB_STATUS);
-        if (Values.PREP.name().equals(jobStatus)) {
-          hasUpdates = true;
-          LOG.info("Calling init from RM for job " + jip.getJobID().toString());
-          initJob(jip);
-          if (!jip.inited()) {
-            throw new IOException("Failed to initialize job " + jip.getJobID());
-          }
-        }
-      }
-      
-      void close() {
-        if (hasUpdates) {
-          // Apply the final (job-level) updates
-          JobStatusChangeEvent event = updateJob(jip, job);
-          
-          synchronized (JobTracker.this) {
-            // Update the job listeners
-            updateJobInProgressListeners(event);
-          }
-        }
-      }
-      
-      public int getNumEventsRecovered() {
-        return numEventsRecovered;
-      }
-
-    }
-    
     public RecoveryManager() {
       jobsToRecover = new TreeSet<JobID>();
     }
@@ -1108,6 +980,10 @@
       return jobsToRecover.contains(id);
     }
 
+    int getRecovered() {
+      return recovered;
+    }
+
     void addJobForRecovery(JobID id) {
       jobsToRecover.add(id);
     }
@@ -1116,18 +992,6 @@
       return shouldRecover;
     }
 
-    public boolean shouldSchedule() {
-      return recoveredTrackers.isEmpty();
-    }
-
-    private void markTracker(String trackerName) {
-      recoveredTrackers.add(trackerName);
-    }
-
-    void unMarkTracker(String trackerName) {
-      recoveredTrackers.remove(trackerName);
-    }
-
     Set<JobID> getJobsToRecover() {
       return jobsToRecover;
     }
@@ -1163,229 +1027,8 @@
         }
       }
     }
-    
-    private JobStatusChangeEvent updateJob(JobInProgress jip, 
-                                           JobHistory.JobInfo job) {
-      // Change the job priority
-      String jobpriority = job.get(Keys.JOB_PRIORITY);
-      JobPriority priority = JobPriority.valueOf(jobpriority);
-      // It's important to update this via the jobtracker's api as it will 
-      // take care of updating the event listeners too
-      setJobPriority(jip.getJobID(), priority);
-
-      // Save the previous job status
-      JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
-      
-      // Set the start/launch time only if there are recovered tasks
-      // Increment the job's restart count
-      jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME), 
-                        job.getLong(JobHistory.Keys.LAUNCH_TIME));
-
-      // Save the new job status
-      JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-      
-      return new JobStatusChangeEvent(jip, EventType.START_TIME_CHANGED, oldStatus, 
-                                      newStatus);
-    }
-    
-    private void updateTip(TaskInProgress tip, JobHistory.Task task) {
-      long startTime = task.getLong(Keys.START_TIME);
-      if (startTime != 0) {
-        tip.setExecStartTime(startTime);
-      }
-      
-      long finishTime = task.getLong(Keys.FINISH_TIME);
-      // For failed tasks finish-time will be missing
-      if (finishTime != 0) {
-        tip.setExecFinishTime(finishTime);
-      }
-      
-      String cause = task.get(Keys.TASK_ATTEMPT_ID);
-      if (cause.length() > 0) {
-        // This means that the this is a FAILED events
-        TaskAttemptID id = TaskAttemptID.forName(cause);
-        TaskStatus status = tip.getTaskStatus(id);
-        synchronized (JobTracker.this) {
-          // This will add the tip failed event in the new log
-          tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(), 
-                                  status.getPhase(), status.getRunState(), 
-                                  status.getTaskTracker());
-        }
-      }
-    }
-    
-    private void createTaskAttempt(JobInProgress job, 
-                                   TaskAttemptID attemptId, 
-                                   JobHistory.TaskAttempt attempt) {
-      TaskID id = attemptId.getTaskID();
-      String type = attempt.get(Keys.TASK_TYPE);
-      TaskInProgress tip = job.getTaskInProgress(id);
-      
-      //    I. Get the required info
-      TaskStatus taskStatus = null;
-      String trackerName = attempt.get(Keys.TRACKER_NAME);
-      String trackerHostName = 
-        JobInProgress.convertTrackerNameToHostName(trackerName);
-      // recover the port information.
-      int port = 0; // default to 0
-      String hport = attempt.get(Keys.HTTP_PORT);
-      if (hport != null && hport.length() > 0) {
-        port = attempt.getInt(Keys.HTTP_PORT);
-      }
-      
-      long attemptStartTime = attempt.getLong(Keys.START_TIME);
-
-      // II. Create the (appropriate) task status
-      if (type.equals(Values.MAP.name())) {
-        taskStatus = 
-          new MapTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.MAP),
-                            TaskStatus.State.RUNNING, "", "", trackerName, 
-                            TaskStatus.Phase.MAP, new Counters());
-      } else {
-        taskStatus = 
-          new ReduceTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.REDUCE), 
-                               TaskStatus.State.RUNNING, "", "", trackerName, 
-                               TaskStatus.Phase.REDUCE, new Counters());
-      }
-
-      // Set the start time
-      taskStatus.setStartTime(attemptStartTime);
-
-      List<TaskStatus> ttStatusList = new ArrayList<TaskStatus>();
-      ttStatusList.add(taskStatus);
-      
-      // III. Create the dummy tasktracker status
-      TaskTrackerStatus ttStatus = 
-        new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList, 
-                              0 , 0, 0);
-      ttStatus.setLastSeen(clock.getTime());
-
-      synchronized (JobTracker.this) {
-        synchronized (taskTrackers) {
-          synchronized (trackerExpiryQueue) {
-            // IV. Register a new tracker
-            TaskTracker taskTracker = getTaskTracker(trackerName);
-            boolean isTrackerRegistered =  (taskTracker != null);
-            if (!isTrackerRegistered) {
-              markTracker(trackerName); // add the tracker to recovery-manager
-              taskTracker = new TaskTracker(trackerName);
-              taskTracker.setStatus(ttStatus);
-              addNewTracker(taskTracker);
-            }
-      
-            // V. Update the tracker status
-            // This will update the meta info of the jobtracker and also add the
-            // tracker status if missing i.e register it
-            updateTaskTrackerStatus(trackerName, ttStatus);
-          }
-        }
-        // Register the attempt with job and tip, under JobTracker lock. 
-        // Since, as of today they are atomic through heartbeat.
-        // VI. Register the attempt
-        //   a) In the job
-        job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
-        //   b) In the tip
-        tip.updateStatus(taskStatus);
-      }
-      
-      // VII. Make an entry in the launched tasks
-      expireLaunchingTasks.addNewTask(attemptId);
-    }
-    
-    private void addSuccessfulAttempt(JobInProgress job, 
-                                      TaskAttemptID attemptId, 
-                                      JobHistory.TaskAttempt attempt) {
-      // I. Get the required info
-      TaskID taskId = attemptId.getTaskID();
-      String type = attempt.get(Keys.TASK_TYPE);
-
-      TaskInProgress tip = job.getTaskInProgress(taskId);
-      long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
-
-      // Get the task status and the tracker name and make a copy of it
-      TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
-      taskStatus.setFinishTime(attemptFinishTime);
-
-      String stateString = attempt.get(Keys.STATE_STRING);
-
-      // Update the basic values
-      taskStatus.setStateString(stateString);
-      taskStatus.setProgress(1.0f);
-      taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-
-      // Set the shuffle/sort finished times
-      if (type.equals(Values.REDUCE.name())) {
-        long shuffleTime = 
-          Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED));
-        long sortTime = 
-          Long.parseLong(attempt.get(Keys.SORT_FINISHED));
-        taskStatus.setShuffleFinishTime(shuffleTime);
-        taskStatus.setSortFinishTime(sortTime);
-      }
-      else if (type.equals(Values.MAP.name())) {
-        taskStatus.setMapFinishTime(
-            Long.parseLong(attempt.get(Keys.MAP_FINISHED)));
-      }
-
-      // Add the counters
-      String counterString = attempt.get(Keys.COUNTERS);
-      Counters counter = null;
-      //TODO Check if an exception should be thrown
-      try {
-        counter = Counters.fromEscapedCompactString(counterString);
-      } catch (ParseException pe) { 
-        counter = new Counters(); // Set it to empty counter
-      }
-      taskStatus.setCounters(counter);
-      
-      synchronized (JobTracker.this) {
-        // II. Replay the status
-        job.updateTaskStatus(tip, taskStatus);
-      }
-      
-      // III. Prevent the task from expiry
-      expireLaunchingTasks.removeTask(attemptId);
-    }
-    
-    private void addUnsuccessfulAttempt(JobInProgress job,
-                                        TaskAttemptID attemptId,
-                                        JobHistory.TaskAttempt attempt) {
-      // I. Get the required info
-      TaskID taskId = attemptId.getTaskID();
-      TaskInProgress tip = job.getTaskInProgress(taskId);
-      long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
-
-      TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
-      taskStatus.setFinishTime(attemptFinishTime);
-
-      // Reset the progress
-      taskStatus.setProgress(0.0f);
-      
-      String stateString = attempt.get(Keys.STATE_STRING);
-      taskStatus.setStateString(stateString);
-
-      boolean hasFailed = 
-        attempt.get(Keys.TASK_STATUS).equals(Values.FAILED.name());
-      // Set the state failed/killed
-      if (hasFailed) {
-        taskStatus.setRunState(TaskStatus.State.FAILED);
-      } else {
-        taskStatus.setRunState(TaskStatus.State.KILLED);
-      }
-
-      // Get/Set the error msg
-      String diagInfo = attempt.get(Keys.ERROR);
-      taskStatus.setDiagnosticInfo(diagInfo); // diag info
-
-      synchronized (JobTracker.this) {
-        // II. Update the task status
-        job.updateTaskStatus(tip, taskStatus);
-      }
 
-     // III. Prevent the task from expiry
-     expireLaunchingTasks.removeTask(attemptId);
-    }
-  
+   
     Path getRestartCountFile() {
       return new Path(getSystemDir(), "jobtracker.info");
     }
@@ -1484,166 +1127,25 @@
         return;
       }
 
-      LOG.info("Starting the recovery process with restart count : " 
-               + restartCount);
-
-      // I. Init the jobs and cache the recovered job history filenames
-      Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
-      Iterator<JobID> idIter = jobsToRecover.iterator();
-      while (idIter.hasNext()) {
-        JobID id = idIter.next();
-        LOG.info("Trying to recover details of job " + id);
-        try {
-          // 1. Create the job object
-          JobInProgress job = 
-            new JobInProgress(id, JobTracker.this, conf, restartCount);
-
-          // 2. Check if the user has appropriate access
-          // Get the user group info for the job's owner
-          UserGroupInformation ugi =
-            UserGroupInformation.readFrom(job.getJobConf());
-          LOG.info("Submitting job " + id + " on behalf of user "
-                   + ugi.getUserName() + " in groups : "
-                   + StringUtils.arrayToString(ugi.getGroupNames()));
-
-          // check the access
-          try {
-            checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi);
-          } catch (Throwable t) {
-            LOG.warn("Access denied for user " + ugi.getUserName() 
-                     + " in groups : [" 
-                     + StringUtils.arrayToString(ugi.getGroupNames()) + "]");
-            throw t;
-          }
-
-          // 3. Get the log file and the file path
-          String logFileName = 
-            JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
-          if (logFileName != null) {
-            Path jobHistoryFilePath = 
-              JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
-
-            // 4. Recover the history file. This involved
-            //     - deleting file.recover if file exists
-            //     - renaming file.recover to file if file doesnt exist
-            // This makes sure that the (master) file exists
-            JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
-                                                     jobHistoryFilePath);
-          
-            // 5. Cache the history file name as it costs one dfs access
-            jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
-          } else {
-            LOG.info("No history file found for job " + id);
-            idIter.remove(); // remove from recovery list
-          }
-
-          // 6. Sumbit the job to the jobtracker
-          addJob(id, job);
-        } catch (Throwable t) {
-          LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
-          idIter.remove();
-          continue;
-        }
-      }
-      long now = clock.getTime();
-      LOG.info("Took a total of " 
-               + StringUtils.formatTime(now 
-                                        - recoveryProcessStartTime) 
-               + " for recovering filenames of all the jobs from history.");
-
-
-      // II. Recover each job
-      idIter = jobsToRecover.iterator();
-      while (idIter.hasNext()) {
-        JobID id = idIter.next();
-        JobInProgress pJob = getJob(id);
-
-        // 1. Get the required info
-        // Get the recovered history file
-        Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
-        String logFileName = jobHistoryFilePath.getName();
-
-        FileSystem fs;
-        try {
-          fs = jobHistoryFilePath.getFileSystem(conf);
-        } catch (IOException ioe) {
-          LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",
-                   ioe);
-          continue;
-        }
-
-        // 2. Parse the history file
-        // Note that this also involves job update
-        JobRecoveryListener listener = new JobRecoveryListener(pJob);
+      LOG.info("Starting the recovery process for " + jobsToRecover.size() +
+          " jobs ...");
+      for (JobID jobId : jobsToRecover) {
+        LOG.info("Submitting job " + jobId);
         try {
-          JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), 
-                                        listener, fs);
-        } catch (Throwable t) {
-          LOG.info("Error reading history file of job " + pJob.getJobID() 
-                   + ". Ignoring the error and continuing.", t);
-        }
-
-        // 3. Close the listener
-        listener.close();
-        
-        // 4. Update the recovery metric
-        totalEventsRecovered += listener.getNumEventsRecovered();
-
-        // 5. Cleanup history
-        // Delete the master log file as an indication that the new file
-        // should be used in future
-        try {
-          synchronized (pJob) {
-            JobHistory.JobInfo.checkpointRecovery(logFileName, 
-                                                  pJob.getJobConf());
-          }
-        } catch (Throwable t) {
-          LOG.warn("Failed to delete log file (" + logFileName + ") for job " 
-                   + id + ". Continuing.", t);
-        }
-
-        if (pJob.isComplete()) {
-          idIter.remove(); // no need to keep this job info as its successful
+          submitJob(jobId, restartCount);
+          recovered++;
+        } catch (Exception e) {
+          LOG.warn("Could not recover job " + jobId, e);
         }
       }
-
-      long recoveryProcessEndTime = clock.getTime();
-      LOG.info("Took a total of " 
-               + StringUtils.formatTime(recoveryProcessEndTime
-                                        - now) 
-               + " for parsing and recovering all the jobs from history.");
-
-      recoveryDuration = recoveryProcessEndTime - recoveryProcessStartTime;
-      LOG.info("Took a total of " + StringUtils.formatTime(recoveryDuration) 
-               + " for the whole recovery process.");
+      recoveryDuration = clock.getTime() - recoveryProcessStartTime;
       hasRecovered = true;
 
-      // III. Finalize the recovery
-      synchronized (trackerExpiryQueue) {
-        // Make sure that the tracker statuses in the expiry-tracker queue
-        // are updated
-        int size = trackerExpiryQueue.size();
-        for (int i = 0; i < size ; ++i) {
-          // Get the first tasktracker
-          TaskTrackerStatus taskTracker = trackerExpiryQueue.first();
-
-          // Remove it
-          trackerExpiryQueue.remove(taskTracker);
-
-          // Set the new time
-          taskTracker.setLastSeen(recoveryProcessEndTime);
-
-          // Add back to get the sorted list
-          trackerExpiryQueue.add(taskTracker);
-        }
-      }
-
-      LOG.info("Restoration done. Recovery complete!");
-    }
-    
-    int totalEventsRecovered() {
-      return totalEventsRecovered;
+      LOG.info("Recovery done! Recovered " + recovered + " of " +
+          jobsToRecover.size() + " jobs.");
+      LOG.info("Recovery Duration (ms): " + recoveryDuration);
     }
+
   }
 
   private JobTrackerInstrumentation myInstrumentation;
@@ -1653,15 +1155,14 @@
   ////////////////////////////////////////////////////////////////
   int port;
   String localMachine;
-  private String trackerIdentifier;
+  private final String trackerIdentifier;
   long startTime;
   int totalSubmissions = 0;
   private int totalMapTaskCapacity;
   private int totalReduceTaskCapacity;
-  private HostsFileReader hostsReader;
+  private final HostsFileReader hostsReader;
   
   // JobTracker recovery variables
-  private volatile boolean hasRestarted = false;
   private volatile boolean hasRecovered = false;
   private volatile long recoveryDuration;
 
@@ -1734,6 +1235,10 @@
   //
   int totalMaps = 0;
   int totalReduces = 0;
+  private int occupiedMapSlots = 0;
+  private int occupiedReduceSlots = 0;
+  private int reservedMapSlots = 0;
+  private int reservedReduceSlots = 0;
   private HashMap<String, TaskTracker> taskTrackers =
     new HashMap<String, TaskTracker>();
   Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
@@ -1745,7 +1250,7 @@
   Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
                                                 "expireLaunchingTasks");
 
-  CompletedJobStatusStore completedJobStatusStore = null;
+  CompletedJobStatusStore completedJobStatusStore;
   Thread completedJobsStoreThread = null;
   RecoveryManager recoveryManager;
 
@@ -1784,7 +1289,7 @@
   static final String SUBDIR = "jobTracker";
   FileSystem fs = null;
   Path systemDir = null;
-  private JobConf conf;
+  JobConf conf;
   private UserGroupInformation mrOwner;
   private String supergroup;
 
@@ -1793,7 +1298,7 @@
   long memSizeForMapSlotOnJT;
   long memSizeForReduceSlotOnJT;
 
-  private QueueManager queueManager;
+  private final QueueManager queueManager;
 
   JobTracker(JobConf conf) 
   throws IOException,InterruptedException, LoginException {
@@ -1815,7 +1320,7 @@
   throws IOException, InterruptedException, LoginException {
     clock = newClock;
     mrOwner = UnixUserGroupInformation.login(conf);
-    supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
+    supergroup = conf.get(JT_SUPERGROUP, "supergroup");
     LOG.info("Starting jobtracker with owner as " + mrOwner.getUserName() 
              + " and supergroup as " + supergroup);
     this.conf = conf;
@@ -1825,19 +1330,28 @@
     // Grab some static constants
     //
     tasktrackerExpiryInterval = 
-      conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
-    retiredJobsCacheSize = 
-      conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
+      conf.getLong(JT_TRACKER_EXPIRY_INTERVAL, 10 * 60 * 1000);
+    retiredJobsCacheSize = conf.getInt(JT_RETIREJOB_CACHE_SIZE, 1000);
     MAX_BLACKLISTS_PER_TRACKER = 
-        conf.getInt("mapred.max.tracker.blacklists", 4);
+      conf.getInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 4);
+    
     NUM_HEARTBEATS_IN_SECOND = 
-        conf.getInt("mapred.heartbeats.in.second", 100);
+      conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND);
+    if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) {
+      NUM_HEARTBEATS_IN_SECOND = DEFAULT_NUM_HEARTBEATS_IN_SECOND;
+    }
+    
+    HEARTBEATS_SCALING_FACTOR = 
+      conf.getFloat(JT_HEARTBEATS_SCALING_FACTOR, 
+                    DEFAULT_HEARTBEATS_SCALING_FACTOR);
+    if (HEARTBEATS_SCALING_FACTOR < MIN_HEARTBEATS_SCALING_FACTOR) {
+      HEARTBEATS_SCALING_FACTOR = DEFAULT_HEARTBEATS_SCALING_FACTOR;
+    }
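
Note that values below the minima above fall back to the defaults, not to the minimum. A hedged sketch of tuning both knobs from a JobConf, using only the JTConfig constants this hunk references:

    // JobConf is org.apache.hadoop.mapred.JobConf; JTConfig is
    // org.apache.hadoop.mapreduce.server.jobtracker.JTConfig (imported above).
    JobConf tuned = new JobConf();
    tuned.setInt(JTConfig.JT_HEARTBEATS_IN_SECOND, 200);          // 1 s per 200 trackers
    tuned.setFloat(JTConfig.JT_HEARTBEATS_SCALING_FACTOR, 0.1f);  // test-only scaling
    // Anything below 1 or 0.01f silently reverts to 100 or 1.0f respectively.
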
 
     //This configuration is there solely for tuning purposes and 
     //once this feature has been tested in real clusters and an appropriate
     //value for the threshold has been found, this config might be taken out.
-    AVERAGE_BLACKLIST_THRESHOLD = 
-      conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f); 
+    AVERAGE_BLACKLIST_THRESHOLD = conf.getFloat(JTConfig.JT_AVG_BLACKLIST_THRESHOLD, 0.5f); 
 
     // This is a directory of temporary submission files.  We delete it
     // on startup, and can delete any files that we're done with
@@ -1845,8 +1359,8 @@
     initializeTaskMemoryRelatedConfig();
 
     // Read the hosts/exclude files to restrict access to the jobtracker.
-    this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
-                                           conf.get("mapred.hosts.exclude", ""));
+    this.hostsReader = new HostsFileReader(conf.get(JTConfig.JT_HOSTS_FILENAME, ""),
+                                           conf.get(JTConfig.JT_HOSTS_EXCLUDE_FILENAME, ""));
 
     Configuration queuesConf = new Configuration(this.conf);
     queueManager = new QueueManager(queuesConf);
@@ -1854,7 +1368,7 @@
     
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass
-      = conf.getClass("mapred.jobtracker.taskScheduler",
+      = conf.getClass(JT_TASK_SCHEDULER,
           JobQueueTaskScheduler.class, TaskScheduler.class);
     taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
     //LOOK AT THIS TODO
@@ -1893,7 +1407,7 @@
       SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
     }
     
-    int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
+    int handlerCount = conf.getInt(JT_IPC_HANDLER_COUNT, 10);
     this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf);
     if (LOG.isDebugEnabled()) {
       Properties p = System.getProperties();
@@ -1905,7 +1419,7 @@
     }
 
     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(
-        conf.get("mapred.job.tracker.http.address", "0.0.0.0:50030"));
+        conf.get(JT_HTTP_ADDRESS, "0.0.0.0:50030"));
     String infoBindAddress = infoSocAddr.getHostName();
     int tmpInfoPort = infoSocAddr.getPort();
     this.startTime = clock.getTime();
@@ -1913,8 +1427,8 @@
         tmpInfoPort == 0, conf);
     infoServer.setAttribute("job.tracker", this);
     // initialize history parameters.
-    boolean historyInitialized = JobHistory.init(this, conf, this.localMachine,
-                                                 this.startTime);
+    jobHistory = new JobHistory();
+    jobHistory.init(this, conf, this.localMachine, this.startTime);
     
     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
@@ -1944,10 +1458,10 @@
     // The rpc/web-server ports can be ephemeral ports... 
     // ... ensure we have the correct info
     this.port = interTrackerServer.getListenerAddress().getPort();
-    this.conf.set("mapred.job.tracker", (this.localMachine + ":" + this.port));
+    this.conf.set(JT_IPC_ADDRESS, (this.localMachine + ":" + this.port));
     LOG.info("JobTracker up at: " + this.port);
     this.infoPort = this.infoServer.getPort();
-    this.conf.set("mapred.job.tracker.http.address", 
+    this.conf.set(JT_HTTP_ADDRESS, 
         infoBindAddress + ":" + this.infoPort); 
     LOG.info("JobTracker webserver: " + this.infoServer.getPort());
     
@@ -1978,8 +1492,7 @@
         
         // Check if the history is enabled .. as we can't have persistence with 
         // history disabled
-        if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
-            && !JobHistory.isDisableHistory()
+        if (conf.getBoolean(JT_RESTART_ENABLED, false) 
             && systemDirData != null) {
           for (FileStatus status : systemDirData) {
             try {
@@ -1991,8 +1504,7 @@
           }
           
           // Check if there are jobs to be recovered
-          hasRestarted = recoveryManager.shouldRecover();
-          if (hasRestarted) {
+          if (recoveryManager.shouldRecover()) {
             break; // if there is something to recover else clean the sys dir
           }
         }
@@ -2004,9 +1516,9 @@
         }
         LOG.error("Mkdirs failed to create " + systemDir);
       } catch (AccessControlException ace) {
-        LOG.warn("Failed to operate on mapred.system.dir (" + systemDir 
+        LOG.warn("Failed to operate on " + JTConfig.JT_SYSTEM_DIR + " (" + systemDir
                  + ") because of permissions.");
-        LOG.warn("Manually delete the mapred.system.dir (" + systemDir 
+        LOG.warn("Manually delete the " + JTConfig.JT_SYSTEM_DIR + " (" + systemDir
                  + ") and then start the JobTracker.");
         LOG.warn("Bailing out ... ");
         throw ace;
@@ -2030,19 +1542,17 @@
     jobConf.deleteLocalFiles(SUBDIR);
 
     // Initialize history DONE folder
-    if (historyInitialized) {
-      JobHistory.initDone(conf, fs);
-      String historyLogDir = 
-        JobHistory.getCompletedJobHistoryLocation().toString();
-      infoServer.setAttribute("historyLogDir", historyLogDir);
-      FileSystem historyFS = new Path(historyLogDir).getFileSystem(conf);
-      infoServer.setAttribute("fileSys", historyFS);
-    }
+    jobHistory.initDone(conf, fs);
+    String historyLogDir = 
+      jobHistory.getCompletedJobHistoryLocation().toString();
+    infoServer.setAttribute("historyLogDir", historyLogDir);
+    FileSystem historyFS = new Path(historyLogDir).getFileSystem(conf);
+    infoServer.setAttribute("fileSys", historyFS);
 
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
             DNSToSwitchMapping.class), conf);
-    this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
+    this.numTaskCacheLevels = conf.getInt(JT_TASKCACHE_LEVELS, 
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
     //initializes the job status store
@@ -2080,13 +1590,6 @@
   }
 
   /**
-   * Whether the JT has restarted
-   */
-  public boolean hasRestarted() {
-    return hasRestarted;
-  }
-
-  /**
    * Whether the JT has recovered upon restart
    */
   public boolean hasRecovered() {
@@ -2097,13 +1600,11 @@
    * How long the jobtracker took to recover from restart.
    */
   public long getRecoveryDuration() {
-    return hasRestarted() 
-           ? recoveryDuration
-           : 0;
+    return recoveryDuration;
   }
 
   /**
-   * Get JobTracker's FileSystem. This is the filesystem for mapred.system.dir.
+   * Get JobTracker's FileSystem. This is the filesystem for mapreduce.system.dir.
    */
   FileSystem getFileSystem() {
     return fs;
@@ -2111,7 +1612,7 @@
 
   /**
    * Get the FileSystem for the given path. This can be used to resolve
-   * filesystem for job history, local job files or mapred.system.dir path.
+   * filesystem for job history, local job files or mapreduce.system.dir path.
    */
   FileSystem getFileSystem(Path path) throws IOException {
     return path.getFileSystem(conf);
@@ -2130,12 +1631,12 @@
   }
 
   public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
-    return conf.getClass("mapred.jobtracker.instrumentation",
+    return conf.getClass(JT_INSTRUMENTATION,
         JobTrackerMetricsInst.class, JobTrackerInstrumentation.class);
   }
   
   public static void setInstrumentationClass(Configuration conf, Class<? extends JobTrackerInstrumentation> t) {
-    conf.setClass("mapred.jobtracker.instrumentation",
+    conf.setClass(JT_INSTRUMENTATION,
         t, JobTrackerInstrumentation.class);
   }
 
@@ -2145,7 +1646,7 @@
 
   public static InetSocketAddress getAddress(Configuration conf) {
     String jobTrackerStr =
-      conf.get("mapred.job.tracker", "localhost:8012");
+      conf.get(JT_IPC_ADDRESS, "localhost:8012");
     return NetUtils.createSocketAddr(jobTrackerStr);
   }
 
@@ -2183,12 +1684,8 @@
     enterLiveState();
     taskScheduler.start();
     
-    //  Start the recovery after starting the scheduler
-    try {
-      recoveryManager.recover();
-    } catch (Throwable t) {
-      LOG.warn("Recovery manager crashed! Ignoring.", t);
-    }
+    recoveryManager.recover();
+    
     // refresh the node list as the recovery manager might have added 
     // disallowed trackers
     refreshHosts();
@@ -2314,6 +1811,13 @@
       }
       fs = null;
     }
+
+    if (jobHistory != null) {
+      jobHistory.shutDown();
+    }
+    
+    LOG.info("stopped all jobtracker services");
+    return;
   }
 
 
@@ -2464,7 +1968,7 @@
    * 
    * @param taskTracker tasktracker whose 'non-running' tasks are to be purged
    */
-  private void removeMarkedTasks(String taskTracker) {
+  void removeMarkedTasks(String taskTracker) {
     // Purge all the 'marked' tasks which were running at taskTracker
     Set<TaskAttemptID> markedTaskSet = 
       trackerToMarkedTasksMap.get(taskTracker);
@@ -2516,17 +2020,10 @@
 
     // start the merge of log files
     JobID id = job.getStatus().getJobID();
-    if (job.hasRestarted()) {
-      try {
-        JobHistory.JobInfo.finalizeRecovery(id, job.getJobConf());
-      } catch (IOException ioe) {
-        LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
-      }
-    }
 
     // mark the job as completed
     try {
-      JobHistory.JobInfo.markCompleted(id);
+      jobHistory.markCompleted(id);
     } catch (IOException ioe) {
       LOG.info("Failed to mark job " + id + " as completed!", ioe);
     }
@@ -2607,6 +2104,13 @@
     }
     return v;
   }
+
+  public synchronized List<JobInProgress> getFailedJobs() {
+    synchronized (jobs) {
+      return failedJobs();
+    }
+  }
+
   public Vector<JobInProgress> completedJobs() {
     Vector<JobInProgress> v = new Vector<JobInProgress>();
     for (Iterator it = jobs.values().iterator(); it.hasNext();) {
@@ -2619,6 +2123,12 @@
     return v;
   }
 
+  public synchronized List<JobInProgress> getCompletedJobs() {
+    synchronized (jobs) {
+      return completedJobs();
+    }
+  }
+
   /**
    * Get all the task trackers in the cluster
    * 
@@ -2754,7 +2264,7 @@
    * 
    * @param status Task Tracker's status
    */
-  private void addNewTracker(TaskTracker taskTracker) {
+  void addNewTracker(TaskTracker taskTracker) {
     TaskTrackerStatus status = taskTracker.getStatus();
     trackerExpiryQueue.add(status);
 
@@ -2772,6 +2282,7 @@
       hostnameToTaskTracker.put(hostname, trackers);
     }
     statistics.taskTrackerAdded(status.getTrackerName());
+    getInstrumentation().addTrackers(1);
     LOG.info("Adding tracker " + status.getTrackerName() + " to host " 
              + hostname);
     trackers.add(taskTracker);
@@ -2851,7 +2362,7 @@
   
   // Update the listeners about the job
   // Assuming JobTracker is locked on entry.
-  private void updateJobInProgressListeners(JobChangeEvent event) {
+  void updateJobInProgressListeners(JobChangeEvent event) {
     for (JobInProgressListener listener : jobInProgressListeners) {
       listener.jobUpdated(event);
     }
@@ -2912,7 +2423,6 @@
     
     HeartbeatResponse prevHeartbeatResponse =
       trackerToHeartbeatResponseMap.get(trackerName);
-    boolean addRestartInfo = false;
 
     if (initialContact != true) {
       // If this isn't the 'initial contact' from the tasktracker,
@@ -2922,20 +2432,15 @@
       if (prevHeartbeatResponse == null) {
         // This is the first heartbeat from the old tracker to the newly 
         // started JobTracker
-        if (hasRestarted()) {
-          addRestartInfo = true;
-          // inform the recovery manager about this tracker joining back
-          recoveryManager.unMarkTracker(trackerName);
-        } else {
-          // Jobtracker might have restarted but no recovery is needed
-          // otherwise this code should not be reached
-          LOG.warn("Serious problem, cannot find record of 'previous' " +
-                   "heartbeat for '" + trackerName + 
-                   "'; reinitializing the tasktracker");
-          return new HeartbeatResponse(responseId, 
-              new TaskTrackerAction[] {new ReinitTrackerAction()});
-        }
-
+        
+        // Jobtracker might have restarted but no recovery is needed
+        // otherwise this code should not be reached
+        LOG.warn("Serious problem, cannot find record of 'previous' " +
+                 "heartbeat for '" + trackerName + 
+                 "'; reinitializing the tasktracker");
+        return new HeartbeatResponse(responseId, 
+            new TaskTrackerAction[] {new ReinitTrackerAction()});
+      
       } else {
                 
         // It is completely safe to not process a 'duplicate' heartbeat from a 
@@ -2967,7 +2472,7 @@
     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
     isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
     // Check for new tasks to be executed on the tasktracker
-    if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
+    if (acceptNewTasks && !isBlacklisted) {
       TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
@@ -3010,11 +2515,6 @@
     response.setActions(
                         actions.toArray(new TaskTrackerAction[actions.size()]));
     
-    // check if the restart info is req
-    if (addRestartInfo) {
-      response.setRecoveredJobs(recoveryManager.getJobsToRecover());
-    }
-        
     // Update the trackerToHeartbeatResponseMap
     trackerToHeartbeatResponseMap.put(trackerName, response);
 
@@ -3026,15 +2526,16 @@
   
   /**
    * Calculates next heartbeat interval using cluster size.
-   * Heartbeat interval is incremented 1second for every 50 nodes. 
+   * Heartbeat interval is incremented by 1 second for every 100 nodes by default. 
    * @return next heartbeat interval.
    */
   public int getNextHeartbeatInterval() {
     // get the no of task trackers
     int clusterSize = getClusterStatus().getTaskTrackers();
     int heartbeatInterval =  Math.max(
-                                (int)(1000 * Math.ceil((double)clusterSize / 
-                                                       NUM_HEARTBEATS_IN_SECOND)),
+                                (int)(1000 * HEARTBEATS_SCALING_FACTOR *
+                                      Math.ceil((double)clusterSize / 
+                                                NUM_HEARTBEATS_IN_SECOND)),
                                 HEARTBEAT_INTERVAL_MIN) ;
     return heartbeatInterval;
   }
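
A worked instance of the formula above, with the defaults NUM_HEARTBEATS_IN_SECOND = 100 and HEARTBEATS_SCALING_FACTOR = 1.0f. HEARTBEAT_INTERVAL_MIN is not shown in this hunk, so 1000 ms stands in for it:

    // Standalone restatement of getNextHeartbeatInterval() for experimenting.
    static int nextHeartbeatInterval(int clusterSize, int heartbeatsPerSecond,
                                     float scalingFactor, int minIntervalMs) {
      return Math.max(
          (int) (1000 * scalingFactor *
                 Math.ceil((double) clusterSize / heartbeatsPerSecond)),
          minIntervalMs);
    }
    // nextHeartbeatInterval(250, 100, 1.0f, 1000) == 3000: heartbeat every 3 s.
    // nextHeartbeatInterval(50,  100, 1.0f, 1000) == 1000: floored at the minimum.
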
@@ -3072,13 +2573,19 @@
    * @param status The new status for the task tracker
    * @return Was an old status found?
    */
-  private boolean updateTaskTrackerStatus(String trackerName,
+  boolean updateTaskTrackerStatus(String trackerName,
                                           TaskTrackerStatus status) {
     TaskTracker tt = getTaskTracker(trackerName);
     TaskTrackerStatus oldStatus = (tt == null) ? null : tt.getStatus();
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
+      occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
+      occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
+      getInstrumentation().decRunningMaps(oldStatus.countMapTasks());
+      getInstrumentation().decRunningReduces(oldStatus.countReduceTasks());
+      getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
+      getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
         int mapSlots = oldStatus.getMaxMapSlots();
         totalMapTaskCapacity -= mapSlots;
@@ -3101,6 +2608,12 @@
     if (status != null) {
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
+      occupiedMapSlots += status.countOccupiedMapSlots();
+      occupiedReduceSlots += status.countOccupiedReduceSlots();
+      getInstrumentation().addRunningMaps(status.countMapTasks());
+      getInstrumentation().addRunningReduces(status.countReduceTasks());
+      getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
+      getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(status.getHost())) {
         int mapSlots = status.getMaxMapSlots();
         totalMapTaskCapacity += mapSlots;
@@ -3168,6 +2681,25 @@
     return oldStatus != null;
   }
   
+  // Increment the number of reserved slots in the cluster.
+  // This method assumes the caller has JobTracker lock.
+  void incrementReservations(TaskType type, int reservedSlots) {
+    if (type.equals(TaskType.MAP)) {
+      reservedMapSlots += reservedSlots;
+    } else if (type.equals(TaskType.REDUCE)) {
+      reservedReduceSlots += reservedSlots;
+    }
+  }
+
+  // Decrement the number of reserved slots in the cluster.
+  // This method assumes the caller has JobTracker lock.
+  void decrementReservations(TaskType type, int reservedSlots) {
+    if (type.equals(TaskType.MAP)) {
+      reservedMapSlots -= reservedSlots;
+    } else if (type.equals(TaskType.REDUCE)) {
+      reservedReduceSlots -= reservedSlots;
+    }
+  }
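
A hedged sketch of how a scheduler might drive the reservation counters added above; the jobTracker handle and the call sites are assumptions, only the two methods themselves come from this patch:

    // When a TaskTracker is reserved for a job (caller must hold the
    // JobTracker lock, as the comments above require):
    synchronized (jobTracker) {
      jobTracker.incrementReservations(TaskType.MAP, reservedSlots);
    }
    // ... later, when the reservation is consumed or the job completes:
    synchronized (jobTracker) {
      jobTracker.decrementReservations(TaskType.MAP, reservedSlots);
    }
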
   
   private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
     TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
@@ -3180,7 +2712,7 @@
   /**
    * Process incoming heartbeat messages from the task trackers.
    */
-  private synchronized boolean processHeartbeat(
+  synchronized boolean processHeartbeat(
                                  TaskTrackerStatus trackerStatus, 
                                  boolean initialContact) {
     String trackerName = trackerStatus.getTrackerName();
@@ -3209,7 +2741,7 @@
           // if this is lost tracker that came back now, and if it blacklisted
           // increment the count of blacklisted trackers in the cluster
           if (isBlacklisted(trackerName)) {
-            faultyTrackers.numBlacklistedTrackers += 1;
+            faultyTrackers.incrBlackListedTrackers(1);
           }
           addNewTracker(taskTracker);
         }
@@ -3226,8 +2758,7 @@
    * A tracker wants to know if any of its Tasks have been
    * closed (because the job completed, whether successfully or not)
    */
-  private synchronized List<TaskTrackerAction> getTasksToKill(
-                                                              String taskTracker) {
+  synchronized List<TaskTrackerAction> getTasksToKill(String taskTracker) {
     
     Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
     List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
@@ -3304,7 +2835,7 @@
   /**
    * A tracker wants to know if any of its Tasks can be committed 
    */
-  private synchronized List<TaskTrackerAction> getTasksToSave(
+  synchronized List<TaskTrackerAction> getTasksToSave(
                                                  TaskTrackerStatus tts) {
     List<TaskStatus> taskStatuses = tts.getTaskReports();
     if (taskStatuses != null) {
@@ -3430,10 +2961,21 @@
 
   /**
    * Allocates a new JobId string.
+   * @deprecated use {@link #getNewJobID()} instead
    */
+  @Deprecated
   public synchronized JobID getNewJobId() throws IOException {
     verifyServiceState(ServiceState.LIVE);
-    return new JobID(getTrackerIdentifier(), nextJobId++);
+    return JobID.downgrade(getNewJobID());
+  }
+
+  /**
+   * Allocates a new JobId string.
+   */
+  public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID()
+      throws IOException {
+    return new org.apache.hadoop.mapreduce.JobID(
+      getTrackerIdentifier(), nextJobId++);
   }
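
The deprecated getNewJobId() now narrows the new-style ID via JobID.downgrade(). A small sketch of the relationship, assuming a JobTracker handle jt:

    org.apache.hadoop.mapreduce.JobID newStyle = jt.getNewJobID();
    org.apache.hadoop.mapred.JobID oldStyle = JobID.downgrade(newStyle);
    // Both print the same identity, e.g. "job_<trackerIdentifier>_0001";
    // downgrade() changes only the static type.
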
 
   /**
@@ -3444,17 +2986,41 @@
    * of the JobTracker.  But JobInProgress adds info that's useful for
    * the JobTracker alone.
    */
+  public synchronized org.apache.hadoop.mapreduce.JobStatus submitJob(
+      org.apache.hadoop.mapreduce.JobID jobId) throws IOException {
+    return submitJob(JobID.downgrade(jobId));
+  }
+  
+  /**
+   * JobTracker.submitJob() kicks off a new job.  
+   *
+   * Create a 'JobInProgress' object, which contains both JobProfile
+   * and JobStatus.  Those two sub-objects are sometimes shipped outside
+   * of the JobTracker.  But JobInProgress adds info that's useful for
+   * the JobTracker alone.
+   * @deprecated Use 
+   * {@link #submitJob(org.apache.hadoop.mapreduce.JobID)} instead
+   */
+  @Deprecated
   public synchronized JobStatus submitJob(JobID jobId) throws IOException {
     verifyServiceState(ServiceState.LIVE);
+    return submitJob(jobId, 0);
+  }
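+
+  // Note (sketch): restartCount 0 marks a fresh submission; the private
+  // overload below is also invoked with a recovered restart count when a
+  // job from an earlier JobTracker run is resubmitted:
+  //
+  //   JobStatus status = jt.submitJob(jobId);  // same as submitJob(jobId, 0)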
+
+  /**
+   * Submits either a new job or a job from an earlier run.
+   */
+  private synchronized JobStatus submitJob(JobID jobId, 
+      int restartCount) throws IOException {
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
     }
     
-    JobInProgress job = new JobInProgress(jobId, this, this.conf);
+    JobInProgress job = new JobInProgress(jobId, this, this.conf, restartCount);
     
     String queue = job.getProfile().getQueueName();
-    if(!(queueManager.getQueues().contains(queue))) {      
+    if(!(queueManager.getLeafQueueNames().contains(queue))) {
       new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
       throw new IOException("Queue \"" + queue + "\" does not exist");        
     }
@@ -3464,9 +3030,11 @@
       new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));      
       throw new IOException("Queue \"" + queue + "\" is not running");
     }
-    // check for access
     try {
-      checkAccess(job, Queue.QueueOperation.SUBMIT_JOB);
+      // check for access
+      UserGroupInformation ugi =
+        UserGroupInformation.readFrom(job.getJobConf());
+      checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi);
     } catch (IOException ioe) {
        LOG.warn("Access denied for user " + job.getJobConf().getUser() 
                 + ". Ignoring job " + jobId, ioe);
@@ -3574,7 +3142,67 @@
       }
     }
   }
-    
+  
+  public synchronized ClusterMetrics getClusterMetrics() {
+    return new ClusterMetrics(totalMaps,
+      totalReduces, occupiedMapSlots, occupiedReduceSlots,
+      reservedMapSlots, reservedReduceSlots,
+      totalMapTaskCapacity, totalReduceTaskCapacity,
+      totalSubmissions,
+      taskTrackers.size() - getBlacklistedTrackerCount(), 
+      getBlacklistedTrackerCount(), getExcludedNodes().size());
+  }
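+
+  // Usage sketch ("jt" is an in-process JobTracker; accessor names are
+  // those of the new org.apache.hadoop.mapreduce.ClusterMetrics API):
+  //
+  //   ClusterMetrics metrics = jt.getClusterMetrics();
+  //   int freeMapSlots =
+  //     metrics.getMapSlotCapacity() - metrics.getOccupiedMapSlots();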
+
+  public org.apache.hadoop.mapreduce.server.jobtracker.State 
+      getJobTrackerState() {
+      return getServiceState().equals(ServiceState.LIVE) ?
+              org.apache.hadoop.mapreduce.server.jobtracker.State.RUNNING
+              : org.apache.hadoop.mapreduce.server.jobtracker.State.INITIALIZING;
+  }
+  
+  public long getTaskTrackerExpiryInterval() {
+    return tasktrackerExpiryInterval;
+  }
+  
+  /** 
+   * Get all active trackers in the cluster.
+   * @return array of TaskTrackerInfo
+   */
+  public TaskTrackerInfo[] getActiveTrackers() 
+  throws IOException, InterruptedException {
+    List<String> activeTrackers = taskTrackerNames().get(0);
+    TaskTrackerInfo[] info = new TaskTrackerInfo[activeTrackers.size()];
+    for (int i = 0; i < activeTrackers.size(); i++) {
+      info[i] = new TaskTrackerInfo(activeTrackers.get(i));
+    }
+    return info;
+  }
+
+  /** 
+   * Get all blacklisted trackers in the cluster.
+   * @return array of TaskTrackerInfo
+   */
+  public TaskTrackerInfo[] getBlacklistedTrackers() 
+  throws IOException, InterruptedException {
+    Collection<BlackListInfo> blackListed = getBlackListedTrackers();
+    TaskTrackerInfo[] info = new TaskTrackerInfo[blackListed.size()];
+    int i = 0;
+    for (BlackListInfo binfo : blackListed) {
+      info[i++] = new TaskTrackerInfo(binfo.getTrackerName(),
+        binfo.getReasonForBlackListing(), binfo.getBlackListReport());
+    }
+    return info;
+  }
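+
+  // Enumeration sketch (illustrative; accessor name on TaskTrackerInfo
+  // assumed, exception handling elided):
+  //
+  //   for (TaskTrackerInfo tracker : jt.getActiveTrackers()) {
+  //     System.out.println(tracker.getTaskTrackerName());
+  //   }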
+
+  public synchronized void killJob(org.apache.hadoop.mapreduce.JobID jobid) 
+      throws IOException {
+    killJob(JobID.downgrade(jobid));
+  }
+  
+  /**
+   * @deprecated Use {@link #killJob(org.apache.hadoop.mapreduce.JobID)} instead 
+   */
+  @Deprecated
   public synchronized void killJob(JobID jobid) throws IOException {
     if (null == jobid) {
       LOG.info("Null jobid object sent to JobTracker.killJob()");
@@ -3702,6 +3330,18 @@
    * @param jobid id of the job
    * @param priority new priority of the job
    */
+  public synchronized void setJobPriority(org.apache.hadoop.mapreduce.JobID 
+      jobid, String priority) throws IOException {
+    setJobPriority(JobID.downgrade(jobid), priority);
+  }
+
+  /**
+   * Set the priority of a job
+   * @param jobid id of the job
+   * @param priority new priority of the job
+   * @deprecated Use 
+   * {@link #setJobPriority(org.apache.hadoop.mapreduce.JobID, String)} instead
+   */
+  @Deprecated
   public synchronized void setJobPriority(JobID jobid, 
                                               String priority)
                                                 throws IOException {
@@ -3721,6 +3361,15 @@
     completedJobStatusStore.store(job);
   }
 
+  public JobProfile getJobProfile(org.apache.hadoop.mapreduce.JobID jobid) {
+    return getJobProfile(JobID.downgrade(jobid));
+  }
+  
+  /**
+   * @deprecated Use {@link #getJobProfile(org.apache.hadoop.mapreduce.JobID)} 
+   * instead
+   */
+  @Deprecated
   public JobProfile getJobProfile(JobID jobid) {
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
@@ -3730,6 +3379,16 @@
     }
     return completedJobStatusStore.readJobProfile(jobid);
   }
+  
+  public JobStatus getJobStatus(org.apache.hadoop.mapreduce.JobID jobid) {
+    return getJobStatus(JobID.downgrade(jobid));
+  }
+
+  /**
+   * @deprecated Use 
+   * {@link #getJobStatus(org.apache.hadoop.mapreduce.JobID)} instead
+   */
+  @Deprecated
   public JobStatus getJobStatus(JobID jobid) {
     if (null == jobid) {
       LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
@@ -3749,6 +3408,21 @@
     }
     return completedJobStatusStore.readJobStatus(jobid);
   }
+  
+  public org.apache.hadoop.mapreduce.Counters getJobCounters(
+      org.apache.hadoop.mapreduce.JobID jobid) {
+    Counters counters = getJobCounters(JobID.downgrade(jobid));
+    if (counters != null) {
+      return new org.apache.hadoop.mapreduce.Counters(counters);
+    }
+    return null;
+  }
+  
+  /**
+   * @deprecated Use 
+   * {@link #getJobCounters(org.apache.hadoop.mapreduce.JobID)} instead
+   */
+  @Deprecated
   public Counters getJobCounters(JobID jobid) {
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
@@ -3758,6 +3432,15 @@
     }
     return completedJobStatusStore.readCounters(jobid);
   }
+  
+  /**
+   * @param jobid id of the job
+   * @return array of TaskReport
+   * @deprecated Use 
+   * {@link #getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)} 
+   * instead
+   */
+  @Deprecated
   public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
     if (job == null) {
@@ -3780,6 +3463,14 @@
     }
   }
 
+  /**
+   * @param jobid id of the job
+   * @return array of TaskReport
+   * @deprecated Use 
+   * {@link #getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)} 
+   * instead
+   */
+  @Deprecated
   public synchronized TaskReport[] getReduceTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
     if (job == null) {
@@ -3800,6 +3491,14 @@
     }
   }
 
+  /**
+   * @param jobid id of the job
+   * @return array of TaskReport
+   * @deprecated Use 
+   * {@link #getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)} 
+   * instead
+   */
+  @Deprecated
   public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
     if (job == null) {
@@ -3822,7 +3521,15 @@
     }
   
   }
-  
+
+  /**
+   * @param jobid id of the job
+   * @return array of TaskReport
+   * @deprecated Use 
+   * {@link #getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)} 
+   * instead
+   */
+  @Deprecated
   public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
     if (job == null) {
@@ -3844,24 +3551,41 @@
       return reports.toArray(new TaskReport[reports.size()]);
     }
   }
-  
+
+  public synchronized TaskReport[] getTaskReports(
+      org.apache.hadoop.mapreduce.JobID jobid, TaskType type) {
+    switch (type) {
+      case MAP :
+        return getMapTaskReports(JobID.downgrade(jobid));
+      case REDUCE :
+        return getReduceTaskReports(JobID.downgrade(jobid));
+      case JOB_CLEANUP:
+        return getCleanupTaskReports(JobID.downgrade(jobid));
+      case JOB_SETUP :
+        return getSetupTaskReports(JobID.downgrade(jobid));
+    }
+    return new TaskReport[0];
+  }
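+
+  // The typed dispatch above subsumes the four deprecated per-phase
+  // getters; callers select the phase through TaskType, e.g.:
+  //
+  //   TaskReport[] maps = jt.getTaskReports(jobid, TaskType.MAP);
+  //   TaskReport[] cleanups = jt.getTaskReports(jobid, TaskType.JOB_CLEANUP);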
+
   TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
 
-  static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
-      "mapred.cluster.map.memory.mb";
-  static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
-      "mapred.cluster.reduce.memory.mb";
-
-  static final String MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY =
-      "mapred.cluster.max.map.memory.mb";
-  static final String MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY =
-      "mapred.cluster.max.reduce.memory.mb";
+  /* 
+   * Returns a list of TaskCompletionEvent for the given job, 
+   * starting from fromEventId.
+   */
+  public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
+      org.apache.hadoop.mapreduce.JobID jobid, int fromEventId, int maxEvents)
+      throws IOException {
+    return getTaskCompletionEvents(JobID.downgrade(jobid),
+      fromEventId, maxEvents);
+  }
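+
+  // Typical polling loop over completion events (sketch; batch size of 10
+  // is illustrative):
+  //
+  //   int from = 0;
+  //   TaskCompletionEvent[] events;
+  //   do {
+  //     events = jt.getTaskCompletionEvents(jobid, from, 10);
+  //     from += events.length;
+  //   } while (events.length > 0);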
   
   /* 
    * Returns a list of TaskCompletionEvent for the given job, 
    * starting from fromEventId.
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
    */
+  @Deprecated
   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
       JobID jobid, int fromEventId, int maxEvents) throws IOException{
     synchronized (this) {
@@ -3882,6 +3606,17 @@
    * @param taskId the id of the task
    * @return an array of the diagnostic messages
    */
+  public synchronized String[] getTaskDiagnostics(
+      org.apache.hadoop.mapreduce.TaskAttemptID taskId)  
+      throws IOException {
+    return getTaskDiagnostics(TaskAttemptID.downgrade(taskId));
+  }
+  /**
+   * Get the diagnostics for a given task
+   * @param taskId the id of the task
+   * @return an array of the diagnostic messages
+   */
+  @Deprecated
   public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId)  
     throws IOException {
     List<String> taskDiagnosticInfo = null;
@@ -3937,8 +3672,15 @@
     JobInProgress job = jobs.get(tipid.getJobID());
     return (job == null ? null : job.getTaskInProgress(tipid));
   }
-    
+
+  public synchronized boolean killTask(
+      org.apache.hadoop.mapreduce.TaskAttemptID taskid,
+      boolean shouldFail) throws IOException {
+    return killTask(TaskAttemptID.downgrade(taskid), shouldFail);
+  }
+  
   /** Mark a Task to be killed */
+  @Deprecated
   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException{
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
@@ -3964,7 +3706,7 @@
     return getJobStatus(jobs.values(), true);
   } 
   
-  public JobStatus[] getAllJobs() {
+  public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() {
     List<JobStatus> list = new ArrayList<JobStatus>();
     list.addAll(Arrays.asList(getJobStatus(jobs.values(),false)));
     list.addAll(retireJobs.getAll());
@@ -3972,17 +3714,25 @@
   }
     
   /**
-   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getSystemDir()
    */
   public String getSystemDir() {
     if (fs == null) {
       throw new java.lang.IllegalStateException("Filesystem is null; "
               + "JobTracker is not live: " + this);
     }
-    Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
+    Path sysDir = new Path(conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system"));
     return fs.makeQualified(sysDir).toString();
   }
   
+  /**
+   * @see 
+   * org.apache.hadoop.mapreduce.protocol.ClientProtocol#getJobHistoryDir()
+   */
+  public String getJobHistoryDir() {
+    return jobHistory.getCompletedJobHistoryLocation().toString();
+  }
+
   ///////////////////////////////////////////////////////////////
   // JobTracker methods
   ///////////////////////////////////////////////////////////////
@@ -4064,14 +3814,8 @@
       }
 
       TaskInProgress tip = taskidToTIPMap.get(taskId);
-      // Check if the tip is known to the jobtracker. In case of a restarted
-      // jt, some tasks might join in later
-      if (tip != null || hasRestarted()) {
-        if (tip == null) {
-          tip = job.getTaskInProgress(taskId.getTaskID());
-          job.addRunningTaskToTIP(tip, taskId, status, false);
-        }
-        
+      
+      if (tip != null) {
         // Update the job and inform the listeners if necessary
         JobStatus prevStatus = (JobStatus)job.getStatus().clone();
         // Clone TaskStatus object here, because JobInProgress
@@ -4132,9 +3876,6 @@
       trackerToTasksToCleanup.remove(trackerName);
     }
     
-    // Inform the recovery manager
-    recoveryManager.unMarkTracker(trackerName);
-    
     Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName);
     trackerToTaskMap.remove(trackerName);
 
@@ -4205,13 +3946,13 @@
   }
   
   private synchronized void refreshHosts() throws IOException {
-    // Reread the config to get mapred.hosts and mapred.hosts.exclude filenames.
+    // Reread the config to pick up the hosts include/exclude file names.
     // Update the file names and refresh internal includes and excludes list
     LOG.info("Refreshing hosts information");
     Configuration conf = new Configuration();
 
-    hostsReader.updateFileNames(conf.get("mapred.hosts",""), 
-                                conf.get("mapred.hosts.exclude", ""));
+    hostsReader.updateFileNames(conf.get(JTConfig.JT_HOSTS_FILENAME,""), 
+                                conf.get(JTConfig.JT_HOSTS_EXCLUDE_FILENAME, ""));
     hostsReader.refresh();
     
     Set<String> excludeSet = new HashSet<String>();
@@ -4227,12 +3968,13 @@
   }
 
   // main decommission
-  private synchronized void decommissionNodes(Set<String> hosts) 
+  synchronized void decommissionNodes(Set<String> hosts) 
   throws IOException {  
     LOG.info("Decommissioning " + hosts.size() + " nodes");
     // create a list of tracker hostnames
     synchronized (taskTrackers) {
       synchronized (trackerExpiryQueue) {
+        int trackersDecommissioned = 0;
         for (String host : hosts) {
           LOG.info("Decommissioning host " + host);
           Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
@@ -4240,12 +3982,13 @@
             for (TaskTracker tracker : trackers) {
               LOG.info("Decommission: Losing tracker " + tracker + 
                        " on host " + host);
-              lostTaskTracker(tracker); // lose the tracker
-              updateTaskTrackerStatus(tracker.getStatus().getTrackerName(), null);
+              removeTracker(tracker);
             }
+            trackersDecommissioned += trackers.size();
           }
           LOG.info("Host " + host + " is ready for decommissioning");
         }
+        getInstrumentation().setDecommissionedTrackers(trackersDecommissioned);
       }
     }
   }
@@ -4257,14 +4000,6 @@
     return hostsReader.getExcludedHosts();
   }
 
-  /**
-   * Get the localized job file path on the job trackers local file system
-   * @param jobId id of the job
-   * @return the path of the job conf file on the local file system
-   */
-  public static String getLocalJobFilePath(JobID jobId){
-    return JobHistory.JobInfo.getLocalJobFilePath(jobId);
-  }
   ////////////////////////////////////////////////////////////
   // main()
   ////////////////////////////////////////////////////////////
@@ -4285,6 +4020,9 @@
       else {
         if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {
           dumpConfiguration(new PrintWriter(System.out));
+          System.out.println();
+          QueueManager.dumpConfiguration(new PrintWriter(System.out),
+              new JobConf());
         }
         else {
           System.out.println("usage: JobTracker [-dumpConfiguration]");
@@ -4305,30 +4043,84 @@
   private static void dumpConfiguration(Writer writer) throws IOException {
     Configuration.dumpConfiguration(new JobConf(), writer);
     writer.write("\n");
-    // get the QueueManager configuration properties
-    QueueManager.dumpConfiguration(writer);
-    writer.write("\n");
   }
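+
+  // The queue configuration dump now happens in main() (see above), so this
+  // method emits only the JobConf key/value dump. Invocation sketch (script
+  // path illustrative):
+  //
+  //   $ bin/hadoop jobtracker -dumpConfiguration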
 
+  /**
+   * Gets the root level queues.
+   *
+   * @return array of QueueInfo objects.
+   * @throws java.io.IOException
+   */
+  @Override
+  public QueueInfo[] getRootQueues() throws IOException {
+    return getQueueInfoArray(queueManager.getRootQueues());
+  }
+ 
+  /**
+   * Returns immediate children of queueName.
+   *
+   * @param queueName name of the queue
+   * @return array of QueueInfo which are children of queueName
+   * @throws java.io.IOException
+   */
   @Override
-  public JobQueueInfo[] getQueues() throws IOException {
-    return queueManager.getJobQueueInfos();
+  public QueueInfo[] getChildQueues(String queueName) throws IOException {
+    return getQueueInfoArray(queueManager.getChildQueues(queueName));
   }
 
+  /**
+   * Gets the root level queues.
+   *
+   * @return array of JobQueueInfo objects.
+   * @throws java.io.IOException
+   */
+  @Deprecated
+  public JobQueueInfo[] getRootJobQueues() throws IOException {
+    return queueManager.getRootQueues();
+  }
 
-  @Override
+  @Deprecated 
+  public JobQueueInfo[] getJobQueues() throws IOException {
+    return queueManager.getJobQueueInfos();
+  }
+
+  @Deprecated 
   public JobQueueInfo getQueueInfo(String queue) throws IOException {
     return queueManager.getJobQueueInfo(queue);
   }
 
+  private QueueInfo[] getQueueInfoArray(JobQueueInfo[] queues) 
+      throws IOException {
+    for (JobQueueInfo queue : queues) {
+      queue.setJobStatuses(getJobsFromQueue(queue.getQueueName()));
+    }
+    return queues;
+  }
+  
+  @Override
+  public QueueInfo[] getQueues() throws IOException {
+    return getQueueInfoArray(queueManager.getJobQueueInfos());
+  }
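+
+  // Each QueueInfo handed out above is populated with the statuses of the
+  // jobs in that queue via getQueueInfoArray. Sketch (getter names on
+  // QueueInfo assumed):
+  //
+  //   for (QueueInfo q : jt.getQueues()) {
+  //     System.out.println(q.getQueueName() + ": "
+  //         + q.getJobStatuses().length + " jobs");
+  //   }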
+
   @Override
-  public JobStatus[] getJobsFromQueue(String queue) throws IOException {
-    Collection<JobInProgress> jips = taskScheduler.getJobs(queue);
+  public QueueInfo getQueue(String queue) throws IOException {
+    JobQueueInfo jqueue = queueManager.getJobQueueInfo(queue);
+    jqueue.setJobStatuses(getJobsFromQueue(jqueue.getQueueName()));
+    return jqueue;
+  }
+
+  public org.apache.hadoop.mapreduce.JobStatus[] getJobsFromQueue(String queue) 
+      throws IOException {
+    Collection<JobInProgress> jips = null;
+    if (queueManager.getLeafQueueNames().contains(queue)) {
+      jips = taskScheduler.getJobs(queue);
+    }
     return getJobStatus(jips,false);
   }
   
   @Override
-  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException{
+  public org.apache.hadoop.mapreduce.QueueAclsInfo[] 
+      getQueueAclsForCurrentUser() throws IOException{
     return queueManager.getQueueAcls(
             UserGroupInformation.getCurrentUGI());
   }
@@ -4359,7 +4151,7 @@
   * Returns the configured maximum number of tasks for a single job
    */
   int getMaxTasksPerJob() {
-    return conf.getInt("mapred.jobtracker.maxtasks.per.job", -1);
+    return conf.getInt(JT_TASKS_PER_JOB, -1);
   }
   
   @Override
@@ -4375,25 +4167,26 @@
   public void refreshQueues() throws IOException{
     LOG.info("Refreshing queue information. requested by : " + 
         UserGroupInformation.getCurrentUGI().getUserName());
-    this.queueManager.refreshQueues(new Configuration(this.conf));
+    this.queueManager.refreshQueues(new Configuration(this.conf),
+        taskScheduler.getQueueRefresher());
   }
 
   private void initializeTaskMemoryRelatedConfig() {
     memSizeForMapSlotOnJT =
         JobConf.normalizeMemoryConfigValue(conf.getLong(
-            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+            MAPMEMORY_MB,
             JobConf.DISABLED_MEMORY_LIMIT));
     memSizeForReduceSlotOnJT =
         JobConf.normalizeMemoryConfigValue(conf.getLong(
-            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+            REDUCEMEMORY_MB,
             JobConf.DISABLED_MEMORY_LIMIT));
 
     if (conf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) {
       LOG.warn(
         JobConf.deprecatedString(
           JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)+
-          " instead use "+JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY+
-          " and " + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY
+          " instead use "+JTConfig.JT_MAX_MAPMEMORY_MB+
+          " and " + JTConfig.JT_MAX_REDUCEMEMORY_MB
       );
 
       limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
@@ -4411,12 +4204,12 @@
       limitMaxMemForMapTasks =
         JobConf.normalizeMemoryConfigValue(
           conf.getLong(
-            JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+            JTConfig.JT_MAX_MAPMEMORY_MB,
             JobConf.DISABLED_MEMORY_LIMIT));
       limitMaxMemForReduceTasks =
         JobConf.normalizeMemoryConfigValue(
           conf.getLong(
-            JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+            JTConfig.JT_MAX_REDUCEMEMORY_MB,
             JobConf.DISABLED_MEMORY_LIMIT));
     }
 
@@ -4526,6 +4319,115 @@
   void incrementFaults(String hostName) {
     faultyTrackers.incrementFaults(hostName);
   }
-  
-  
+
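+  /**
+   * Alternate constructor taking an injectable Clock; judging by its name,
+   * the final flag exists only to distinguish this signature for
+   * simulation-style callers and is otherwise ignored.
+   */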
+  JobTracker(JobConf conf, Clock clock, boolean ignoredForSimulation) 
+  throws IOException {
+    this.clock = clock;
+    this.conf = conf;
+    trackerIdentifier = getDateFormat().format(new Date());
+
+    if (fs == null) {
+      fs = FileSystem.get(conf);
+    }
+    
+    tasktrackerExpiryInterval = 
+      conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
+    retiredJobsCacheSize = 
+      conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
+
+    // tracker blacklisting and heartbeat settings
+    MAX_BLACKLISTS_PER_TRACKER = 
+        conf.getInt("mapred.max.tracker.blacklists", 4);
+    NUM_HEARTBEATS_IN_SECOND = 
+        conf.getInt("mapred.heartbeats.in.second", 100);
+    
+    try {
+      mrOwner = UnixUserGroupInformation.login(conf);
+    } catch (LoginException e) {
+      throw new IOException(StringUtils.stringifyException(e));
+    }
+    supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
+    
+    this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
+        conf.get("mapred.hosts.exclude", ""));
+    // queue manager
+    Configuration queuesConf = new Configuration(this.conf);
+    queueManager = new QueueManager(queuesConf);
+
+    // Create the scheduler
+    Class<? extends TaskScheduler> schedulerClass
+      = conf.getClass("mapred.jobtracker.taskScheduler",
+          JobQueueTaskScheduler.class, TaskScheduler.class);
+    taskScheduler = 
+      (TaskScheduler)ReflectionUtils.newInstance(schedulerClass, conf);
+    
+    // Set ports, start RPC servers, setup security policy etc.
+    InetSocketAddress addr = getAddress(conf);
+    this.localMachine = addr.getHostName();
+    this.port = addr.getPort();
+
+    // Create the jetty server
+    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(
+        conf.get("mapred.job.tracker.http.address", "0.0.0.0:50030"));
+    String infoBindAddress = infoSocAddr.getHostName();
+    int tmpInfoPort = infoSocAddr.getPort();
+    this.startTime = clock.getTime();
+    infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, 
+        tmpInfoPort == 0, conf);
+    infoServer.setAttribute("job.tracker", this);
+    
+    // initialize history parameters.
+    String historyLogDir = null;
+    FileSystem historyFS = null;
+
+    jobHistory = new JobHistory();
+    jobHistory.init(this, conf, this.localMachine, this.startTime);
+    jobHistory.initDone(conf, fs);
+    historyLogDir = jobHistory.getCompletedJobHistoryLocation().toString();
+    infoServer.setAttribute("historyLogDir", historyLogDir);
+    historyFS = new Path(historyLogDir).getFileSystem(conf);
+
+    infoServer.setAttribute("fileSys", historyFS);
+    infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
+    infoServer.start();
+    this.infoPort = this.infoServer.getPort();
+
+    // Initialize instrumentation
+    JobTrackerInstrumentation tmp;
+    Class<? extends JobTrackerInstrumentation> metricsInst =
+      getInstrumentationClass(conf);
+    try {
+      java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+        metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
+      tmp = c.newInstance(this, conf);
+    } catch(Exception e) {
+      //Reflection can throw lots of exceptions -- handle them all by 
+      //falling back on the default.
+      LOG.error("failed to initialize job tracker metrics", e);
+      tmp = new JobTrackerMetricsInst(this, conf);
+    }
+    myInstrumentation = tmp;
+    
+    // start the recovery manager
+    recoveryManager = new RecoveryManager();
+    
+    this.dnsToSwitchMapping = ReflectionUtils.newInstance(
+        conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+            DNSToSwitchMapping.class), conf);
+    this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
+        NetworkTopology.DEFAULT_HOST_LEVEL);
+
+    //initializes the job status store
+    completedJobStatusStore = new CompletedJobStatusStore(conf);
+  }
+
+  /**
+   * Get the path of the locally stored job file
+   * @param jobId id of the job
+   * @return the path of the job file on the local file system 
+   */
+  String getLocalJobFilePath(org.apache.hadoop.mapreduce.JobID jobId){
+    return System.getProperty("hadoop.log.dir") + 
+           File.separator + jobId + "_conf.xml";
+  }
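+
+  // Example (values illustrative): with hadoop.log.dir=/var/log/hadoop and
+  // job id job_200911282026_0001 this resolves to
+  // /var/log/hadoop/job_200911282026_0001_conf.xml.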
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -84,4 +84,80 @@
 
   public void decBlackListedReduceSlots(int slots)
   { }
+
+  public void addReservedMapSlots(int slots)
+  { }
+
+  public void decReservedMapSlots(int slots)
+  { }
+
+  public void addReservedReduceSlots(int slots)
+  { }
+
+  public void decReservedReduceSlots(int slots)
+  { }
+
+  public void addOccupiedMapSlots(int slots)
+  { }
+
+  public void decOccupiedMapSlots(int slots)
+  { }
+
+  public void addOccupiedReduceSlots(int slots)
+  { }
+
+  public void decOccupiedReduceSlots(int slots)
+  { }
+
+  public void failedJob(JobConf conf, JobID id) 
+  { }
+
+  public void killedJob(JobConf conf, JobID id) 
+  { }
+
+  public void addPrepJob(JobConf conf, JobID id) 
+  { }
+  
+  public void decPrepJob(JobConf conf, JobID id) 
+  { }
+
+  public void addRunningJob(JobConf conf, JobID id) 
+  { }
+
+  public void decRunningJob(JobConf conf, JobID id) 
+  { }
+
+  public void addRunningMaps(int tasks)
+  { }
+
+  public void decRunningMaps(int tasks) 
+  { }
+
+  public void addRunningReduces(int tasks)
+  { }
+
+  public void decRunningReduces(int tasks)
+  { }
+
+  public void killedMap(TaskAttemptID taskAttemptID)
+  { }
+
+  public void killedReduce(TaskAttemptID taskAttemptID)
+  { }
+
+  public void addTrackers(int trackers)
+  { }
+
+  public void decTrackers(int trackers)
+  { }
+
+  public void addBlackListedTrackers(int trackers)
+  { }
+
+  public void decBlackListedTrackers(int trackers)
+  { }
+
+  public void setDecommissionedTrackers(int trackers)
+  { }  
+
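+  // The hooks above are intentionally empty so that subclasses override
+  // only what they need. Minimal subclass sketch (the two-argument
+  // constructor matches the reflective lookup in JobTracker):
+  //
+  //   class LoggingInstrumentation extends JobTrackerInstrumentation {
+  //     LoggingInstrumentation(JobTracker jt, JobConf conf) { super(jt, conf); }
+  //     @Override public void addTrackers(int trackers) {
+  //       System.out.println("trackers += " + trackers);
+  //     }
+  //   }
+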
 }