Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC

svn commit: r1495297 [26/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Jun 21 06:37:27 2013
@@ -18,13 +18,9 @@
 package org.apache.hadoop.mapred;
 
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.InputStreamReader;
 import java.io.Writer;
 import java.lang.management.ManagementFactory;
 import java.net.BindException;
@@ -53,35 +49,45 @@ import java.util.TreeSet;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobSubmissionProtocol;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.AuditLogger.Constants;
-import org.apache.hadoop.mapred.Counters.CountersExceededException;
 import org.apache.hadoop.mapred.JobHistory.Keys;
-import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -89,6 +95,7 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
@@ -96,25 +103,16 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-
-import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.security.Credentials;
 import org.mortbay.util.ajax.JSON;
 
 /*******************************************************
@@ -202,13 +200,17 @@ public class JobTracker implements MRCon
   private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
   private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
   
+  final static String JT_INIT_CONFIG_KEY_FOR_TESTS = 
+      "mapreduce.jobtracker.init.for.tests";
+  
   public static enum State { INITIALIZING, RUNNING }
-  State state = State.INITIALIZING;
-  private static final int FS_ACCESS_RETRY_PERIOD = 10000;
+  volatile State state = State.INITIALIZING;
+  private static final int FS_ACCESS_RETRY_PERIOD = 1000;
   static final String JOB_INFO_FILE = "job-info";
   private DNSToSwitchMapping dnsToSwitchMapping;
-  private NetworkTopology clusterMap = new NetworkTopology();
+  private NetworkTopology clusterMap;
   private int numTaskCacheLevels; // the max level to which we cache tasks
+  private boolean isNodeGroupAware;
   /**
    * {@link #nodesAtMaxLevel} is using the keySet from {@link ConcurrentHashMap}
    * so that it can be safely written to and iterated on via 2 separate threads.
@@ -222,6 +224,8 @@ public class JobTracker implements MRCon
   private final List<JobInProgressListener> jobInProgressListeners =
     new CopyOnWriteArrayList<JobInProgressListener>();
 
+  private List<ServicePlugin> plugins;
+  
   private static final LocalDirAllocator lDirAlloc = 
                               new LocalDirAllocator("mapred.local.dir");
   //system directory is completely owned by the JobTracker
@@ -276,6 +280,16 @@ public class JobTracker implements MRCon
     return clock;
   }
     
+  static final String JT_HDFS_MONITOR_ENABLE = 
+      "mapreduce.jt.hdfs.monitor.enable";
+  static final boolean DEFAULT_JT_HDFS_MONITOR_THREAD_ENABLE = false;
+  
+  static final String JT_HDFS_MONITOR_THREAD_INTERVAL = 
+      "mapreduce.jt.hdfs.monitor.interval.ms";
+  static final int DEFAULT_JT_HDFS_MONITOR_THREAD_INTERVAL_MS = 5000;
+  
+  private Thread hdfsMonitor;
+
   /**
    * Start the JobTracker with given configuration.
    * 
@@ -291,9 +305,14 @@ public class JobTracker implements MRCon
                                                  InterruptedException {
     return startTracker(conf, generateNewIdentifier());
   }
-  
+
   public static JobTracker startTracker(JobConf conf, String identifier) 
   throws IOException, InterruptedException {
+  	return startTracker(conf, identifier, false);
+  }
+  
+  public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize) 
+  throws IOException, InterruptedException {
     DefaultMetricsSystem.initialize("JobTracker");
     JobTracker result = null;
     while (true) {
@@ -320,6 +339,12 @@ public class JobTracker implements MRCon
     if (result != null) {
       JobEndNotifier.startNotifier();
       MBeans.register("JobTracker", "JobTrackerInfo", result);
+      if (initialize) {
+        result.setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
+        result.initializeFilesystem();
+        result.setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
+        result.initialize();
+      }
     }
     return result;
   }
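
The new three-argument startTracker() lets a caller opt into eager start-up: with initialize set to true, the tracker enters safe mode, connects to the filesystem via initializeFilesystem(), leaves safe mode and runs initialize() before the method returns, instead of deferring all of that to offerService(). A minimal sketch of driving the overload (the class name and port settings are illustrative assumptions, not part of this change):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobTracker;

    public class StartTrackerSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        conf.set("mapred.job.tracker", "localhost:0");            // assumed: ephemeral RPC port
        conf.set("mapred.job.tracker.http.address", "0.0.0.0:0"); // assumed: ephemeral HTTP port
        // 'true' makes startTracker() run the safe-mode enter ->
        // initializeFilesystem() -> safe-mode leave -> initialize() sequence
        // shown above, so the returned tracker is already initialized.
        JobTracker jt = JobTracker.startTracker(conf, "test-jt", true);
      }
    }
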
@@ -627,15 +652,25 @@ public class JobTracker implements MRCon
               Map.Entry<String, ArrayList<JobInProgress>> entry = 
                 userToJobsMapIt.next();
               ArrayList<JobInProgress> userJobs = entry.getValue();
+
+              // Remove retiredJobs from userToJobsMap to ensure we don't 
+              // retire them multiple times
               Iterator<JobInProgress> it = userJobs.iterator();
-              while (it.hasNext() && 
-                  userJobs.size() > MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
+              while (it.hasNext()) {
                 JobInProgress jobUser = it.next();
                 if (retiredJobs.contains(jobUser)) {
                   LOG.info("Removing from userToJobsMap: " + 
                       jobUser.getJobID());
                   it.remove();
-                } else if (minConditionToRetire(jobUser, now)) {
+                }
+              }
+              
+              // Now, check for #jobs per user
+              it = userJobs.iterator();
+              while (it.hasNext() && 
+                  userJobs.size() > MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
+                JobInProgress jobUser = it.next();
+                if (minConditionToRetire(jobUser, now)) {
                   LOG.info("User limit exceeded. Marking job: " + 
                       jobUser.getJobID() + " for retire.");
                   retiredJobs.add(jobUser);
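
Splitting the loop into two passes matters because the per-user cap compares against userJobs.size(): jobs already marked retired elsewhere must be dropped first, so they are neither counted against the cap nor retired a second time. The same iterator discipline in isolation (names and types here are illustrative, not the JobTracker's own):

    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;

    class TwoPassRetireSketch {
      static <T> void retire(List<T> userJobs, Set<T> alreadyRetired,
                             int maxPerUser, Set<T> toRetire) {
        // Pass 1: drop entries retired elsewhere so the size check below
        // only sees jobs that are still live for this user.
        Iterator<T> it = userJobs.iterator();
        while (it.hasNext()) {
          if (alreadyRetired.contains(it.next())) {
            it.remove();
          }
        }
        // Pass 2: enforce the per-user cap on whatever remains.
        it = userJobs.iterator();
        while (it.hasNext() && userJobs.size() > maxPerUser) {
          toRetire.add(it.next());
          it.remove();
        }
      }
    }
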
@@ -1216,179 +1251,6 @@ public class JobTracker implements MRCon
     /** A custom listener that replays the events in the order in which the 
      * events (task attempts) occurred. 
      */
-    class JobRecoveryListener implements Listener {
-      // The owner job
-      private JobInProgress jip;
-      
-      private JobHistory.JobInfo job; // current job's info object
-      
-      // Maintain the count of the (attempt) events recovered
-      private int numEventsRecovered = 0;
-      
-      // Maintains open transactions
-      private Map<String, String> hangingAttempts = 
-        new HashMap<String, String>();
-      
-      // Whether there are any updates for this job
-      private boolean hasUpdates = false;
-      
-      public JobRecoveryListener(JobInProgress jip) {
-        this.jip = jip;
-        this.job = new JobHistory.JobInfo(jip.getJobID().toString());
-      }
-
-      /**
-       * Process a task. Note that a task might commit a previously pending 
-       * transaction.
-       */
-      private void processTask(String taskId, JobHistory.Task task) {
-        // Any TASK info commits the previous transaction
-        boolean hasHanging = hangingAttempts.remove(taskId) != null;
-        if (hasHanging) {
-          numEventsRecovered += 2;
-        }
-        
-        TaskID id = TaskID.forName(taskId);
-        TaskInProgress tip = getTip(id);
-
-        updateTip(tip, task);
-      }
-
-      /**
-       * Adds a task-attempt in the listener
-       */
-      private void processTaskAttempt(String taskAttemptId, 
-                                      JobHistory.TaskAttempt attempt) 
-        throws UnknownHostException {
-        TaskAttemptID id = TaskAttemptID.forName(taskAttemptId);
-        
-        // Check if the transaction for this attempt can be committed
-        String taskStatus = attempt.get(Keys.TASK_STATUS);
-        TaskAttemptID taskID = TaskAttemptID.forName(taskAttemptId);
-        JobInProgress jip = getJob(taskID.getJobID());
-        JobStatus prevStatus = (JobStatus)jip.getStatus().clone();
-
-        if (taskStatus.length() > 0) {
-          // This means this is an update event
-          if (taskStatus.equals(Values.SUCCESS.name())) {
-            // Mark this attempt as hanging
-            hangingAttempts.put(id.getTaskID().toString(), taskAttemptId);
-            addSuccessfulAttempt(jip, id, attempt);
-          } else {
-            addUnsuccessfulAttempt(jip, id, attempt);
-            numEventsRecovered += 2;
-          }
-        } else {
-          createTaskAttempt(jip, id, attempt);
-        }
-        
-        JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-        if (prevStatus.getRunState() != newStatus.getRunState()) {
-          if(LOG.isDebugEnabled())
-            LOG.debug("Status changed hence informing prevStatus" +  prevStatus + " currentStatus "+ newStatus);
-          JobStatusChangeEvent event =
-            new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED,
-                                     prevStatus, newStatus);
-          updateJobInProgressListeners(event);
-        }
-      }
-
-      public void handle(JobHistory.RecordTypes recType, Map<Keys, 
-                         String> values) throws IOException {
-        if (recType == JobHistory.RecordTypes.Job) {
-          // Update the meta-level job information
-          job.handle(values);
-          
-          // Forcefully init the job as we have some updates for it
-          checkAndInit();
-        } else if (recType.equals(JobHistory.RecordTypes.Task)) {
-          String taskId = values.get(Keys.TASKID);
-          
-          // Create a task
-          JobHistory.Task task = new JobHistory.Task();
-          task.handle(values);
-          
-          // Ignore if its a cleanup task
-          if (isCleanup(task)) {
-            return;
-          }
-            
-          // Process the task i.e update the tip state
-          processTask(taskId, task);
-        } else if (recType.equals(JobHistory.RecordTypes.MapAttempt)) {
-          String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-          
-          // Create a task attempt
-          JobHistory.MapAttempt attempt = new JobHistory.MapAttempt();
-          attempt.handle(values);
-          
-          // Ignore if its a cleanup task
-          if (isCleanup(attempt)) {
-            return;
-          }
-          
-          // Process the attempt i.e update the attempt state via job
-          processTaskAttempt(attemptId, attempt);
-        } else if (recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
-          String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
-          
-          // Create a task attempt
-          JobHistory.ReduceAttempt attempt = new JobHistory.ReduceAttempt();
-          attempt.handle(values);
-          
-          // Ignore if its a cleanup task
-          if (isCleanup(attempt)) {
-            return;
-          }
-          
-          // Process the attempt i.e update the job state via job
-          processTaskAttempt(attemptId, attempt);
-        }
-      }
-
-      // Check if the task is of type CLEANUP
-      private boolean isCleanup(JobHistory.Task task) {
-        String taskType = task.get(Keys.TASK_TYPE);
-        return Values.CLEANUP.name().equals(taskType);
-      }
-      
-      // Init the job if its ready for init. Also make sure that the scheduler
-      // is updated
-      private void checkAndInit() throws IOException {
-        String jobStatus = this.job.get(Keys.JOB_STATUS);
-        if (Values.PREP.name().equals(jobStatus)) {
-          hasUpdates = true;
-          LOG.info("Calling init from RM for job " + jip.getJobID().toString());
-          try {
-            initJob(jip);
-          } catch (Throwable t) {
-            LOG.error("Job initialization failed : \n"
-                + StringUtils.stringifyException(t));
-            jip.status.setFailureInfo("Job Initialization failed: \n"
-                + StringUtils.stringifyException(t));
-            failJob(jip);
-            throw new IOException(t);
-          }
-        }
-      }
-      
-      void close() {
-        if (hasUpdates) {
-          // Apply the final (job-level) updates
-          JobStatusChangeEvent event = updateJob(jip, job);
-          
-          synchronized (JobTracker.this) {
-            // Update the job listeners
-            updateJobInProgressListeners(event);
-          }
-        }
-      }
-      
-      public int getNumEventsRecovered() {
-        return numEventsRecovered;
-      }
-
-    }
     
     public RecoveryManager() {
       jobsToRecover = new TreeSet<JobID>();
@@ -1442,243 +1304,25 @@ public class JobTracker implements MRCon
     // checks if the job dir has the required files
     public void checkAndAddJob(FileStatus status) throws IOException {
       String fileName = status.getPath().getName();
-      if (isJobNameValid(fileName)) {
-        if (JobClient.isJobDirValid(status.getPath(), fs)) {
-          recoveryManager.addJobForRecovery(JobID.forName(fileName));
-          shouldRecover = true; // enable actual recovery if num-files > 1
-        } else {
-          LOG.info("Found an incomplete job directory " + fileName + "." 
-                   + " Deleting it!!");
-          fs.delete(status.getPath(), true);
-        }
-      }
-    }
-    
-    private JobStatusChangeEvent updateJob(JobInProgress jip, 
-        JobHistory.JobInfo job) {
-      // Change the job priority
-      String jobpriority = job.get(Keys.JOB_PRIORITY);
-      JobPriority priority = JobPriority.valueOf(jobpriority);
-      // It's important to update this via the jobtracker's api as it will 
-      // take care of updating the event listeners too
-      
-      try {
-        setJobPriority(jip.getJobID(), priority);
-      } catch (IOException e) {
-        // This will not happen. JobTracker can set jobPriority of any job
-        // as mrOwner has the needed permissions.
-        LOG.warn("Unexpected. JobTracker could not do SetJobPriority on "
-                 + jip.getJobID() + ". " + e);
-      }
-
-      // Save the previous job status
-      JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
-      
-      // Set the start/launch time only if there are recovered tasks
-      // Increment the job's restart count
-      jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME), 
-                        job.getLong(JobHistory.Keys.LAUNCH_TIME));
-
-      // Save the new job status
-      JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-      
-      return new JobStatusChangeEvent(jip, EventType.START_TIME_CHANGED, oldStatus, 
-                                      newStatus);
-    }
-    
-    private void updateTip(TaskInProgress tip, JobHistory.Task task) {
-      long startTime = task.getLong(Keys.START_TIME);
-      if (startTime != 0) {
-        tip.setExecStartTime(startTime);
-      }
-      
-      long finishTime = task.getLong(Keys.FINISH_TIME);
-      // For failed tasks finish-time will be missing
-      if (finishTime != 0) {
-        tip.setExecFinishTime(finishTime);
-      }
-      
-      String cause = task.get(Keys.TASK_ATTEMPT_ID);
-      if (cause.length() > 0) {
-        // This means that the this is a FAILED events
-        TaskAttemptID id = TaskAttemptID.forName(cause);
-        TaskStatus status = tip.getTaskStatus(id);
-        synchronized (JobTracker.this) {
-          // This will add the tip failed event in the new log
-          tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(), 
-                                  status.getPhase(), status.getRunState(), 
-                                  status.getTaskTracker());
-        }
-      }
-    }
-    
-    private void createTaskAttempt(JobInProgress job, 
-                                   TaskAttemptID attemptId, 
-                                   JobHistory.TaskAttempt attempt) 
-      throws UnknownHostException {
-      TaskID id = attemptId.getTaskID();
-      String type = attempt.get(Keys.TASK_TYPE);
-      TaskInProgress tip = job.getTaskInProgress(id);
-      
-      //    I. Get the required info
-      TaskStatus taskStatus = null;
-      String trackerName = attempt.get(Keys.TRACKER_NAME);
-      String trackerHostName = 
-        JobInProgress.convertTrackerNameToHostName(trackerName);
-      // recover the port information.
-      int port = 0; // default to 0
-      String hport = attempt.get(Keys.HTTP_PORT);
-      if (hport != null && hport.length() > 0) {
-        port = attempt.getInt(Keys.HTTP_PORT);
-      }
-      
-      long attemptStartTime = attempt.getLong(Keys.START_TIME);
-
-      // II. Create the (appropriate) task status
-      if (type.equals(Values.MAP.name())) {
-        taskStatus = 
-          new MapTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.MAP),
-                            TaskStatus.State.RUNNING, "", "", trackerName, 
-                            TaskStatus.Phase.MAP, new Counters());
-      } else {
-        taskStatus = 
-          new ReduceTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.REDUCE), 
-                               TaskStatus.State.RUNNING, "", "", trackerName, 
-                               TaskStatus.Phase.REDUCE, new Counters());
-      }
-
-      // Set the start time
-      taskStatus.setStartTime(attemptStartTime);
-
-      List<TaskStatus> ttStatusList = new ArrayList<TaskStatus>();
-      ttStatusList.add(taskStatus);
-      
-      // III. Create the dummy tasktracker status
-      TaskTrackerStatus ttStatus = 
-        new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList, 
-                              0 , 0, 0, 0);
-      ttStatus.setLastSeen(clock.getTime());
-
-      synchronized (JobTracker.this) {
-        synchronized (taskTrackers) {
-          synchronized (trackerExpiryQueue) {
-            // IV. Register a new tracker
-            TaskTracker taskTracker = getTaskTracker(trackerName);
-            boolean isTrackerRegistered =  (taskTracker != null);
-            if (!isTrackerRegistered) {
-              markTracker(trackerName); // add the tracker to recovery-manager
-              taskTracker = new TaskTracker(trackerName);
-              taskTracker.setStatus(ttStatus);
-              addNewTracker(taskTracker);
-            }
-      
-            // V. Update the tracker status
-            // This will update the meta info of the jobtracker and also add the
-            // tracker status if missing i.e register it
-            updateTaskTrackerStatus(trackerName, ttStatus);
-          }
-        }
-        // Register the attempt with job and tip, under JobTracker lock. 
-        // Since, as of today they are atomic through heartbeat.
-        // VI. Register the attempt
-        //   a) In the job
-        job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
-        //   b) In the tip
-        tip.updateStatus(taskStatus);
+      if (isJobNameValid(fileName) && isJobDirValid(JobID.forName(fileName))) {
+        recoveryManager.addJobForRecovery(JobID.forName(fileName));
+        shouldRecover = true; // enable actual recovery if num-files > 1
       }
-      
-      // VII. Make an entry in the launched tasks
-      expireLaunchingTasks.addNewTask(attemptId);
-    }
-    
-    private void addSuccessfulAttempt(JobInProgress job, 
-                                      TaskAttemptID attemptId, 
-                                      JobHistory.TaskAttempt attempt) {
-      // I. Get the required info
-      TaskID taskId = attemptId.getTaskID();
-      String type = attempt.get(Keys.TASK_TYPE);
-
-      TaskInProgress tip = job.getTaskInProgress(taskId);
-      long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
-
-      // Get the task status and the tracker name and make a copy of it
-      TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
-      taskStatus.setFinishTime(attemptFinishTime);
-
-      String stateString = attempt.get(Keys.STATE_STRING);
-
-      // Update the basic values
-      taskStatus.setStateString(stateString);
-      taskStatus.setProgress(1.0f);
-      taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-
-      // Set the shuffle/sort finished times
-      if (type.equals(Values.REDUCE.name())) {
-        long shuffleTime = 
-          Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED));
-        long sortTime = 
-          Long.parseLong(attempt.get(Keys.SORT_FINISHED));
-        taskStatus.setShuffleFinishTime(shuffleTime);
-        taskStatus.setSortFinishTime(sortTime);
-      }
-
-      // Add the counters
-      String counterString = attempt.get(Keys.COUNTERS);
-      Counters counter = null;
-      //TODO Check if an exception should be thrown
-      try {
-        counter = Counters.fromEscapedCompactString(counterString);
-      } catch (ParseException pe) { 
-        counter = new Counters(); // Set it to empty counter
-      }
-      taskStatus.setCounters(counter);
-      
-      synchronized (JobTracker.this) {
-        // II. Replay the status
-        job.updateTaskStatus(tip, taskStatus);
-      }
-      
-      // III. Prevent the task from expiry
-      expireLaunchingTasks.removeTask(attemptId);
     }
     
-    private void addUnsuccessfulAttempt(JobInProgress job,
-                                        TaskAttemptID attemptId,
-                                        JobHistory.TaskAttempt attempt) {
-      // I. Get the required info
-      TaskID taskId = attemptId.getTaskID();
-      TaskInProgress tip = job.getTaskInProgress(taskId);
-      long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
-
-      TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
-      taskStatus.setFinishTime(attemptFinishTime);
-
-      // Reset the progress
-      taskStatus.setProgress(0.0f);
-      
-      String stateString = attempt.get(Keys.STATE_STRING);
-      taskStatus.setStateString(stateString);
-
-      boolean hasFailed = 
-        attempt.get(Keys.TASK_STATUS).equals(Values.FAILED.name());
-      // Set the state failed/killed
-      if (hasFailed) {
-        taskStatus.setRunState(TaskStatus.State.FAILED);
+    private boolean isJobDirValid(JobID jobId) throws IOException {
+      boolean ret = false;
+      Path jobInfoFile = getSystemFileForJob(jobId);
+      final Path jobTokenFile = getTokenFileForJob(jobId);
+      JobConf job = new JobConf();
+      if (jobTokenFile.getFileSystem(job).exists(jobTokenFile)
+          && jobInfoFile.getFileSystem(job).exists(jobInfoFile)) {
+        ret = true;
       } else {
-        taskStatus.setRunState(TaskStatus.State.KILLED);
+        LOG.warn("Job " + jobId
+            + " does not have valid info/token file so ignoring for recovery");
       }
-
-      // Get/Set the error msg
-      String diagInfo = attempt.get(Keys.ERROR);
-      taskStatus.setDiagnosticInfo(diagInfo); // diag info
-
-      synchronized (JobTracker.this) {
-        // II. Update the task status
-        job.updateTaskStatus(tip, taskStatus);
-      }
-
-     // III. Prevent the task from expiry
-     expireLaunchingTasks.removeTask(attemptId);
+      return ret;
     }
   
     Path getRestartCountFile() {
@@ -1714,11 +1358,9 @@ public class JobTracker implements MRCon
         fs.rename(tmpRestartFile, restartFile); // rename .rec to main file
       } else {
         // For the very first time the jobtracker will create a jobtracker.info
-        // file. If the jobtracker has restarted then disable recovery as files'
-        // needed for recovery are missing.
-
-        // disable recovery if this is a restart
-        shouldRecover = false;
+        // file.
+        // enable recovery if this is a restart
+        shouldRecover = true;
 
         // write the jobtracker.info file
         try {
@@ -1770,205 +1412,59 @@ public class JobTracker implements MRCon
       fs.rename(tmpRestartFile, restartFile);
     }
 
-                                   // mapred.JobID::forName returns
-    @SuppressWarnings("unchecked") // mapreduce.JobID
     public void recover() {
+      int recovered = 0;
+      long recoveryProcessStartTime = clock.getTime();
       if (!shouldRecover()) {
         // clean up jobs structure
         jobsToRecover.clear();
         return;
       }
 
-      LOG.info("Restart count of the jobtracker : " + restartCount);
-
-      // I. Init the jobs and cache the recovered job history filenames
-      Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
-      Iterator<JobID> idIter = jobsToRecover.iterator();
-      JobInProgress job = null;
-      File jobIdFile = null;
-
-      // 0. Cleanup
-      try {
-        JobHistory.JobInfo.deleteConfFiles();
-      } catch (IOException ioe) {
-        LOG.info("Error in cleaning up job history folder", ioe);
-      }
-
-      while (idIter.hasNext()) {
-        JobID id = idIter.next();
-        LOG.info("Trying to recover details of job " + id);
+      LOG.info("Starting the recovery process for " + jobsToRecover.size()
+          + " jobs ...");
+      for (JobID jobId : jobsToRecover) {
+        LOG.info("Submitting job " + jobId);
         try {
-          // 1. Recover job owner and create JIP
-          jobIdFile = 
-            new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id, conf).toString());
-
-          String user = null;
-          if (jobIdFile != null && jobIdFile.exists()) {
-            LOG.info("File " + jobIdFile + " exists for job " + id);
-            FileInputStream in = new FileInputStream(jobIdFile);
-            BufferedReader reader = null;
-            try {
-              reader = new BufferedReader(new InputStreamReader(in));
-              user = reader.readLine();
-              LOG.info("Recovered user " + user + " for job " + id);
-            } finally {
-              if (reader != null) {
-                reader.close();
-              }
-              in.close();
-            }
-          }
-          if (user == null) {
-            throw new RuntimeException("Incomplete job " + id);
-          }
-
-          // Create the job
-          /* THIS PART OF THE CODE IS USELESS. JOB RECOVERY SHOULD BE
-           * BACKPORTED (MAPREDUCE-873)
-           */
-          job = new JobInProgress(JobTracker.this, conf,
-              new JobInfo((org.apache.hadoop.mapreduce.JobID) id,
-                new Text(user), new Path(getStagingAreaDirInternal(user))),
-              restartCount, new Credentials() /*HACK*/);
-
-          // 2. Check if the user has appropriate access
-          // Get the user group info for the job's owner
-          UserGroupInformation ugi =
-            UserGroupInformation.createRemoteUser(job.getJobConf().getUser());
-          LOG.info("Submitting job " + id + " on behalf of user "
-                   + ugi.getShortUserName() + " in groups : "
-                   + StringUtils.arrayToString(ugi.getGroupNames()));
-
-          // check the access
-          try {
-            aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
-          } catch (Throwable t) {
-            LOG.warn("Access denied for user " + ugi.getShortUserName() 
-                     + " in groups : [" 
-                     + StringUtils.arrayToString(ugi.getGroupNames()) + "]");
-            throw t;
-          }
-
-          // 3. Get the log file and the file path
-          String logFileName = 
-            JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
-          if (logFileName != null) {
-            Path jobHistoryFilePath = 
-              JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
-
-            // 4. Recover the history file. This involved
-            //     - deleting file.recover if file exists
-            //     - renaming file.recover to file if file doesnt exist
-            // This makes sure that the (master) file exists
-            JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
-                                                     jobHistoryFilePath);
+          Path jobInfoFile = getSystemFileForJob(jobId);
+          final Path jobTokenFile = getTokenFileForJob(jobId);
+          FSDataInputStream in = fs.open(jobInfoFile);
+          final JobInfo token = new JobInfo();
+          token.readFields(in);
+          in.close();
           
-            // 5. Cache the history file name as it costs one dfs access
-            jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+          // Read tokens as JT user
+          JobConf job = new JobConf();
+          final Credentials ts = 
+              (jobTokenFile.getFileSystem(job).exists(jobTokenFile)) ?
+            Credentials.readTokenStorageFile(jobTokenFile, job) : null;
+
+          // Re-submit job  
+          final UserGroupInformation ugi = UserGroupInformation
+              .createRemoteUser(token.getUser().toString());
+          JobStatus status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
+            public JobStatus run() throws IOException, InterruptedException {
+              return submitJob(JobID.downgrade(token.getJobID()), token
+                  .getJobSubmitDir().toString(), ugi, ts, true);
+            }
+          });
+          if (status == null) {
+            LOG.info("Job " + jobId + " was not recovered since it " +
+              "disabled recovery on restart (" + JobConf.MAPREDUCE_RECOVER_JOB +
+              " set to 'false').");
           } else {
-            LOG.info("No history file found for job " + id);
-            idIter.remove(); // remove from recovery list
-          }
-
-          // 6. Sumbit the job to the jobtracker
-          addJob(id, job);
-        } catch (Throwable t) {
-          LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
-          idIter.remove();
-          if (jobIdFile != null) {
-            jobIdFile.delete();
-            jobIdFile = null;
-          }
-          if (job != null) {
-            job.fail();
-            job = null;
-          }
-          continue;
-        }
-      }
-
-      long recoveryStartTime = clock.getTime();
-
-      // II. Recover each job
-      idIter = jobsToRecover.iterator();
-      while (idIter.hasNext()) {
-        JobID id = idIter.next();
-        JobInProgress pJob = getJob(id);
-
-        // 1. Get the required info
-        // Get the recovered history file
-        Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
-        String logFileName = jobHistoryFilePath.getName();
-
-        FileSystem fs;
-        try {
-          fs = jobHistoryFilePath.getFileSystem(conf);
-        } catch (IOException ioe) {
-          LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",
-                   ioe);
-          continue;
-        }
-
-        // 2. Parse the history file
-        // Note that this also involves job update
-        JobRecoveryListener listener = new JobRecoveryListener(pJob);
-        try {
-          JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), 
-                                        listener, fs);
-        } catch (Throwable t) {
-          LOG.info("Error reading history file of job " + pJob.getJobID() 
-                   + ". Ignoring the error and continuing.", t);
-        }
-
-        // 3. Close the listener
-        listener.close();
-        
-        // 4. Update the recovery metric
-        totalEventsRecovered += listener.getNumEventsRecovered();
-
-        // 5. Cleanup history
-        // Delete the master log file as an indication that the new file
-        // should be used in future
-        try {
-          synchronized (pJob) {
-            JobHistory.JobInfo.checkpointRecovery(logFileName, 
-                                                  pJob.getJobConf());
+            recovered++;
           }
-        } catch (Throwable t) {
-          LOG.warn("Failed to delete log file (" + logFileName + ") for job " 
-                   + id + ". Continuing.", t);
-        }
-
-        if (pJob.isComplete()) {
-          idIter.remove(); // no need to keep this job info as its successful
+        } catch (Exception e) {
+          LOG.warn("Could not recover job " + jobId, e);
         }
       }
-
-      recoveryDuration = clock.getTime() - recoveryStartTime;
+      recoveryDuration = clock.getTime() - recoveryProcessStartTime;
       hasRecovered = true;
 
-      // III. Finalize the recovery
-      synchronized (trackerExpiryQueue) {
-        // Make sure that the tracker statuses in the expiry-tracker queue
-        // are updated
-        long now = clock.getTime();
-        int size = trackerExpiryQueue.size();
-        for (int i = 0; i < size ; ++i) {
-          // Get the first tasktracker
-          TaskTrackerStatus taskTracker = trackerExpiryQueue.first();
-
-          // Remove it
-          trackerExpiryQueue.remove(taskTracker);
-
-          // Set the new time
-          taskTracker.setLastSeen(now);
-
-          // Add back to get the sorted list
-          trackerExpiryQueue.add(taskTracker);
-        }
-      }
-
-      LOG.info("Restoration complete");
+      LOG.info("Recovery done! Recovered " + recovered + " of "
+          + jobsToRecover.size() + " jobs.");
+      LOG.info("Recovery Duration (ms): " + recoveryDuration);
     }
     
     int totalEventsRecovered() {
@@ -2193,8 +1689,183 @@ public class JobTracker implements MRCon
     this(conf, identifier, clock, new QueueManager(new Configuration(conf)));
   } 
   
+  private void initJTConf(JobConf conf) {
+    if (conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false)) {
+      LOG.warn(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY + 
+          " is enabled, disabling it");
+      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false);
+    }
+  }
+ 
+  @InterfaceAudience.Private
+  void initializeFilesystem() throws IOException, InterruptedException {
+    // Connect to HDFS NameNode
+    while (!Thread.currentThread().isInterrupted() && fs == null) {
+      try {
+        fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
+          public FileSystem run() throws IOException {
+            Path systemDir = new Path(conf.get("mapred.system.dir",
+              "/tmp/hadoop/mapred/system"));
+            return FileSystem.get(systemDir.toUri(), conf);
+          }});
+      } catch (IOException ie) {
+        fs = null;
+        LOG.info("Problem connecting to HDFS Namenode... re-trying", ie);
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+      }
+    }
+    
+    if (Thread.currentThread().isInterrupted()) {
+      throw new InterruptedException();
+    }
+    
+    // Ensure HDFS is healthy
+    if ("hdfs".equalsIgnoreCase(fs.getUri().getScheme())) {
+      while (!DistributedFileSystem.isHealthy(fs.getUri())) {
+        LOG.info("HDFS initialized but not 'healthy' yet, waiting...");
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+      }
+    }
+  }
+  
+  @InterfaceAudience.Private
+  void initialize() 
+      throws IOException, InterruptedException {
+    // initialize history parameters.
+    final JobTracker jtFinal = this;
+    
+    getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        JobHistory.init(jtFinal, conf, jtFinal.localMachine, 
+            jtFinal.startTime);
+        return true;
+      }
+    });
+
+    // start the recovery manager
+    recoveryManager = new RecoveryManager();
+    
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        // if we haven't contacted the namenode go ahead and do it
+        UserGroupInformation mrOwner = getMROwner();
+        // clean up the system dir, which will only work if hdfs is out of 
+        // safe mode
+        if(systemDir == null) {
+          systemDir = new Path(getSystemDir());    
+        }
+        try {
+          FileStatus systemDirStatus = fs.getFileStatus(systemDir);
+          if (!systemDirStatus.isOwnedByUser(
+                   mrOwner.getShortUserName(), mrOwner.getGroupNames())) {
+            throw new AccessControlException("The systemdir " + systemDir +
+                " is not owned by " + mrOwner.getShortUserName());
+          }
+          if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
+            LOG.warn("Incorrect permissions on " + systemDir +
+                ". Setting it to " + SYSTEM_DIR_PERMISSION);
+            fs.setPermission(systemDir,new FsPermission(SYSTEM_DIR_PERMISSION));
+          }
+        } catch (FileNotFoundException fnf) {} //ignore
+        // Make sure that the backup data is preserved
+        FileStatus[] systemDirData = fs.listStatus(this.systemDir);
+        // Check if the history is enabled .. as we cant have persistence with 
+        // history disabled
+        if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
+            && systemDirData != null) {
+          for (FileStatus status : systemDirData) {
+            try {
+              recoveryManager.checkAndAddJob(status);
+            } catch (Throwable t) {
+              LOG.warn("Failed to add the job " + status.getPath().getName(), 
+                       t);
+            }
+          }
+          
+          // Check if there are jobs to be recovered
+          hasRestarted = recoveryManager.shouldRecover();
+          if (hasRestarted) {
+            break; // if there is something to recover else clean the sys dir
+          }
+        }
+        LOG.info("Cleaning up the system directory");
+        fs.delete(systemDir, true);
+        if (FileSystem.mkdirs(fs, systemDir, 
+            new FsPermission(SYSTEM_DIR_PERMISSION))) {
+          break;
+        }
+        LOG.error("Mkdirs failed to create " + systemDir);
+      } catch (AccessControlException ace) {
+        LOG.warn("Failed to operate on mapred.system.dir (" + systemDir 
+                 + ") because of permissions.");
+        LOG.warn("Manually delete the mapred.system.dir (" + systemDir 
+                 + ") and then start the JobTracker.");
+        LOG.warn("Bailing out ... ", ace);
+        throw ace;
+      } catch (IOException ie) {
+        LOG.info("problem cleaning system directory: " + systemDir, ie);
+      }
+      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+    }
+    
+    if (Thread.currentThread().isInterrupted()) {
+      throw new InterruptedException();
+    }
+    
+    // Same with 'localDir' except it's always on the local disk.
+    if (!hasRestarted) {
+      conf.deleteLocalFiles(SUBDIR);
+    }
+
+    // Initialize history DONE folder
+    FileSystem historyFS = getMROwner().doAs(
+        new PrivilegedExceptionAction<FileSystem>() {
+      public FileSystem run() throws IOException {
+        JobHistory.initDone(conf, fs);
+        final String historyLogDir = 
+          JobHistory.getCompletedJobHistoryLocation().toString();
+        infoServer.setAttribute("historyLogDir", historyLogDir);
+
+        infoServer.setAttribute
+          ("serialNumberDirectoryDigits",
+           Integer.valueOf(JobHistory.serialNumberDirectoryDigits()));
+
+        infoServer.setAttribute
+          ("serialNumberTotalDigits",
+           Integer.valueOf(JobHistory.serialNumberTotalDigits()));
+        
+        return new Path(historyLogDir).getFileSystem(conf);
+      }
+    });
+    infoServer.setAttribute("fileSys", historyFS);
+    infoServer.setAttribute("jobConf", conf);
+    infoServer.setAttribute("aclManager", aclsManager);
+
+    if (JobHistoryServer.isEmbedded(conf)) {
+      LOG.info("History server being initialized in embedded mode");
+      jobHistoryServer = new JobHistoryServer(conf, aclsManager, infoServer);
+      jobHistoryServer.start();
+      LOG.info("Job History Server web address: " + JobHistoryServer.getAddress(conf));
+    }
+
+    //initializes the job status store
+    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
+    
+    // Setup HDFS monitoring
+    if (this.conf.getBoolean(
+        JT_HDFS_MONITOR_ENABLE, DEFAULT_JT_HDFS_MONITOR_THREAD_ENABLE)) {
+      hdfsMonitor = new HDFSMonitorThread(this.conf, this, this.fs);
+      hdfsMonitor.start();
+    }
+  }
+  
   JobTracker(final JobConf conf, String identifier, Clock clock, QueueManager qm) 
-  throws IOException, InterruptedException { 
+  throws IOException, InterruptedException {
+    
+    initJTConf(conf);
+    
     this.queueManager = qm;
     this.clock = clock;
     // Set ports, start RPC servers, setup security policy etc.
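
initJTConf(), initializeFilesystem() and the HDFSMonitorThread hook above are all driven by plain configuration keys. A sketch of setting them programmatically (the monitor is off by default; the interval shown is DEFAULT_JT_HDFS_MONITOR_THREAD_INTERVAL_MS):

    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.mapred.JobConf;

    class JobTrackerConfSketch {
      static JobConf configure() {
        JobConf conf = new JobConf();
        // Optional background HDFS health monitor (disabled by default):
        conf.setBoolean("mapreduce.jt.hdfs.monitor.enable", true);
        conf.setInt("mapreduce.jt.hdfs.monitor.interval.ms", 5000);
        // initJTConf() forces this off even if a deployment enables it, so the
        // JobTracker fails fast instead of hanging inside DFS client retries:
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false);
        return conf;
      }
    }
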
@@ -2291,6 +1962,11 @@ public class JobTracker implements MRCon
     LOG.info("Starting jobtracker with owner as " +
         getMROwner().getShortUserName());
 
+    // Create network topology
+    clusterMap = (NetworkTopology) ReflectionUtils.newInstance(
+            conf.getClass("net.topology.impl", NetworkTopology.class,
+                NetworkTopology.class), conf);
+
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass
       = conf.getClass("mapred.jobtracker.taskScheduler",
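
clusterMap is no longer hard-wired to NetworkTopology; the implementation is taken reflectively from net.topology.impl, and the constructor changes further down also read the new mapred.jobtracker.nodegroup.aware flag. A sketch of switching to a node-group-aware topology, assuming the NetworkTopologyWithNodeGroup class that branch-1 ships for that purpose:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.net.NetworkTopology;
    import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;

    class TopologyConfSketch {
      static void enableNodeGroups(JobConf conf) {
        conf.setClass("net.topology.impl",
            NetworkTopologyWithNodeGroup.class, NetworkTopology.class);
        conf.setBoolean("mapred.jobtracker.nodegroup.aware", true);
      }
    }
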
@@ -2300,7 +1976,12 @@ public class JobTracker implements MRCon
     // Set service-level authorization security policy
     if (conf.getBoolean(
           ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
-      ServiceAuthorizationManager.refresh(conf, new MapReducePolicyProvider());
+      PolicyProvider policyProvider = 
+          (PolicyProvider)(ReflectionUtils.newInstance(
+              conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+                  MapReducePolicyProvider.class, PolicyProvider.class), 
+              conf));
+        ServiceAuthorizationManager.refresh(conf, policyProvider);
     }
     
     int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
@@ -2327,16 +2008,6 @@ public class JobTracker implements MRCon
     infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, 
         tmpInfoPort == 0, conf, aclsManager.getAdminsAcl());
     infoServer.setAttribute("job.tracker", this);
-    // initialize history parameters.
-    final JobTracker jtFinal = this;
-    getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
-      @Override
-      public Boolean run() throws Exception {
-        JobHistory.init(jtFinal, conf,jtFinal.localMachine, 
-            jtFinal.startTime);
-        return true;
-      }
-    });
     
     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
@@ -2356,128 +2027,27 @@ public class JobTracker implements MRCon
         infoBindAddress + ":" + this.infoPort); 
     LOG.info("JobTracker webserver: " + this.infoServer.getPort());
     
-    // start the recovery manager
-    recoveryManager = new RecoveryManager();
-    
-    while (!Thread.currentThread().isInterrupted()) {
-      try {
-        // if we haven't contacted the namenode go ahead and do it
-        UserGroupInformation mrOwner = getMROwner();
-        if (fs == null) {
-          fs = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
-            public FileSystem run() throws IOException {
-              Path systemDir = new Path(conf.get("mapred.system.dir",
-                "/tmp/hadoop/mapred/system"));
-              return FileSystem.get(systemDir.toUri(), conf);
-          }});
-        }
-        // clean up the system dir, which will only work if hdfs is out of 
-        // safe mode
-        if(systemDir == null) {
-          systemDir = new Path(getSystemDir());    
-        }
-        try {
-          FileStatus systemDirStatus = fs.getFileStatus(systemDir);
-          if (!systemDirStatus.isOwnedByUser(
-                   mrOwner.getShortUserName(), mrOwner.getGroupNames())) {
-            throw new AccessControlException("The systemdir " + systemDir +
-                " is not owned by " + mrOwner.getShortUserName());
-          }
-          if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
-            LOG.warn("Incorrect permissions on " + systemDir +
-                ". Setting it to " + SYSTEM_DIR_PERMISSION);
-            fs.setPermission(systemDir,new FsPermission(SYSTEM_DIR_PERMISSION));
-          }
-        } catch (FileNotFoundException fnf) {} //ignore
-        // Make sure that the backup data is preserved
-        FileStatus[] systemDirData = fs.listStatus(this.systemDir);
-        // Check if the history is enabled .. as we cant have persistence with 
-        // history disabled
-        if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
-            && systemDirData != null) {
-          for (FileStatus status : systemDirData) {
-            try {
-              recoveryManager.checkAndAddJob(status);
-            } catch (Throwable t) {
-              LOG.warn("Failed to add the job " + status.getPath().getName(), 
-                       t);
-            }
-          }
-          
-          // Check if there are jobs to be recovered
-          hasRestarted = recoveryManager.shouldRecover();
-          if (hasRestarted) {
-            break; // if there is something to recover else clean the sys dir
-          }
-        }
-        LOG.info("Cleaning up the system directory");
-        fs.delete(systemDir, true);
-        if (FileSystem.mkdirs(fs, systemDir, 
-            new FsPermission(SYSTEM_DIR_PERMISSION))) {
-          break;
-        }
-        LOG.error("Mkdirs failed to create " + systemDir);
-      } catch (AccessControlException ace) {
-        LOG.warn("Failed to operate on mapred.system.dir (" + systemDir 
-                 + ") because of permissions.");
-        LOG.warn("Manually delete the mapred.system.dir (" + systemDir 
-                 + ") and then start the JobTracker.");
-        LOG.warn("Bailing out ... ", ace);
-        throw ace;
-      } catch (IOException ie) {
-        LOG.info("problem cleaning system directory: " + systemDir, ie);
-      }
-      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
-    }
-    
-    if (Thread.currentThread().isInterrupted()) {
-      throw new InterruptedException();
-    }
-    
-    // Same with 'localDir' except it's always on the local disk.
-    if (!hasRestarted) {
-      jobConf.deleteLocalFiles(SUBDIR);
-    }
-
-    // Initialize history DONE folder
-    FileSystem historyFS = getMROwner().doAs(
-        new PrivilegedExceptionAction<FileSystem>() {
-      public FileSystem run() throws IOException {
-        JobHistory.initDone(conf, fs);
-        final String historyLogDir = 
-          JobHistory.getCompletedJobHistoryLocation().toString();
-        infoServer.setAttribute("historyLogDir", historyLogDir);
-
-        infoServer.setAttribute
-          ("serialNumberDirectoryDigits",
-           Integer.valueOf(JobHistory.serialNumberDirectoryDigits()));
-
-        infoServer.setAttribute
-          ("serialNumberTotalDigits",
-           Integer.valueOf(JobHistory.serialNumberTotalDigits()));
-        
-        return new Path(historyLogDir).getFileSystem(conf);
-      }
-    });
-    infoServer.setAttribute("fileSys", historyFS);
-    infoServer.setAttribute("jobConf", conf);
-    infoServer.setAttribute("aclManager", aclsManager);
-
-    if (JobHistoryServer.isEmbedded(conf)) {
-      LOG.info("History server being initialized in embedded mode");
-      jobHistoryServer = new JobHistoryServer(conf, aclsManager, infoServer);
-      jobHistoryServer.start();
-      LOG.info("Job History Server web address: " + JobHistoryServer.getAddress(conf));
-    }
-
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
             DNSToSwitchMapping.class), conf);
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
         NetworkTopology.DEFAULT_HOST_LEVEL);
-
-    //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
+    this.isNodeGroupAware = conf.getBoolean(
+            "mapred.jobtracker.nodegroup.aware", false);
+    
+    plugins = conf.getInstances("mapreduce.jobtracker.plugins",
+        ServicePlugin.class);
+    for (ServicePlugin p : plugins) {
+      try {
+        p.start(this);
+        LOG.info("Started plug-in " + p + " of type " + p.getClass());
+      } catch (Throwable t) {
+        LOG.warn("ServicePlugin " + p + " of type " + p.getClass()
+            + " could not be started", t);
+      }
+    }
+    
+    this.initDone.set(conf.getBoolean(JT_INIT_CONFIG_KEY_FOR_TESTS, true));
   }
 
   private static SimpleDateFormat getDateFormat() {
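
The plugin loop above instantiates every class named in mapreduce.jobtracker.plugins and hands it the JobTracker instance; close() stops them again. A hypothetical plugin (class name and behavior are illustrative only) showing what such a class needs to implement:

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobTracker;
    import org.apache.hadoop.util.ServicePlugin;

    // Enabled with e.g. mapreduce.jobtracker.plugins=com.example.JTStartupLogPlugin
    public class JTStartupLogPlugin implements ServicePlugin {
      @Override
      public void start(Object service) {
        // 'service' is the JobTracker passed via p.start(this) above.
        JobTracker jt = (JobTracker) service;
        System.out.println("Plugin attached to " + jt.getClass().getName());
      }
      @Override
      public void stop() {
        // Release anything acquired in start(); called from JobTracker.close().
      }
      @Override
      public void close() throws IOException {
        stop();
      }
    }
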
@@ -2568,6 +2138,17 @@ public class JobTracker implements MRCon
    * Run forever
    */
   public void offerService() throws InterruptedException, IOException {
+    // start the inter-tracker server 
+    this.interTrackerServer.start();
+    
+    // Initialize the JobTracker FileSystem within safemode
+    setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
+    initializeFilesystem();
+    setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
+    
+    // Initialize JobTracker
+    initialize();
+    
     // Prepare for recovery. This is done irrespective of the status of restart
     // flag.
     while (true) {
@@ -2607,19 +2188,53 @@ public class JobTracker implements MRCon
       completedJobsStoreThread.start();
     }
 
-    // start the inter-tracker server once the jt is ready
-    this.interTrackerServer.start();
-    
+    // Just for unit-tests 
+    waitForInit();
     synchronized (this) {
       state = State.RUNNING;
     }
+    
     LOG.info("Starting RUNNING");
     
     this.interTrackerServer.join();
     LOG.info("Stopped interTrackerServer");
   }
 
+  AtomicBoolean initDone = new AtomicBoolean(true);
+  Object initDoneLock = new Object();
+  
+  private void waitForInit() {
+    synchronized (initDoneLock) {
+      while (!initDone.get()) {
+        try {
+          LOG.debug("About to wait since initDone = false");
+          initDoneLock.wait();
+        } catch (InterruptedException ie) {
+          LOG.debug("Ignoring ", ie);
+        }
+      }
+    }
+  }
+  
+  void setInitDone(boolean done) {
+    synchronized (initDoneLock) {
+      initDone.set(done);
+      initDoneLock.notify();
+    }
+  }
+  
   void close() throws IOException {
+    if (plugins != null) {
+      for (ServicePlugin p : plugins) {
+        try {
+          p.stop();
+          LOG.info("Stopped plug-in " + p + " of type " + p.getClass());
+        } catch (Throwable t) {
+          LOG.warn("ServicePlugin " + p + " of type " + p.getClass()
+              + " could not be stopped", t);
+        }
+      }
+    }
     if (this.infoServer != null) {
       LOG.info("Stopping infoServer");
       try {
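
The initDone/initDoneLock pair exists purely as a test hook: setting mapreduce.jobtracker.init.for.tests to false makes offerService() block in waitForInit() just before the tracker flips to RUNNING, until another thread calls setInitDone(true). A sketch of how a test might use the gate (the port settings and threading are assumptions; the class must live in org.apache.hadoop.mapred because setInitDone() is package-private):

    package org.apache.hadoop.mapred;

    public class InitGateSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        conf.set("mapred.job.tracker", "localhost:0");            // assumed
        conf.set("mapred.job.tracker.http.address", "0.0.0.0:0"); // assumed
        conf.setBoolean("mapreduce.jobtracker.init.for.tests", false);
        final JobTracker jt = JobTracker.startTracker(conf, "test-jt");

        Thread service = new Thread() {
          public void run() {
            try {
              jt.offerService();   // blocks in waitForInit() before RUNNING
            } catch (Exception e) {
              e.printStackTrace();
            }
          }
        };
        service.start();

        // ... exercise the tracker while it is still INITIALIZING ...

        jt.setInitDone(true);      // releases waitForInit(); state becomes RUNNING
      }
    }
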
@@ -3267,7 +2882,9 @@ public class JobTracker implements MRCon
   public int getNumberOfUniqueHosts() {
     return uniqueHostsMap.size();
   }
-  
+  public boolean isNodeGroupAware() {
+    return isNodeGroupAware;
+  }
   public void addJobInProgressListener(JobInProgressListener listener) {
     jobInProgressListeners.add(listener);
   }
@@ -3294,8 +2911,13 @@ public class JobTracker implements MRCon
   ////////////////////////////////////////////////////
   // InterTrackerProtocol
   ////////////////////////////////////////////////////
-  
-  public String getBuildVersion() throws IOException{
+
+  // Just returns the VersionInfo version (unlike MXBean#getVersion)
+  public String getVIVersion() throws IOException {
+    return VersionInfo.getVersion();
+  }
+
+  public String getBuildVersion() throws IOException {
     return VersionInfo.getBuildVersion();
   }
 
@@ -3412,7 +3034,7 @@ public class JobTracker implements MRCon
         }
       }
     }
-      
+
     // Check for tasks to be killed
     List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
     if (killTasksList != null) {
@@ -3461,7 +3083,7 @@ public class JobTracker implements MRCon
     int clusterSize = getClusterStatus().getTaskTrackers();
     int heartbeatInterval =  Math.max(
                                 (int)(1000 * HEARTBEATS_SCALING_FACTOR *
-                                      Math.ceil((double)clusterSize / 
+                                      ((double)clusterSize / 
                                                 NUM_HEARTBEATS_IN_SECOND)),
                                 HEARTBEAT_INTERVAL_MIN) ;
     return heartbeatInterval;
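
For reference, dropping Math.ceil above makes the interval scale linearly with cluster size instead of being rounded up per block of trackers. A minimal sketch of the same computation, with illustrative constant values only (the real HEARTBEATS_SCALING_FACTOR, NUM_HEARTBEATS_IN_SECOND and HEARTBEAT_INTERVAL_MIN values in this branch may differ):

    // Sketch of the heartbeat interval math with assumed constants; not the branch defaults.
    public class HeartbeatIntervalSketch {
      // Illustrative values only -- the real constants live in the JobTracker.
      static final double HEARTBEATS_SCALING_FACTOR = 0.01;
      static final int NUM_HEARTBEATS_IN_SECOND = 100;
      static final int HEARTBEAT_INTERVAL_MIN = 300; // milliseconds

      static int nextHeartbeatInterval(int clusterSize) {
        // 1000 ms * scaling * (trackers per heartbeat slot), floored at the minimum.
        return Math.max(
            (int) (1000 * HEARTBEATS_SCALING_FACTOR *
                   ((double) clusterSize / NUM_HEARTBEATS_IN_SECOND)),
            HEARTBEAT_INTERVAL_MIN);
      }

      public static void main(String[] args) {
        // With the assumed constants: 50 trackers -> 300 ms (the minimum),
        // 5000 trackers -> 500 ms; without Math.ceil the growth is smooth.
        System.out.println(nextHeartbeatInterval(50));
        System.out.println(nextHeartbeatInterval(5000));
      }
    }
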
@@ -3807,6 +3429,12 @@ public class JobTracker implements MRCon
   // returns cleanup tasks first, then setup tasks.
   synchronized List<Task> getSetupAndCleanupTasks(
     TaskTrackerStatus taskTracker) throws IOException {
+    
+    // Don't assign *any* new task in safemode
+    if (isInSafeMode()) {
+      return null;
+    }
+    
     int maxMapTasks = taskTracker.getMaxMapSlots();
     int maxReduceTasks = taskTracker.getMaxReduceSlots();
     int numMaps = taskTracker.countOccupiedMapSlots();
@@ -3914,9 +3542,12 @@ public class JobTracker implements MRCon
    * Allocates a new JobId string.
    */
   public synchronized JobID getNewJobId() throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     return new JobID(getTrackerIdentifier(), nextJobId++);
   }
-
+  
   /**
    * JobTracker.submitJob() kicks off a new job.  
    *
@@ -3927,8 +3558,32 @@ public class JobTracker implements MRCon
    */
   public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
       throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
+    return submitJob(jobId, jobSubmitDir, null, ts, false);
+  }
+
+  /**
+   * JobTracker.submitJob() kicks off a new job.
+   * 
+   * Create a 'JobInProgress' object, which contains both JobProfile and
+   * JobStatus. Those two sub-objects are sometimes shipped outside of the
+   * JobTracker. But JobInProgress adds info that's useful for the JobTracker
+   * alone.
+   * @return null if the job is being recovered but mapred.job.restart.recover
+   * is false.
+   */
+  JobStatus submitJob(JobID jobId, String jobSubmitDir,
+      UserGroupInformation ugi, Credentials ts, boolean recovered)
+      throws IOException {
+    // Check for safe-mode
+    checkSafeMode();
+    
     JobInfo jobInfo = null;
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    if (ugi == null) {
+      ugi = UserGroupInformation.getCurrentUser();
+    }
     synchronized (this) {
       if (jobs.containsKey(jobId)) {
         // job already running, don't start twice
@@ -3938,15 +3593,40 @@ public class JobTracker implements MRCon
           new Path(jobSubmitDir));
     }
     
+    // Store the job-info in a file so that the job can be recovered
+    // later (if at all)
+    // Note: jobDir & jobInfo are owned by JT user since we are using
+    // its fs object
+    if (!recovered) {
+      Path jobDir = getSystemDirectoryForJob(jobId);
+      FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
+      FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
+      jobInfo.write(out);
+      out.close();
+    }
+
     // Create the JobInProgress, do not lock the JobTracker since
-    // we are about to copy job.xml from HDFS
+    // we are about to copy job.xml from HDFS and write jobToken file to HDFS
     JobInProgress job = null;
     try {
+      if (ts == null) {
+        ts = new Credentials();
+      }
+      generateAndStoreJobTokens(jobId, ts);
       job = new JobInProgress(this, this.conf, jobInfo, 0, ts);
     } catch (Exception e) {
       throw new IOException(e);
     }
     
+    if (recovered && 
+        !job.getJobConf().getBoolean(
+            JobConf.MAPREDUCE_RECOVER_JOB, 
+            JobConf.DEFAULT_MAPREDUCE_RECOVER_JOB)) {
+      LOG.info("Job "+ jobId.toString() + " is not enable for recovery, cleaning up job files");
+      job.cleanupJob();
+      return null;
+    }
+    
     synchronized (this) {
       // check if queue is RUNNING
       String queue = job.getProfile().getQueueName();
@@ -3969,19 +3649,6 @@ public class JobTracker implements MRCon
       } catch (IOException ioe) {
         throw ioe;
       }
-      boolean recovered = true; // TODO: Once the Job recovery code is there,
-      // (MAPREDUCE-873) we
-      // must pass the "recovered" flag accurately.
-      // This is handled in the trunk/0.22
-      if (!recovered) {
-        // Store the information in a file so that the job can be recovered
-        // later (if at all)
-        Path jobDir = getSystemDirectoryForJob(jobId);
-        FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
-        FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
-        jobInfo.write(out);
-        out.close();
-      }
 
       try {
         this.taskScheduler.checkJobSubmission(job);
@@ -4001,7 +3668,6 @@ public class JobTracker implements MRCon
         failJob(job);
         throw ioe;
       }
-      
       return status;
     }
   }
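
As the javadoc above notes, a job that comes back through recovery is cleaned up unless its configuration allows recovery. A minimal client-side sketch of opting a job out of recovery; the key name mapred.job.restart.recover is taken from the javadoc above, and everything else about the job is omitted:

    import org.apache.hadoop.mapred.JobConf;

    // Sketch only: disable per-job recovery before submission.
    public class DisableJobRecoverySketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Key name as referenced in the javadoc above (JobConf.MAPREDUCE_RECOVER_JOB).
        conf.setBoolean("mapred.job.restart.recover", false);
        // ... configure mapper/reducer/paths and submit via JobClient as usual.
      }
    }
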
@@ -4010,6 +3676,9 @@ public class JobTracker implements MRCon
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
    */
   public String getStagingAreaDir() throws IOException {
+    // Check for safe-mode
+    checkSafeMode();
+
     try{
       final String user =
         UserGroupInformation.getCurrentUser().getShortUserName();
@@ -4126,6 +3795,12 @@ public class JobTracker implements MRCon
       return;
     }
     
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
+    // No 'killJob' in safe-mode
+    checkSafeMode();
+    
     JobInProgress job = jobs.get(jobid);
     
     if (null == job) {
@@ -4269,6 +3944,9 @@ public class JobTracker implements MRCon
   public synchronized void setJobPriority(JobID jobid, 
                                           String priority)
                                           throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     JobInProgress job = jobs.get(jobid);
     if (null == job) {
         LOG.info("setJobPriority(): JobId " + jobid.toString()
@@ -4296,7 +3974,10 @@ public class JobTracker implements MRCon
     return job.inited(); 
   }
   
-  public JobProfile getJobProfile(JobID jobid) {
+  public JobProfile getJobProfile(JobID jobid) throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
       if (job != null) {
@@ -4313,7 +3994,10 @@ public class JobTracker implements MRCon
     return completedJobStatusStore.readJobProfile(jobid);
   }
   
-  public JobStatus getJobStatus(JobID jobid) {
+  public JobStatus getJobStatus(JobID jobid) throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     if (null == jobid) {
       LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
       return null;
@@ -4336,6 +4020,9 @@ public class JobTracker implements MRCon
   
   private static final Counters EMPTY_COUNTERS = new Counters();
   public Counters getJobCounters(JobID jobid) throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
@@ -4370,6 +4057,9 @@ public class JobTracker implements MRCon
   
   public synchronized TaskReport[] getMapTaskReports(JobID jobid)
       throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
@@ -4398,6 +4088,9 @@ public class JobTracker implements MRCon
 
   public synchronized TaskReport[] getReduceTaskReports(JobID jobid)
       throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
@@ -4424,6 +4117,9 @@ public class JobTracker implements MRCon
 
   public synchronized TaskReport[] getCleanupTaskReports(JobID jobid)
       throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
@@ -4453,6 +4149,9 @@ public class JobTracker implements MRCon
   
   public synchronized TaskReport[] getSetupTaskReports(JobID jobid)
       throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
@@ -4496,6 +4195,9 @@ public class JobTracker implements MRCon
    */
   public TaskCompletionEvent[] getTaskCompletionEvents(
       JobID jobid, int fromEventId, int maxEvents) throws IOException{
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     JobInProgress job = this.jobs.get(jobid);
       
     if (null != job) {
@@ -4517,6 +4219,9 @@ public class JobTracker implements MRCon
    */
   public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId)  
     throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
     List<String> taskDiagnosticInfo = null;
     JobID jobId = taskId.getJobID();
     TaskID tipId = taskId.getTaskID();
@@ -4581,6 +4286,12 @@ public class JobTracker implements MRCon
    */
   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail)
       throws IOException {
+    // Check for JobTracker operational state
+    checkJobTrackerState();
+    
+    // No 'killTask' in safe-mode
+    checkSafeMode();
+
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
       // check both queue-level and job-level access
@@ -4622,6 +4333,11 @@ public class JobTracker implements MRCon
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
    */
   public String getSystemDir() {
+    // Might not be initialized yet, TT handles this
+    if (isInSafeMode()) {
+      return null;
+    }
+    
     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
     return fs.makeQualified(sysDir).toString();
   }
@@ -4652,9 +4368,15 @@ public class JobTracker implements MRCon
   
   //Get the job info file in system directory
   Path getSystemFileForJob(JobID id) {
-    return new Path(getSystemDirectoryForJob(id)+"/" + JOB_INFO_FILE);
+    return new Path(getSystemDirectoryForJob(id), JOB_INFO_FILE);
   }
 
+  //Get the job token file in system directory
+  Path getTokenFileForJob(JobID id) {
+    return new Path(
+        getSystemDirectoryForJob(id), TokenCache.JOB_TOKEN_HDFS_FILE);
+  }
+  
   /**
    * Change the run-time priority of the given job.
    * 
@@ -4702,8 +4424,10 @@ public class JobTracker implements MRCon
       report.setTaskTracker(trackerName);
       TaskAttemptID taskId = report.getTaskID();
       
-      // expire it
-      expireLaunchingTasks.removeTask(taskId);
+      // don't expire the task if it is not unassigned
+      if (report.getRunState() != TaskStatus.State.UNASSIGNED) {
+        expireLaunchingTasks.removeTask(taskId);
+      }
       
       JobInProgress job = getJob(taskId.getJobID());
       if (job == null) {
@@ -5385,4 +5109,145 @@ public class JobTracker implements MRCon
     return map;
   }
   // End MXBean implementation
+
+  /**
+   * JobTracker SafeMode
+   */
+  // SafeMode actions
+  public enum SafeModeAction { SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET }
+  
+  private AtomicBoolean safeMode = new AtomicBoolean(false);
+  private AtomicBoolean adminSafeMode = new AtomicBoolean(false);
+  private String adminSafeModeUser = "";
+  
+  public boolean setSafeMode(JobTracker.SafeModeAction safeModeAction) 
+      throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    // Anyone can check JT safe-mode
+    if (safeModeAction == SafeModeAction.SAFEMODE_GET) {
+      boolean safeMode = this.safeMode.get();
+      LOG.info("Getting safemode information: safemode=" + safeMode + ". " +
+          "Requested by : " +
+          UserGroupInformation.getCurrentUser().getShortUserName());
+      AuditLogger.logSuccess(user, Constants.GET_SAFEMODE, 
+          Constants.JOBTRACKER);
+      return safeMode;
+    }
+    
+    // Check access for modifications to safe-mode
+    if (!aclsManager.isMRAdmin(UserGroupInformation.getCurrentUser())) {
+      AuditLogger.logFailure(user, Constants.SET_SAFEMODE, 
+          aclsManager.getAdminsAcl().toString(), Constants.JOBTRACKER, 
+          Constants.UNAUTHORIZED_USER);
+      throw new AccessControlException(user + 
+                                       " is not authorized to set " +
+                                       " JobTracker safemode.");
+    }
+    AuditLogger.logSuccess(user, Constants.SET_SAFEMODE, Constants.JOBTRACKER);
+
+    boolean currSafeMode = setSafeModeInternal(safeModeAction);
+    adminSafeMode.set(currSafeMode);
+    adminSafeModeUser = user;
+    return currSafeMode;
+  }
+  
+  boolean isInAdminSafeMode() {
+    return adminSafeMode.get();
+  }
+  
+  boolean setSafeModeInternal(JobTracker.SafeModeAction safeModeAction) 
+      throws IOException {
+    if (safeModeAction != SafeModeAction.SAFEMODE_GET) {
+      boolean safeMode = false;
+      if (safeModeAction == SafeModeAction.SAFEMODE_ENTER) {
+        safeMode = true;
+      } else if (safeModeAction == SafeModeAction.SAFEMODE_LEAVE) {
+        safeMode = false;
+      }
+      LOG.info("Setting safe mode to " + safeMode + ". Requested by : " +
+          UserGroupInformation.getCurrentUser().getShortUserName());
+      this.safeMode.set(safeMode);
+    }
+    return this.safeMode.get();
+  }
+
+  public boolean isInSafeMode() {
+    return safeMode.get();
+  }
+  
+  String getSafeModeText() {
+    if (!isInSafeMode())
+      return "OFF";
+    String safeModeInfo = 
+        adminSafeMode.get() ? 
+            "Set by admin <strong>" + adminSafeModeUser + "</strong>": 
+            "HDFS unavailable";
+    return "<em>ON - " + safeModeInfo + "</em>";
+  }
+  
+  private void checkSafeMode() throws SafeModeException {
+    if (isInSafeMode()) {
+      SafeModeException sme = 
+          new SafeModeException(
+              (isInAdminSafeMode()) ? adminSafeModeUser : null);
+      LOG.info("JobTracker in safe-mode, aborting operation: ", sme); 
+      throw sme;
+    }
+  }
+  
+  private void checkJobTrackerState() 
+      throws JobTrackerNotYetInitializedException {
+    if (state != State.RUNNING) {
+      JobTrackerNotYetInitializedException jtnyie =
+          new JobTrackerNotYetInitializedException();
+      LOG.info("JobTracker not yet in RUNNING state, aborting operation: ", 
+          jtnyie); 
+      throw jtnyie;
+    }
+  }
+
+  /**
+   * generate job token and save it into the file
+   * @throws IOException
+   * @throws InterruptedException 
+   */
+  private void 
+  generateAndStoreJobTokens(final JobID jobId, final Credentials tokenStorage) 
+      throws IOException {
+
+    // Write out jobToken as JT user
+    try {
+      getMROwner().doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws IOException {
+
+          Path jobDir = getSystemDirectoryForJob(jobId);
+          Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
+          //create JobToken file and write token to it
+          JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
+              .toString()));
+          Token<JobTokenIdentifier> token = 
+              new Token<JobTokenIdentifier>(
+                  identifier, getJobTokenSecretManager());
+          token.setService(identifier.getJobId());
+
+          TokenCache.setJobToken(token, tokenStorage);
+
+          // write TokenStorage out
+          tokenStorage.writeTokenStorageFile(keysFile, getConf());
+          LOG.info("jobToken generated and stored with users keys in "
+              + keysFile.toUri().getPath());
+          
+          return null;
+
+        }
+      });
+    } catch (InterruptedException ie) {
+      // Rewrap as an IOException so callers see the failure
+      throw new IOException(ie);
+    }
+
+  }
+
 }
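
The safe-mode support added above combines an admin-only toggle (setSafeMode) with a read-only query and internal guards (checkSafeMode, checkJobTrackerState). A minimal in-process sketch of how the new API composes; this assumes direct access to the JobTracker instance rather than a remote admin client:

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobTracker;

    // Sketch only: exercises the safe-mode API added above from inside the JobTracker JVM.
    public class SafeModeSketch {
      static void demo(JobTracker jt) throws IOException {
        // Anyone may query the current state.
        boolean inSafeMode = jt.setSafeMode(JobTracker.SafeModeAction.SAFEMODE_GET);
        System.out.println("safemode=" + inSafeMode);

        // Entering or leaving requires MR admin privileges; otherwise the call
        // throws AccessControlException and the attempt is audit-logged.
        jt.setSafeMode(JobTracker.SafeModeAction.SAFEMODE_ENTER);
        try {
          // While in safe-mode, submitJob/killJob/killTask fail with SafeModeException
          // and getSetupAndCleanupTasks assigns no new tasks.
        } finally {
          jt.setSafeMode(JobTracker.SafeModeAction.SAFEMODE_LEAVE);
        }
      }
    }
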

Added: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java (added)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * This exception is thrown when the JobTracker is still initializing and
+ * not yet operational.
+ */
+public class JobTrackerNotYetInitializedException extends IOException {
+
+  private static final long serialVersionUID = 1984839357L;
+
+  public JobTrackerNotYetInitializedException() {
+    super("JobTracker is not yet RUNNING");
+  }
+
+}
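
Callers that race the JobTracker's startup (for example getNewJobId, getJobStatus or the task-report methods guarded by checkJobTrackerState above) can treat this exception as a signal to back off and retry. A minimal sketch, assuming direct access to the JobTracker instance:

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.JobTracker;
    import org.apache.hadoop.mapred.JobTrackerNotYetInitializedException;

    // Sketch only: retry until the JobTracker finishes initializing.
    public class WaitForJobTrackerSketch {
      static JobID newJobIdWithRetry(JobTracker jt)
          throws IOException, InterruptedException {
        while (true) {
          try {
            return jt.getNewJobId();
          } catch (JobTrackerNotYetInitializedException e) {
            Thread.sleep(1000); // not RUNNING yet; back off and retry
          }
        }
      }
    }
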

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Fri Jun 21 06:37:27 2013
@@ -101,7 +101,7 @@ public class KeyValueLineRecordReader im
     return true;
   }
   
-  public float getProgress() {
+  public float getProgress() throws IOException {
     return lineRecordReader.getProgress();
   }
   

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueTextInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueTextInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/KeyValueTextInputFormat.java Fri Jun 21 06:37:27 2013
@@ -23,7 +23,9 @@ import java.io.IOException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 
 /**
  * An {@link InputFormat} for plain text files. Files are broken into lines.
@@ -41,9 +43,13 @@ public class KeyValueTextInputFormat ext
   }
   
   protected boolean isSplitable(FileSystem fs, Path file) {
-    return compressionCodecs.getCodec(file) == null;
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
   }
-  
+
   public RecordReader<Text, Text> getRecordReader(InputSplit genericSplit,
                                                   JobConf job,
                                                   Reporter reporter)
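
The isSplitable() change above means a compressed input stays splittable only when its codec implements SplittableCompressionCodec, while uncompressed files remain splittable as before. A minimal sketch of the same check applied to an arbitrary path, using the usual extension-based codec lookup (the gzip example assumes GzipCodec does not implement SplittableCompressionCodec, which is the common case):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.SplittableCompressionCodec;

    // Sketch only: mirror the new isSplitable() logic for an arbitrary path.
    public class SplitabilityCheckSketch {
      static boolean isSplittable(Configuration conf, Path file) {
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
        if (codec == null) {
          return true;                       // uncompressed: always splittable
        }
        return codec instanceof SplittableCompressionCodec;
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        System.out.println(isSplittable(conf, new Path("/data/part-00000")));    // true
        System.out.println(isSplittable(conf, new Path("/data/part-00000.gz"))); // false
      }
    }
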

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java Fri Jun 21 06:37:27 2013
@@ -25,10 +25,15 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -45,6 +50,9 @@ public class LineRecordReader implements
   private long end;
   private LineReader in;
   int maxLineLength;
+  private Seekable filePosition;
+  private CompressionCodec codec;
+  private Decompressor decompressor;
 
   /**
    * A class that provides a line reader from an input stream.
@@ -71,37 +79,69 @@ public class LineRecordReader implements
     end = start + split.getLength();
     final Path file = split.getPath();
     compressionCodecs = new CompressionCodecFactory(job);
-    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
     FileSystem fs = file.getFileSystem(job);
     FSDataInputStream fileIn = fs.open(split.getPath());
-    boolean skipFirstLine = false;
-    if (codec != null) {
-      in = new LineReader(codec.createInputStream(fileIn), job);
-      end = Long.MAX_VALUE;
-    } else {
-      if (start != 0) {
-        skipFirstLine = true;
-        --start;
-        fileIn.seek(start);
+
+    if (isCompressedInput()) {
+      decompressor = CodecPool.getDecompressor(codec);
+      if (codec instanceof SplittableCompressionCodec) {
+        final SplitCompressionInputStream cIn =
+          ((SplittableCompressionCodec)codec).createInputStream(
+            fileIn, decompressor, start, end,
+            SplittableCompressionCodec.READ_MODE.BYBLOCK);
+        in = new LineReader(cIn, job);
+        start = cIn.getAdjustedStart();
+        end = cIn.getAdjustedEnd();
+        filePosition = cIn; // take pos from compressed stream
+      } else {
+        in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
+        filePosition = fileIn;
       }
+    } else {
+      fileIn.seek(start);
       in = new LineReader(fileIn, job);
+      filePosition = fileIn;
     }
-    if (skipFirstLine) {  // skip first line and re-establish "start".
-      start += in.readLine(new Text(), 0,
-                           (int)Math.min((long)Integer.MAX_VALUE, end - start));
+    // If this is not the first split, we always throw away first record
+    // because we always (except the last split) read one extra line in
+    // next() method.
+    if (start != 0) {
+      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
     }
     this.pos = start;
   }
-  
+
+  private boolean isCompressedInput() {
+    return (codec != null);
+  }
+
+  private int maxBytesToConsume(long pos) {
+    return isCompressedInput()
+      ? Integer.MAX_VALUE
+      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+  }
+
+  private long getFilePosition() throws IOException {
+    long retVal;
+    if (isCompressedInput() && null != filePosition) {
+      retVal = filePosition.getPos();
+    } else {
+      retVal = pos;
+    }
+    return retVal;
+  }
+
   public LineRecordReader(InputStream in, long offset, long endOffset,
                           int maxLineLength) {
     this.maxLineLength = maxLineLength;
     this.in = new LineReader(in);
     this.start = offset;
     this.pos = offset;
-    this.end = endOffset;    
+    this.end = endOffset;
+    this.filePosition = null;
   }
 
   public LineRecordReader(InputStream in, long offset, long endOffset, 
@@ -113,6 +153,7 @@ public class LineRecordReader implements
     this.start = offset;
     this.pos = offset;
     this.end = endOffset;    
+    this.filePosition = null;
   }
   
   public LongWritable createKey() {
@@ -127,12 +168,13 @@ public class LineRecordReader implements
   public synchronized boolean next(LongWritable key, Text value)
     throws IOException {
 
-    while (pos < end) {
+    // We always read one extra line, which lies outside the upper
+    // split limit i.e. (end - 1)
+    while (getFilePosition() <= end) {
       key.set(pos);
 
       int newSize = in.readLine(value, maxLineLength,
-                                Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
-                                         maxLineLength));
+          Math.max(maxBytesToConsume(pos), maxLineLength));
       if (newSize == 0) {
         return false;
       }
@@ -151,21 +193,28 @@ public class LineRecordReader implements
   /**
    * Get the progress within the split
    */
-  public float getProgress() {
+  public float getProgress() throws IOException {
     if (start == end) {
       return 0.0f;
     } else {
-      return Math.min(1.0f, (pos - start) / (float)(end - start));
+      return Math.min(1.0f,
+        (getFilePosition() - start) / (float)(end - start));
     }
   }
   
-  public  synchronized long getPos() throws IOException {
+  public synchronized long getPos() throws IOException {
     return pos;
   }
 
   public synchronized void close() throws IOException {
-    if (in != null) {
-      in.close(); 
+    try {
+      if (in != null) {
+        in.close();
+      }
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
     }
   }
 }
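
With the LineRecordReader changes above, every split except the first skips its partial first line and reads one extra line past its nominal end, and compressed inputs draw a Decompressor from CodecPool that close() returns. A minimal usage sketch of the old-API reader over an assumed local file; the path and split length are placeholders:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;

    // Sketch only: read one split of a text file with the mapred LineRecordReader.
    public class LineRecordReaderSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        Path file = new Path("/tmp/sample.txt");              // assumed input path
        FileSplit split = new FileSplit(file, 0L, 1024L, (String[]) null);

        LineRecordReader reader = new LineRecordReader(job, split);
        LongWritable key = reader.createKey();                // byte offset of the line
        Text value = reader.createValue();                    // the line itself
        try {
          while (reader.next(key, value)) {
            System.out.println(key.get() + "\t" + value);
          }
          System.out.println("progress=" + reader.getProgress());
        } finally {
          reader.close();   // also returns any pooled decompressor
        }
      }
    }
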

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Jun 21 06:37:27 2013
@@ -296,32 +296,51 @@ class LinuxTaskController extends TaskCo
 
   @Override
   public void deleteAsUser(String user, String subDir) throws IOException {
-    String[] command = 
-      new String[]{taskControllerExe, 
-                   user,
-                   localStorage.getDirsString(),
-                   Integer.toString(Commands.DELETE_AS_USER.getValue()),
-                   subDir};
-    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("deleteAsUser: " + Arrays.toString(command));
+    String[] command = new String[] { taskControllerExe, user,
+        localStorage.getDirsString(),
+        Integer.toString(Commands.DELETE_AS_USER.getValue()), subDir };
+    ShellCommandExecutor shExec = null;
+    try {
+      shExec = new ShellCommandExecutor(command);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("deleteAsUser: " + Arrays.toString(command));
+      }
+      shExec.execute();
+    } catch (IOException e) {
+      if (shExec != null) {
+        int exitCode = shExec.getExitCode();
+        LOG.info("deleteAsUser: " + Arrays.toString(command));
+        LOG.warn("Exit code is : " + exitCode);
+        LOG.info("Output from deleteAsUser LinuxTaskController:");
+        logOutput(shExec.getOutput());
+      }
+      throw e;
     }
-    shExec.execute();
   }
 
   @Override
   public void deleteLogAsUser(String user, String subDir) throws IOException {
-    String[] command = 
-      new String[]{taskControllerExe, 
-                   user,
-                   localStorage.getDirsString(),
-                   Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()),
-                   subDir};
-    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("deleteLogAsUser: " + Arrays.toString(command));
+    String[] command = new String[] { taskControllerExe, user,
+        localStorage.getDirsString(),
+        Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()), subDir };
+    ShellCommandExecutor shExec = null;
+    try {
+      shExec = new ShellCommandExecutor(command);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("deleteLogAsUser: " + Arrays.toString(command));
+      }
+      shExec.execute();
+    } catch (IOException e) {
+      if (shExec != null) {
+        int exitCode = shExec.getExitCode();
+        LOG.info("deleteLogAsUser: " + Arrays.toString(command));
+        LOG.warn("Exit code is : " + exitCode);
+        LOG.info("Output from deleteLogAsUser LinuxTaskController:");
+        logOutput(shExec.getOutput());
+      }
+      throw e;
     }
-    shExec.execute();
   }
 
   @Override