Posted to common-commits@hadoop.apache.org by st...@apache.org on 2009/05/27 13:08:21 UTC

svn commit: r779112 - in /hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred: JobEndNotifier.java JobTracker.java TaskTracker.java TaskTrackerAction.java TaskTrackerStatus.java

Author: stevel
Date: Wed May 27 11:08:21 2009
New Revision: 779112

URL: http://svn.apache.org/viewvc?rev=779112&view=rev
Log:
HADOOP-3628 bring the JT and TT under the lifecycle
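
For readers unfamiliar with the lifecycle referred to above: the diffs below assume a Service base class in org.apache.hadoop.util that JobTracker and TaskTracker now extend. The sketch that follows is reconstructed purely from the calls visible in this commit (startService, innerStart, innerPing, innerClose, enterLiveState, enterState, verifyServiceState, isRunning, getServiceName); the CREATED/CLOSED states, the ServiceStatus accessor, the field names and all method bodies are assumptions, not the actual Hadoop source.

  // Reconstructed sketch for illustration only; the real Service class may differ.
  import java.io.Closeable;
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.conf.Configuration;

  public abstract class Service implements Closeable {

    /** Lifecycle states; UNDEFINED acts as a wildcard in enterState(). */
    public enum ServiceState { UNDEFINED, CREATED, STARTED, LIVE, CLOSED }

    /** Collects problems reported by innerPing(). */
    public static class ServiceStatus {
      private final List<Throwable> throwables = new ArrayList<Throwable>();
      public void addThrowable(Throwable t) { throwables.add(t); }
      public List<Throwable> getThrowables() { return throwables; }
    }

    protected final Configuration serviceConf;
    private ServiceState state = ServiceState.CREATED;

    protected Service(Configuration conf) {
      this.serviceConf = conf;
    }

    public synchronized ServiceState getServiceState() {
      return state;
    }

    public synchronized boolean isRunning() {
      return state == ServiceState.STARTED || state == ServiceState.LIVE;
    }

    /** Start exactly once, delegating the real startup work to innerStart(). */
    public synchronized void start() throws IOException {
      if (state == ServiceState.CREATED) {
        state = ServiceState.STARTED;
        innerStart();
      }
    }

    /** Static helper used by the trackers to start a freshly constructed service. */
    public static void startService(Service service) throws IOException {
      service.start();
    }

    /** Throw unless the current state is one of those allowed. */
    protected synchronized void verifyServiceState(ServiceState... allowed) {
      for (ServiceState s : allowed) {
        if (state == s) {
          return;
        }
      }
      throw new IllegalStateException(getServiceName() + " is in state " + state);
    }

    /** Move from an expected state to a new one; UNDEFINED matches any state. */
    protected synchronized boolean enterState(ServiceState expected, ServiceState next) {
      if (expected != ServiceState.UNDEFINED && state != expected) {
        return false;
      }
      boolean changed = (state != next);
      state = next;
      return changed;
    }

    /** Declare the service live; returns false if it was already live. */
    protected boolean enterLiveState() {
      return enterState(ServiceState.UNDEFINED, ServiceState.LIVE);
    }

    /** Stop the service once, delegating shutdown work to innerClose(). */
    public synchronized void close() throws IOException {
      if (state != ServiceState.CLOSED) {
        state = ServiceState.CLOSED;
        innerClose();
      }
    }

    // Subclass hooks overridden by JobTracker and TaskTracker in the diffs below.
    protected void innerStart() throws IOException { }
    public void innerPing(ServiceStatus status) throws IOException { }
    protected void innerClose() throws IOException { }
    public abstract String getServiceName();
  }

Under this model the constructors only assemble state; innerStart() does the heavy lifting once start()/startService() is called, and a service only declares itself LIVE when it can actually serve requests.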

Modified:
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java?rev=779112&r1=779111&r2=779112&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java Wed May 27 11:08:21 2009
@@ -89,7 +89,11 @@
 
   public static void stopNotifier() {
     running = false;
-    thread.interrupt();
+    //copy into a variable to deal with race conditions
+    Thread notifier = thread;
+    if (notifier != null) {
+      notifier.interrupt();
+    }
   }
 
   private static JobEndStatusInfo createNotification(JobConf conf,
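
The change above removes a check-then-act race: another thread can null the shared static thread field between the null check and the interrupt() call, so the field is read once into a local and both the check and the interrupt operate on that single reference. A minimal sketch of the pattern follows; the field names and the use of volatile are assumptions, not the actual JobEndNotifier code.

  // Sketch of the copy-to-local pattern only.
  class Notifier {
    private static volatile Thread thread;   // may be nulled concurrently

    static void start(Runnable work) {
      thread = new Thread(work, "notifier");
      thread.start();
    }

    static void stop() {
      // Racy version: thread could become null between the check and the call.
      //   if (thread != null) { thread.interrupt(); }

      Thread local = thread;   // read the shared field exactly once
      if (local != null) {
        local.interrupt();     // acts on the same reference that was checked
      }
    }
  }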

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=779112&r1=779111&r2=779112&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed May 27 11:08:21 2009
@@ -82,6 +82,7 @@
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Service;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 
@@ -90,7 +91,8 @@
  * tracking MR jobs in a network environment.
  *
  *******************************************************/
-public class JobTracker implements MRConstants, InterTrackerProtocol,
+public class JobTracker extends Service 
+    implements MRConstants, InterTrackerProtocol,
     JobSubmissionProtocol, TaskTrackerManager,
     RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol {
 
@@ -119,7 +121,11 @@
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
   private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
-
+  /**
+   * Time in milliseconds to sleep while trying to start the job tracker:
+   * {@value}
+   */
+  private static final int STARTUP_SLEEP_INTERVAL = 1000;
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
@@ -179,7 +185,7 @@
     while (true) {
       try {
         result = new JobTracker(conf);
-        result.taskScheduler.setTaskTrackerManager(result);
+        startService(result);
         break;
       } catch (VersionMismatch e) {
         throw e;
@@ -188,19 +194,24 @@
       } catch (UnknownHostException e) {
         throw e;
       } catch (IOException e) {
-        LOG.warn("Error starting tracker: " + 
-                 StringUtils.stringifyException(e));
+        LOG.warn("Error starting tracker: " +
+                e.getMessage(), e);
       }
-      Thread.sleep(1000);
+      Thread.sleep(STARTUP_SLEEP_INTERVAL);
     }
-    if (result != null) {
+    if (result != null && result.isRunning()) {
       JobEndNotifier.startNotifier();
     }
     return result;
   }
 
-  public void stopTracker() throws IOException {
-    JobEndNotifier.stopNotifier();
+  /**
+   * This stops the tracker and the JobEndNotifier, and moves the service
+   * into the terminated state.
+   *
+   * @throws IOException for any trouble during closedown
+   */
+  public synchronized void stopTracker() throws IOException {
     close();
   }
     
@@ -1390,7 +1401,7 @@
     }
   }
 
-  private final JobTrackerInstrumentation myInstrumentation;
+  private JobTrackerInstrumentation myInstrumentation;
     
   /////////////////////////////////////////////////////////////////
   // The real JobTracker
@@ -1516,7 +1527,7 @@
                                    );
 
   // Used to provide an HTML view on Job, Task, and TaskTracker structures
-  final HttpServer infoServer;
+  HttpServer infoServer;
   int infoPort;
 
   Server interTrackerServer;
@@ -1538,9 +1549,14 @@
   private QueueManager queueManager;
 
   /**
-   * Start the JobTracker process, listen on the indicated port
+   * Create the JobTracker, based on the configuration
+   * @param conf configuration to use
+   * @throws IOException on problems initializing the tracker
    */
   JobTracker(JobConf conf) throws IOException, InterruptedException {
+    super(conf);
+    this.conf = conf;
+
     // find the owner of the process
     try {
       mrOwner = UnixUserGroupInformation.login(conf);
@@ -1568,10 +1584,6 @@
     AVERAGE_BLACKLIST_THRESHOLD = 
       conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f); 
 
-    // This is a directory of temporary submission files.  We delete it
-    // on startup, and can delete any files that we're done with
-    this.conf = conf;
-    JobConf jobConf = new JobConf(conf);
 
     initializeTaskMemoryRelatedConfig();
 
@@ -1587,7 +1599,23 @@
       = conf.getClass("mapred.jobtracker.taskScheduler",
           JobQueueTaskScheduler.class, TaskScheduler.class);
     taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
+    taskScheduler.setTaskTrackerManager(this);
+  }
                                            
+  /**
+   * This contains the startup logic moved out of the constructor.
+   * It must never be called directly. Instead call {@link Service#start()} and
+   * let the service decide whether to invoke this method once and once only.
+   *
+   * Although most of the initialization work has been performed, the
+   * JobTracker does not go live until {@link #offerService()} is called.
+   * Accordingly, the JobTracker does not enter the Live state here.
+   * @throws IOException for any startup problems
+   */
+  protected void innerStart() throws IOException {
+    // This is a directory of temporary submission files.  We delete it
+    // on startup, and can delete any files that we're done with
+    JobConf jobConf = new JobConf(conf);
     // Set ports, start RPC servers, setup security policy etc.
     InetSocketAddress addr = getAddress(conf);
     this.localMachine = addr.getHostName();
@@ -1640,6 +1668,9 @@
     trackerIdentifier = getDateFormat().format(new Date());
 
     // Initialize instrumentation
+    //this operation is synchronized to stop findbugs warning of inconsistent
+    //access
+    synchronized (this) {    
     JobTrackerInstrumentation tmp;
     Class<? extends JobTrackerInstrumentation> metricsInst =
       getInstrumentationClass(jobConf);
@@ -1654,6 +1685,7 @@
       tmp = new JobTrackerMetricsInst(this, jobConf);
     }
     myInstrumentation = tmp;
+    }
     
     // The rpc/web-server ports can be ephemeral ports... 
     // ... ensure we have the correct info
@@ -1673,6 +1705,9 @@
         // if we haven't contacted the namenode go ahead and do it
         if (fs == null) {
           fs = FileSystem.get(conf);
+          if(fs == null) {
+            throw new IllegalStateException("Unable to bind to the filesystem");
+          }
         }
         // clean up the system dir, which will only work if hdfs is out of 
         // safe mode
@@ -1714,9 +1749,15 @@
                 ((RemoteException)ie).getClassName())) {
           throw ie;
         }
-        LOG.info("problem cleaning system directory: " + systemDir, ie);
+        LOG.info("problem cleaning system directory: " + systemDir + ": " + ie,
+                ie);
+      }
+      try {
+        Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted during system directory cleanup ",
+                e);
       }
-      Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
     }
 
     // Prepare for recovery. This is done irrespective of the status of restart
@@ -1756,7 +1797,11 @@
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
+    //this operation is synchronized to stop findbugs warning of inconsistent
+    //access
+    synchronized(this) {
+      completedJobStatusStore = new CompletedJobStatusStore(conf, fs);
+    }
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -1848,9 +1893,16 @@
   }
 
   /**
-   * Run forever
+   * Run forever.
+   * Change the system state to indicate that we are live
+   * @throws InterruptedException interrupted operations
+   * @throws IOException IO Problems
    */
   public void offerService() throws InterruptedException, IOException {
+    if(!enterLiveState()) {
+      //catch re-entrancy by returning early
+      return;
+    }
     taskScheduler.start();
     
     //  Start the recovery after starting the scheduler
@@ -1870,25 +1922,70 @@
     this.retireJobsThread.start();
     expireLaunchingTaskThread.start();
 
-    if (completedJobStatusStore.isActive()) {
-      completedJobsStoreThread = new Thread(completedJobStatusStore,
-                                            "completedjobsStore-housekeeper");
-      completedJobsStoreThread.start();
+    synchronized (this) {
+      //this is synchronized to stop findbugs warning
+      if (completedJobStatusStore.isActive()) {
+        completedJobsStoreThread = new Thread(completedJobStatusStore,
+                                              "completedjobsStore-housekeeper");
+        completedJobsStoreThread.start();
+      }
     }
 
+    LOG.info("Starting interTrackerServer");
     // start the inter-tracker server once the jt is ready
     this.interTrackerServer.start();
     
-    synchronized (this) {
-      state = State.RUNNING;
-    }
     LOG.info("Starting RUNNING");
     
     this.interTrackerServer.join();
     LOG.info("Stopped interTrackerServer");
   }
 
-  void close() throws IOException {
+  /////////////////////////////////////////////////////
+  // Service Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param status a status that can be updated with problems
+   * @throws IOException for any problem
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (infoServer == null || !infoServer.isAlive()) {
+      status.addThrowable(
+              new IOException("TaskTracker HttpServer is not running on port "
+                      + infoPort));
+    }
+    if (interTrackerServer == null) {
+      status.addThrowable(
+              new IOException("InterTrackerServer is not running"));
+    }
+  }
+
+  /**
+   * This service shuts down by stopping the
+   * {@link JobEndNotifier} and then closing down the job
+   * tracker.
+   *
+   * @throws IOException exceptions which will be logged
+   */
+  @Override
+  protected void innerClose() throws IOException {
+    JobEndNotifier.stopNotifier();
+    closeJobTracker();
+  }
+
+  /**
+   * Close down all the Job tracker threads, and the
+   * task scheduler.
+   * This was package scoped, but has been made private so that
+   * it does not get used. Callers should call {@link #close()} to
+   * stop a JobTracker.
+   * @throws IOException if problems occur
+   */
+  private void closeJobTracker() throws IOException {
     if (this.infoServer != null) {
       LOG.info("Stopping infoServer");
       try {
@@ -1901,48 +1998,63 @@
       LOG.info("Stopping interTrackerServer");
       this.interTrackerServer.stop();
     }
-    if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {
-      LOG.info("Stopping expireTrackers");
-      this.expireTrackersThread.interrupt();
-      try {
-        this.expireTrackersThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
-    if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {
-      LOG.info("Stopping retirer");
-      this.retireJobsThread.interrupt();
-      try {
-        this.retireJobsThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
+    retireThread("expireTrackersThread", expireTrackersThread);
+    retireThread("retirer", retireJobsThread);
     if (taskScheduler != null) {
       taskScheduler.terminate();
     }
-    if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) {
-      LOG.info("Stopping expireLaunchingTasks");
-      this.expireLaunchingTaskThread.interrupt();
+    retireThread("expireLaunchingTasks", expireLaunchingTaskThread);
+    retireThread("completedJobsStore thread", completedJobsStoreThread);
+    LOG.info("stopped all jobtracker services");
+  }
+
+  /**
+   * Close the filesystem without raising an exception. At the end of this
+   * method, fs==null.
+   * Warning: closing the FS may make it unusable for other clients in the same JVM.
+   */
+  protected synchronized void closeTheFilesystemQuietly() {
+    if (fs != null) {
       try {
-        this.expireLaunchingTaskThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
+        fs.close();
+      } catch (IOException e) {
+        LOG.warn("When closing the filesystem: " + e, e);
       }
+      fs = null;
     }
-    if (this.completedJobsStoreThread != null &&
-        this.completedJobsStoreThread.isAlive()) {
-      LOG.info("Stopping completedJobsStore thread");
-      this.completedJobsStoreThread.interrupt();
+  }
+
+  /**
+   * Retire a named thread if it is not null and still alive. The thread will be
+   * interrupted and then joined.
+   *
+   * @param name   thread name for log messages
+   * @param thread thread to stop; may be null
+   * @return true if the thread was shut down; false implies the calling
+   *         thread was interrupted during the join.
+   */
+  protected boolean retireThread(String name, Thread thread) {
+    if (thread != null && thread.isAlive()) {
+      LOG.info("Stopping " + name);
+      thread.interrupt();
       try {
-        this.completedJobsStoreThread.join();
+        thread.join();
       } catch (InterruptedException ex) {
-        ex.printStackTrace();
+        LOG.info("interruped during " + name + " shutdown", ex);
+        return false;
       }
     }
-    LOG.info("stopped all jobtracker services");
-    return;
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return "JobTracker";
   }
     
   ///////////////////////////////////////////////////////
@@ -2476,7 +2588,7 @@
     return numTaskCacheLevels;
   }
   public int getNumResolvedTaskTrackers() {
-    return numResolved;
+    return taskTrackers.size();
   }
   
   public int getNumberOfUniqueHosts() {
@@ -3012,6 +3124,7 @@
    * Allocates a new JobId string.
    */
   public synchronized JobID getNewJobId() throws IOException {
+    verifyServiceState(ServiceState.LIVE);
     return new JobID(getTrackerIdentifier(), nextJobId++);
   }
 
@@ -3024,6 +3137,7 @@
    * the JobTracker alone.
    */
   public synchronized JobStatus submitJob(JobID jobId) throws IOException {
+    verifyServiceState(ServiceState.LIVE);
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
@@ -3118,6 +3232,10 @@
 
   public synchronized ClusterStatus getClusterStatus(boolean detailed) {
     synchronized (taskTrackers) {
+      //backport the service state into the job tracker state
+      State state = getServiceState() == ServiceState.LIVE ?
+              State.RUNNING :
+              State.INITIALIZING;
       if (detailed) {
         List<List<String>> trackerNames = taskTrackerNames();
         return new ClusterStatus(trackerNames.get(0),
@@ -3445,6 +3563,10 @@
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
    */
   public String getSystemDir() {
+    if (fs == null) {
+      throw new IllegalStateException("Filesystem is null; "
+              + "JobTracker is not live: " + this);
+    }
     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
     return fs.makeQualified(sysDir).toString();
   }

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=779112&r1=779111&r2=779112&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Wed May 27 11:08:21 2009
@@ -83,6 +83,8 @@
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.util.Service;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -94,11 +96,11 @@
  * for Task assignments and reporting results.
  *
  *******************************************************/
-public class TaskTracker 
+public class TaskTracker extends Service
              implements MRConstants, TaskUmbilicalProtocol, Runnable {
   
   static final long WAIT_FOR_DONE = 3 * 1000;
-  private int httpPort;
+  int httpPort;
 
   static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
 
@@ -120,7 +122,10 @@
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
-  volatile boolean running = true;
+  /**
+   * Flag used to synchronize running state across threads.
+   */
+  private volatile boolean running = false;
 
   private LocalDirAllocator localDirAllocator;
   String taskTrackerName;
@@ -153,7 +158,7 @@
   // The filesystem where job files are stored
   FileSystem systemFS = null;
   
-  private final HttpServer server;
+  private HttpServer server;
     
   volatile boolean shuttingDown = false;
     
@@ -304,33 +309,7 @@
   /**
    * A daemon-thread that pulls tips off the list of things to cleanup.
    */
-  private Thread taskCleanupThread = 
-    new Thread(new Runnable() {
-        public void run() {
-          while (true) {
-            try {
-              TaskTrackerAction action = tasksToCleanup.take();
-              if (action instanceof KillJobAction) {
-                purgeJob((KillJobAction) action);
-              } else if (action instanceof KillTaskAction) {
-                TaskInProgress tip;
-                KillTaskAction killAction = (KillTaskAction) action;
-                synchronized (TaskTracker.this) {
-                  tip = tasks.get(killAction.getTaskID());
-                }
-                LOG.info("Received KillTaskAction for task: " + 
-                         killAction.getTaskID());
-                purgeTask(tip, false);
-              } else {
-                LOG.error("Non-delete action given to cleanup thread: "
-                          + action);
-              }
-            } catch (Throwable except) {
-              LOG.warn(StringUtils.stringifyException(except));
-            }
-          }
-        }
-      }, "taskCleanup");
+  private TaskCleanupThread taskCleanupThread;
 
   TaskController getTaskController() {
     return taskController;
@@ -424,6 +403,17 @@
    * close().
    */
   synchronized void initialize() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initializing Task Tracker: " + this);
+    }
+    //check that the server is not already live.                        
+
+    //allow this operation in only two service states: started and live 
+    verifyServiceState(ServiceState.STARTED, ServiceState.LIVE);
+
+    //flip the running switch for our inner threads                     
+    running = true;
+
     localFs = FileSystem.getLocal(fConf);
     // use configured nameserver & interface to get local hostname
     if (fConf.get("slave.host.name") != null) {
@@ -507,10 +497,17 @@
     DistributedCache.purgeCache(this.fConf);
     cleanupStorage();
 
+    //mark as just started; this is used in heartbeats
+    this.justStarted = true;
+    int connectTimeout = fConf
+            .getInt("mapred.task.tracker.connect.timeout", 60000);
     this.jobClient = (InterTrackerProtocol) 
       RPC.waitForProxy(InterTrackerProtocol.class,
                        InterTrackerProtocol.versionID, 
-                       jobTrackAddr, this.fConf);
+                       jobTrackAddr, this.fConf, connectTimeout);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connected to JobTracker at " + jobTrackAddr);
+    }
     this.justInited = true;
     this.running = true;    
     // start the thread that will fetch map task completion events
@@ -555,7 +552,9 @@
    * startup, to remove any leftovers from previous run.
    */
   public void cleanupStorage() throws IOException {
-    this.fConf.deleteLocalFiles();
+    if (fConf != null) {
+      fConf.deleteLocalFiles();
+    }
   }
 
   // Object on wait which MapEventsFetcherThread is going to wait.
@@ -835,25 +834,73 @@
     }
   }
     
+  /////////////////////////////////////////////////////
+  // Service Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param status a status that can be updated with problems
+   * @throws IOException       for any problem
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (server == null || !server.isAlive()) {
+      status.addThrowable(
+              new IOException("TaskTracker HttpServer is not running on port "
+                      + httpPort));
+    }
+    if (taskReportServer == null) {
+      status.addThrowable(
+              new IOException("TaskTracker Report Server is not running on "
+              + taskReportAddress));
+    }
+  }
+
+  /**
+   * A shutdown request triggers termination
+   * @throws IOException when errors happen during termination
+   */
   public synchronized void shutdown() throws IOException {
-    shuttingDown = true;
     close();
-    if (this.server != null) {
-      try {
-        LOG.info("Shutting down StatusHttpServer");
-        this.server.stop();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IOException exceptions which will be logged
+   */
+  @Override
+  protected void innerClose() throws IOException {
+    synchronized (this) {
+      shuttingDown = true;
+      closeTaskTracker();
+      if (this.server != null) {
+        try {
+          LOG.info("Shutting down StatusHttpServer");
+          this.server.stop();
       } catch (Exception e) {
         LOG.warn("Exception shutting down TaskTracker", e);
+        }
       }
+      stopCleanupThreads();
     }
   }
+
   /**
    * Close down the TaskTracker and all its components.  We must also shutdown
    * any running tasks or threads, and cleanup disk space.  A new TaskTracker
    * within the same process space might be restarted, so everything must be
    * clean.
+   * @throws IOException when errors happen during shutdown
    */
-  public synchronized void close() throws IOException {
+  public synchronized void closeTaskTracker() throws IOException {
+    if (!running) {
+      //this operation is a no-op when not already running
+      return;
+    }
+    running = false;
     //
     // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
     // because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -865,27 +912,37 @@
       tip.jobHasFinished(false);
     }
     
-    this.running = false;
-        
     // Clear local storage
     cleanupStorage();
         
     // Shutdown the fetcher thread
-    this.mapEventsFetcher.interrupt();
+    if (mapEventsFetcher != null) {
+      mapEventsFetcher.interrupt();
+    }
     
     //stop the launchers
-    this.mapLauncher.interrupt();
-    this.reduceLauncher.interrupt();
-    
-    jvmManager.stop();
+    if (mapLauncher != null) {
+      mapLauncher.cleanTaskQueue();
+      mapLauncher.interrupt();
+    }
+    if (reduceLauncher != null) {
+      reduceLauncher.cleanTaskQueue();
+      reduceLauncher.interrupt();
+    }
     
+    if (jvmManager != null) {
+      jvmManager.stop();
+    }
+      
     // shutdown RPC connections
     RPC.stopProxy(jobClient);
 
     // wait for the fetcher thread to exit
     for (boolean done = false; !done; ) {
       try {
-        this.mapEventsFetcher.join();
+        if(mapEventsFetcher != null) {
+          mapEventsFetcher.join();
+        }
         done = true;
       } catch (InterruptedException e) {
       }
@@ -898,10 +955,45 @@
   }
 
   /**
-   * Start with the local machine name, and the default JobTracker
+   * Create and start a task tracker.                                         
+   * Subclasses must not call this constructor, as it may
+   * call their initialisation/startup methods before the construction
+   * is complete.
+   * It is here for backwards compatibility.
+   * @param conf configuration                                                
+   * @throws IOException for problems on startup                              
    */
   public TaskTracker(JobConf conf) throws IOException {
+    this(conf, true);
+  }
+
+  /**
+   * Subclasses should call this constructor, passing start=false to the
+   * superclass, to avoid race conditions between constructors and threads.
+   * @param conf configuration                                                
+   * @param start flag to set to true to start the daemon                     
+   * @throws IOException for problems on startup                              
+   */
+  protected TaskTracker(JobConf conf, boolean start) throws IOException {
+    super(conf);
     fConf = conf;
+    //for backwards compatibility, the task tracker starts up unless told not
+    //to. Subclasses should be very cautious about having their superclass
+    //do that, as overridden methods can be invoked before the class is fully
+    //configured
+    if (start) {
+      startService(this);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IOException for any problem.
+   */
+  @Override
+  protected synchronized void innerStart() throws IOException {
+    JobConf conf = fConf;
     maxCurrentMapTasks = conf.getInt(
                   "mapred.tasktracker.map.tasks.maximum", 2);
     maxCurrentReduceTasks = conf.getInt(
@@ -944,11 +1036,22 @@
   }
   
   private void startCleanupThreads() throws IOException {
+    taskCleanupThread = new TaskCleanupThread();
     taskCleanupThread.setDaemon(true);
     taskCleanupThread.start();
     directoryCleanupThread = new CleanupQueue();
   }
-  
+
+  /**
+   * Tell the cleanup threads that they should end themselves   
+   */
+  private void stopCleanupThreads() {
+    if (taskCleanupThread != null) {
+      taskCleanupThread.terminate();
+      taskCleanupThread = null;
+    }
+  }
+
   /**
    * The connection to the JobTracker, used by the TaskRunner 
    * for locating remote files.
@@ -996,6 +1099,7 @@
    */
   State offerService() throws Exception {
     long lastHeartbeat = 0;
+    boolean restartingService = true;
 
     while (running && !shuttingDown) {
       try {
@@ -1011,6 +1115,7 @@
         // 1. Verify the buildVersion
         // 2. Get the system directory & filesystem
         if(justInited) {
+          LOG.debug("Checking build version with JobTracker");
           String jobTrackerBV = jobClient.getBuildVersion();
           if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
             String msg = "Shutting down. Incompatible buildVersion." +
@@ -1020,7 +1125,7 @@
             try {
               jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
             } catch(Exception e ) {
-              LOG.info("Problem reporting to jobtracker: " + e);
+              LOG.info("Problem reporting to jobtracker: " + e, e);
             }
             return State.DENIED;
           }
@@ -1031,6 +1136,9 @@
           }
           systemDirectory = new Path(dir);
           systemFS = systemDirectory.getFileSystem(fConf);
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("System directory is " + systemDirectory);
+          }
         }
         
         // Send the heartbeat and process the jobtracker's directives
@@ -1083,6 +1191,15 @@
           return State.STALE;
         }
             
+        //At this point the job tracker is present and compatible,
+        //so the service is coming up.
+        //It is time to declare it as such
+        if (restartingService) {
+          //declare the service as live.
+          enterLiveState();
+          restartingService = false;
+        }
+            
         // resetting heartbeat interval from the response.
         heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
         justStarted = false;
@@ -1726,6 +1843,8 @@
               if (!shuttingDown) {
                 LOG.info("Lost connection to JobTracker [" +
                          jobTrackAddr + "].  Retrying...", ex);
+                //enter the started state; we are no longer live
+                enterState(ServiceState.UNDEFINED, ServiceState.STARTED);
                 try {
                   Thread.sleep(5000);
                 } catch (InterruptedException ie) {
@@ -1734,7 +1853,7 @@
             }
           }
         } finally {
-          close();
+          closeTaskTracker();
         }
         if (shuttingDown) { return; }
         LOG.warn("Reinitializing local state");
@@ -2704,7 +2823,17 @@
   String getName() {
     return taskTrackerName;
   }
-    
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return taskTrackerName != null ? taskTrackerName : "Task Tracker";
+  }
+
   private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
                                           boolean sendCounters) {
     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
@@ -2821,7 +2950,9 @@
       // enable the server to track time spent waiting on locks
       ReflectionUtils.setContentionTracing
         (conf.getBoolean("tasktracker.contention.tracking", false));
-      new TaskTracker(conf).run();
+      TaskTracker tracker = new TaskTracker(conf, false);
+      Service.startService(tracker);
+      tracker.run();
     } catch (Throwable e) {
       LOG.error("Can not start task tracker because "+
                 StringUtils.stringifyException(e));
@@ -3102,8 +3233,70 @@
       try {
         purgeTask(tip, wasFailure); // Marking it as failed/killed.
       } catch (IOException ioe) {
-        LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
+        LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe, ioe);
+      }
+    }
+  }
+
+  /**
+   * Cleanup queue that can process actions to kill a job or task
+   */
+  private class TaskCleanupThread extends Daemon {
+
+    /**
+     * flag to halt work
+     */
+    private volatile boolean live = true;
+
+
+    /**
+     * Construct a daemon thread.
+     */
+    private TaskCleanupThread() {
+      setName("Task Tracker Task Cleanup Thread");
+    }
+
+    /**
+     * End the daemon. This is done by setting the live flag to false and
+     * interrupting the cleanup thread.
+     */
+    public void terminate() {
+      live = false;
+      interrupt();
+    }
+
+    /**
+     * process task kill actions until told to stop being live.
+     */
+    public void run() {
+      LOG.debug("Task cleanup thread started");
+      while (live) {
+        try {
+          TaskTrackerAction action = tasksToCleanup.take();
+          if (action instanceof KillJobAction) {
+            purgeJob((KillJobAction) action);
+          } else if (action instanceof KillTaskAction) {
+            TaskInProgress tip;
+            KillTaskAction killAction = (KillTaskAction) action;
+            synchronized (TaskTracker.this) {
+              tip = tasks.get(killAction.getTaskID());
+            }
+            LOG.info("Received KillTaskAction for task: " +
+                    killAction.getTaskID());
+            purgeTask(tip, false);
+          } else {
+            LOG.error("Non-delete action given to cleanup thread: "
+                    + action);
+          }
+        } catch (InterruptedException except) {
+          //interrupted; the live flag may have been cleared by terminate()
+        } catch (Throwable except) {
+          LOG.warn("Exception in Cleanup thread: " + except,
+                  except);
+        }
       }
+      LOG.debug("Task cleanup thread ending");
     }
   }
+
 }

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java?rev=779112&r1=779111&r2=779112&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java Wed May 27 11:08:21 2009
@@ -107,6 +107,15 @@
     return actionType;
   }
 
+  /**
+   * {@inheritDoc}
+   * @return the action type.
+   */
+  @Override
+  public String toString() {
+    return "TaskTrackerAction: " + actionType;
+  }
+
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeEnum(out, actionType);
   }

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=779112&r1=779111&r2=779112&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Wed May 27 11:08:21 2009
@@ -309,6 +309,18 @@
   }  
   
   /**
+   * String value prints the basic status of the task tracker
+   * @return a string value for diagnostics
+   */
+  @Override
+  public String toString() {
+    return trackerName
+            + " at http://" + host + ":" + httpPort + "/"
+            + " current task count: " + taskReports.size()
+            + " failed task count: " + failures;
+  }
+  
+  /**
    * Return the {@link ResourceStatus} object configured with this
    * status.
    *