Posted to common-commits@hadoop.apache.org by st...@apache.org on 2009/03/12 18:50:05 UTC

svn commit: r752949 [2/3] - in /hadoop/core: branches/HADOOP-3628/src/test/org/apache/hadoop/cli/ branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/ trunk/conf/ trunk/ivy/ trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/ trunk/src/c...

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Mar 12 17:50:03 2009
@@ -395,9 +395,7 @@
    */
   synchronized void processIOError(int index) {
     if (editStreams == null || editStreams.size() <= 1) {
-      FSNamesystem.LOG.fatal(
-      "Fatal Error : All storage directories are inaccessible."); 
-      Runtime.getRuntime().exit(-1);
+      processAllStorageInaccessible();
     }
     assert(index < getNumStorageDirs());
     assert(getNumStorageDirs() == editStreams.size());
@@ -417,27 +415,43 @@
     //
     fsimage.processIOError(parentStorageDir);
   }
-  
+
+  /**
+   * report inaccessible storage directories and trigger a fatal error
+   */
+  private void processAllStorageInaccessible() {
+    processFatalError("Fatal Error: All storage directories are inaccessible.");
+  }
+
+  /**
+   * Handle a fatal error
+   * @param message message to include in any output
+   */
+  protected void processFatalError(String message) {
+    FSNamesystem.LOG.fatal(message);
+    Runtime.getRuntime().exit(-1);
+  }
+
   /**
    * If there is an IO Error on any log operations on storage directory,
    * remove any stream associated with that directory 
    */
   synchronized void processIOError(StorageDirectory sd) {
     // Try to remove stream only if one should exist
-    if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+    if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
       return;
+    }
     if (editStreams == null || editStreams.size() <= 1) {
-      FSNamesystem.LOG.fatal(
-          "Fatal Error : All storage directories are inaccessible."); 
-      Runtime.getRuntime().exit(-1);
+      processAllStorageInaccessible();
     }
     for (int idx = 0; idx < editStreams.size(); idx++) {
-      File parentStorageDir = ((EditLogFileOutputStream)editStreams
-                                       .get(idx)).getFile()
-                                       .getParentFile().getParentFile();
-      if (parentStorageDir.getName().equals(sd.getRoot().getName()))
+      File parentStorageDir = ((EditLogFileOutputStream) editStreams
+              .get(idx)).getFile()
+              .getParentFile().getParentFile();
+      if (parentStorageDir.getName().equals(sd.getRoot().getName())) {
         editStreams.remove(idx);
- }
+      }
+    }
   }
   
   /**
@@ -458,10 +472,8 @@
         }
       }
       if (j == numEditStreams) {
-          FSNamesystem.LOG.error("Unable to find sync log on which " +
-                                 " IO error occured. " +
-                                 "Fatal Error.");
-          Runtime.getRuntime().exit(-1);
+        processFatalError("Fatal Error: Unable to find sync log on which " +
+                                 " IO error occured. ");
       }
       processIOError(j);
     }
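
Making the exit path an overridable hook (processFatalError is protected) lets a test intercept what was previously an unconditional Runtime.exit(-1). A minimal sketch of the pattern, using hypothetical class names rather than the real FSEditLog:

    // Sketch only: illustrates the overridable fatal-error hook introduced
    // above. EditLogSketch and TestEditLogSketch are hypothetical names.
    class EditLogSketch {
      protected void processFatalError(String message) {
        System.err.println(message);   // stands in for FSNamesystem.LOG.fatal
        Runtime.getRuntime().exit(-1);
      }
    }

    class TestEditLogSketch extends EditLogSketch {
      @Override
      protected void processFatalError(String message) {
        // fail the test instead of killing the JVM
        throw new IllegalStateException(message);
      }
    }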

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Mar 12 17:50:03 2009
@@ -449,6 +449,59 @@
   }
 
   /**
+   * Test for a thread ref not being null or pointing to a dead thread
+   * @param thread the thread to check
+   * @return true if the thread is considered dead
+   */
+  private boolean isDead(Thread thread) {
+    return thread == null || !thread.isAlive();
+  }
+
+  /**
+   * Perform a cursory health check of the namesystem, particularly that it
+   * has not been closed and that all its worker threads are running.
+   * @throws IOException for any health check failure
+   */
+  void ping() throws IOException {
+    if (!fsRunning) {
+      throw new IOException("Namesystem is not running");
+    }
+    boolean bad = false;
+    StringBuilder sb = new StringBuilder();
+    if (isDead(hbthread)) {
+      bad = true;
+      sb.append("[Heartbeat thread is dead]");
+    }
+    if (isDead(replthread)) {
+      bad = true;
+      sb.append("[Replication thread is dead]");
+    }
+    // this thread's liveness is only relevant in safe mode.
+    if (safeMode != null && isDead(smmthread)) {
+      bad = true;
+      sb.append("[SafeModeMonitor thread is dead while the name system is in safe mode]");
+    }
+    if (isDead(dnthread)) {
+      bad = true;
+      sb.append("[DecommissionedMonitor thread is dead]");
+    }
+    if (isDead(lmthread)) {
+      bad = true;
+      sb.append("[Lease monitor thread is dead]");
+    }
+    if (pendingReplications == null || !pendingReplications.isAlive()) {
+      bad = true;
+      sb.append("[Pending replication thread is dead]");
+    }
+    if (this != getFSNamesystem()) {
+      bad = true;
+      sb.append("[FSNamesystem not a singleton]");
+    }
+    if (bad) {
+      throw new IOException(sb.toString());
+    }
+  }
+
+  /**
    * Close down this file system manager.
    * Causes heartbeat and lease daemons to stop; waits briefly for
    * them to finish, but a short timeout returns control back to caller.
@@ -470,7 +523,10 @@
           lmthread.interrupt();
           lmthread.join(3000);
         }
-        dir.close();
+        if (dir != null) {
+          dir.close();
+          dir = null;
+        }
       } catch (InterruptedException ie) {
       } catch (IOException ie) {
         LOG.error("Error closing FSDirectory", ie);
@@ -1252,9 +1308,13 @@
                                                            null,
                                                            blockSize);
     if (targets.length < this.minReplication) {
-      throw new IOException("File " + src + " could only be replicated to " +
-                            targets.length + " nodes, instead of " +
-                            minReplication);
+      String message = "File " + src + " could only be replicated to " +
+          targets.length + " nodes, instead of " + minReplication +
+          ". (There are " + heartbeats.size() +
+          " live data nodes in the cluster.)";
+      throw new IOException(message);
     }
 
     // Allocate a new block and record it in the INode. 
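
The new ping() deliberately collects every failed liveness check into one StringBuilder before throwing, so a single IOException reports all dead threads at once. A caller can treat any exception as "unhealthy"; a minimal sketch (the wrapper method is hypothetical, not part of this commit):

    // Hypothetical health probe built on FSNamesystem.ping().
    static boolean isHealthy(FSNamesystem namesystem) {
      try {
        namesystem.ping();   // throws IOException listing all dead threads
        return true;
      } catch (IOException e) {
        return false;
      }
    }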

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Mar 12 17:50:03 2009
@@ -45,6 +45,7 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Service;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.AccessControlException;
@@ -84,7 +85,7 @@
  * NameNode implements the ClientProtocol interface, which allows
  * clients to ask for DFS services.  ClientProtocol is not
  * designed for direct use by authors of DFS client code.  End-users
- * should instead use the org.apache.nutch.hadoop.fs.FileSystem class.
+ * should instead use the {@link FileSystem} class.
  *
  * NameNode also implements the DatanodeProtocol interface, used by
  * DataNode programs that actually store DFS data blocks.  These
@@ -95,7 +96,7 @@
  * secondary namenodes or rebalancing processes to get partial namenode's
  * state, for example partial blocksMap etc.
  **********************************************************/
-public class NameNode implements ClientProtocol, DatanodeProtocol,
+public class NameNode extends Service implements ClientProtocol, DatanodeProtocol,
                                  NamenodeProtocol, FSConstants,
                                  RefreshAuthorizationPolicyProtocol {
   static{
@@ -133,8 +134,6 @@
   /** HTTP server address */
   private InetSocketAddress httpAddress = null;
   private Thread emptier;
-  /** only used for testing purposes  */
-  private boolean stopRequested = false;
   /** Is service level authorization enabled? */
   private boolean serviceAuthEnabled = false;
   
@@ -162,7 +161,17 @@
   }
 
   public static InetSocketAddress getAddress(Configuration conf) {
-    return getAddress(FileSystem.getDefaultUri(conf).getAuthority());
+    URI fsURI = FileSystem.getDefaultUri(conf);
+    if (fsURI == null) {
+      throw new IllegalArgumentException(
+              "No default filesystem URI in the configuration");
+    }
+    String auth = fsURI.getAuthority();
+    if (auth == null) {
+      throw new IllegalArgumentException(
+              "No authority for the Filesystem URI " + fsURI);
+    }
+    return getAddress(auth);
   }
 
   public static URI getUri(InetSocketAddress namenode) {
@@ -175,6 +184,7 @@
    * Initialize name-node.
    * 
    * @param conf the configuration
+   * @throws IOException for problems during initialization
    */
   private void initialize(Configuration conf) throws IOException {
     InetSocketAddress socAddr = NameNode.getAddress(conf);
@@ -261,7 +271,7 @@
   }
 
   /**
-   * Start NameNode.
+   * Create a NameNode.
    * <p>
    * The name-node can be started with one of the following startup options:
    * <ul> 
@@ -280,14 +290,49 @@
    * <code>zero</code> in the conf.
    * 
   * @param conf  configuration
-   * @throws IOException
+   * @throws IOException for backwards compatibility
    */
   public NameNode(Configuration conf) throws IOException {
-    try {
-      initialize(conf);
-    } catch (IOException e) {
-      this.stop();
-      throw e;
+    super(conf);
+  }
+
+  /////////////////////////////////////////////////////
+  // Service Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * This method does all the startup.
+   * It is invoked from {@link #start()} when needed.
+   *
+   * @throws IOException for any problem.
+   */
+  @Override
+  protected void innerStart() throws IOException {
+    initialize(getConf());
+    setServiceState(ServiceState.LIVE);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * This implementation checks that the name system is non-null and live.
+   * @param status the service status to update with any problems found
+   * @throws IOException for any ping failure
+   * @throws LivenessException if the name system is not running
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (namesystem == null) {
+      status.addThrowable(new LivenessException("No name system"));
+    } else {
+      try {
+        namesystem.ping();
+      } catch (IOException e) {
+        status.addThrowable(e);
+      }
+    }
+    if (httpServer == null || !httpServer.isAlive()) {
+      status.addThrowable(
+              new IOException("NameNode HttpServer is not running"));
     }
   }
 
@@ -297,34 +342,81 @@
    */
   public void join() {
     try {
-      this.server.join();
+      if (server != null) {
+        server.join();
+      }
     } catch (InterruptedException ie) {
     }
   }
 
   /**
-   * Stop all NameNode threads and wait for all to finish.
+   * {@inheritDoc}
+   * To shut down, this service stops all NameNode threads and waits for them
+   * to finish. It also stops the metrics.
    */
-  public void stop() {
-    if (stopRequested)
-      return;
-    stopRequested = true;
+  @Override
+  public synchronized void innerClose() throws IOException {
+    LOG.info("Closing NameNode");
     try {
-      if (httpServer != null) httpServer.stop();
+      if (httpServer != null) {
+        httpServer.stop();
+      }
     } catch (Exception e) {
-      LOG.error(StringUtils.stringifyException(e));
+      LOG.error(StringUtils.stringifyException(e), e);
+    }
+    httpServer = null;
+    if (namesystem != null) {
+      namesystem.close();
+    }
+    if (emptier != null) {
+      emptier.interrupt();
+      emptier = null;
+    }
+    if (server != null) {
+      server.stop();
+      server = null;
     }
-    if(namesystem != null) namesystem.close();
-    if(emptier != null) emptier.interrupt();
-    if(server != null) server.stop();
     if (myMetrics != null) {
       myMetrics.shutdown();
     }
     if (namesystem != null) {
       namesystem.shutdown();
+      namesystem = null;
     }
   }
   
+  /**
+   * Retained for backwards compatibility.
+   */
+  public void stop() {
+    closeQuietly();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return "NameNode";
+  }
+
+  /**
+   * The toString operator returns the service name, the superclass id, and
+   * the state. This gives all services a slightly more useful message in a
+   * debugger or test report.
+   *
+   * @return a string representation of the object.
+   */
+  @Override
+  public String toString() {
+    return getServiceName() + " instance " + super.toString() + " in state "
+            + getServiceState()
+            + (httpAddress != null ? (" at " + httpAddress + ", ") : "")
+            + (server != null ? (", IPC " + server.getListenerAddress()) : "");
+  }
+
   /////////////////////////////////////////////////////
   // NamenodeProtocol
   /////////////////////////////////////////////////////
@@ -969,6 +1061,7 @@
     }
 
     NameNode namenode = new NameNode(conf);
+    deploy(namenode);
     return namenode;
   }
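
The Service base class itself is not part of this diff; from its use here, deploy() appears to construct-then-start a service, with innerStart(), innerPing() and innerClose() as the subclass hooks. A rough, illustrative-only skeleton of such a lifecycle driver (this is not the actual org.apache.hadoop.util.Service):

    import java.io.IOException;

    // Illustrative skeleton of a Service-style lifecycle base class.
    abstract class LifecycleSketch {
      enum State { CREATED, LIVE, FAILED, CLOSED }
      private State state = State.CREATED;

      public synchronized void start() throws IOException {
        if (state != State.CREATED) {
          return;                 // start at most once
        }
        try {
          innerStart();           // e.g. NameNode.initialize(getConf())
          state = State.LIVE;
        } catch (IOException e) {
          state = State.FAILED;
          throw e;
        }
      }

      public synchronized void close() throws IOException {
        if (state == State.CLOSED) {
          return;                 // close is idempotent
        }
        state = State.CLOSED;
        innerClose();
      }

      protected abstract void innerStart() throws IOException;
      protected abstract void innerClose() throws IOException;
    }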
     

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java Thu Mar 12 17:50:03 2009
@@ -136,6 +136,15 @@
   }
 
   /**
+   * Test for the replicator being alive.
+   * @return true if the thread is running.
+   */
+  boolean isAlive() {
+    Daemon daemon = timerThread;
+    return daemon != null && daemon.isAlive();
+  }
+
+  /**
    * An object that contains information about a block that 
    * is being replicated. It records the timestamp when the 
    * system started replicating the most recent copy of this

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Thu Mar 12 17:50:03 2009
@@ -1243,11 +1243,26 @@
   /** 
    * Utility that submits a job, then polls for progress until the job is
    * complete.
-   * 
+   *
    * @param job the job configuration.
-   * @throws IOException
+   * @return the job reference of the completed, successful job
+   * @throws IOException for any IO problem, or if the job fails
    */
   public static RunningJob runJob(JobConf job) throws IOException {
+    return runJob(job, -1);
+  }
+
+  /** 
+   * Utility that submits a job, then polls for progress until the job is
+   * complete.
+   * 
+   * @param job the job configuration.
+   * @param timeout timeout in milliseconds; any value less than or equal to
+   * zero means "do not time out"
+   * @return the job reference of the completed, successful job
+   * @throws IOException for any IO problem, or if the job fails
+   */
+  public static RunningJob runJob(JobConf job, long timeout) throws IOException {
     JobClient jc = new JobClient(job);
     boolean error = true;
     RunningJob running = null;
@@ -1255,6 +1270,7 @@
     final int MAX_RETRIES = 5;
     int retries = MAX_RETRIES;
     TaskStatusFilter filter;
+    long endTime = timeout > 0 ? System.currentTimeMillis() + timeout : 0;
     try {
       filter = getTaskOutputFilter(job);
     } catch(IllegalArgumentException e) {
@@ -1351,6 +1367,10 @@
           LOG.info("Communication problem with server: " +
                    StringUtils.stringifyException(ie));
         }
+        //check for timeout
+        if (endTime > 0 && System.currentTimeMillis() > endTime) {
+          throw new IOException("Job execution timed out");
+        }
       }
       if (!running.isSuccessful()) {
         throw new IOException("Job failed!");

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java Thu Mar 12 17:50:03 2009
@@ -89,7 +89,11 @@
 
   public static void stopNotifier() {
     running = false;
-    thread.interrupt();
+    //copy into a variable to deal with race conditions
+    Thread notifier = thread;
+    if (notifier != null) {
+      notifier.interrupt();
+    }
   }
 
   private static JobEndStatusInfo createNotification(JobConf conf,
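
Copying the shared field into a local before the null check is the same read-once idiom used by the new PendingReplicationBlocks.isAlive() earlier in this commit: the check and the use then see the same value even if another thread nulls the field in between. A generic sketch (names hypothetical):

    // Read-once idiom for a field that another thread may null out.
    class NotifierSketch {
      private static volatile Thread thread;

      static void stop() {
        Thread notifier = thread;   // read the shared field exactly once
        if (notifier != null) {
          notifier.interrupt();     // safe even if 'thread' is nulled now
        }
      }
    }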

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Mar 12 17:50:03 2009
@@ -77,13 +77,14 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.Service;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 
  * tracking MR jobs in a network environment.
  *
  *******************************************************/
-public class JobTracker implements MRConstants, InterTrackerProtocol,
+public class JobTracker extends Service implements MRConstants, InterTrackerProtocol,
     JobSubmissionProtocol, TaskTrackerManager, RefreshAuthorizationPolicyProtocol {
 
   static{
@@ -110,7 +111,11 @@
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
   private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
-
+  /**
+   * Time in milliseconds to sleep while trying to start the job tracker:
+   * {@value}
+   */
+  private static final int STARTUP_SLEEP_INTERVAL = 1000;
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
@@ -166,7 +171,7 @@
     while (true) {
       try {
         result = new JobTracker(conf);
-        result.taskScheduler.setTaskTrackerManager(result);
+        deploy(result);
         break;
       } catch (VersionMismatch e) {
         throw e;
@@ -175,19 +180,24 @@
       } catch (UnknownHostException e) {
         throw e;
       } catch (IOException e) {
-        LOG.warn("Error starting tracker: " + 
-                 StringUtils.stringifyException(e));
+        LOG.warn("Error starting tracker: " +
+                e.getMessage(), e);
       }
-      Thread.sleep(1000);
+      Thread.sleep(STARTUP_SLEEP_INTERVAL);
     }
-    if (result != null) {
+    if (result != null && result.isRunning()) {
       JobEndNotifier.startNotifier();
     }
     return result;
   }
 
-  public void stopTracker() throws IOException {
-    JobEndNotifier.stopNotifier();
+  /**
+   * This stops the tracker, the JobEndNotifier and moves the service into the
+   * terminated state.
+   *
+   * @throws IOException for any trouble during closedown
+   */
+  public synchronized void stopTracker() throws IOException {
     close();
   }
     
@@ -1299,7 +1309,7 @@
   // (hostname --> Node (NetworkTopology))
   Map<String, Node> hostnameToNodeMap = 
     Collections.synchronizedMap(new TreeMap<String, Node>());
-  
+
   // Number of resolved entries
   int numResolved;
     
@@ -1351,7 +1361,7 @@
                                    );
 
   // Used to provide an HTML view on Job, Task, and TaskTracker structures
-  final HttpServer infoServer;
+  HttpServer infoServer;
   int infoPort;
 
   Server interTrackerServer;
@@ -1366,9 +1376,13 @@
   private QueueManager queueManager;
 
   /**
-   * Start the JobTracker process, listen on the indicated port
+   * Create the JobTracker, based on the configuration
+   * @param conf configuration to use
+   * @throws IOException on problems initializing the tracker
    */
   JobTracker(JobConf conf) throws IOException, InterruptedException {
+    super(conf);
+    this.conf = conf;
     //
     // Grab some static constants
     //
@@ -1386,10 +1400,6 @@
     AVERAGE_BLACKLIST_THRESHOLD = 
       conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f); 
 
-    // This is a directory of temporary submission files.  We delete it
-    // on startup, and can delete any files that we're done with
-    this.conf = conf;
-    JobConf jobConf = new JobConf(conf);
 
     // Read the hosts/exclude files to restrict access to the jobtracker.
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
@@ -1402,7 +1412,23 @@
       = conf.getClass("mapred.jobtracker.taskScheduler",
           JobQueueTaskScheduler.class, TaskScheduler.class);
     taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
+    taskScheduler.setTaskTrackerManager(this);
+  }
                                            
+  /**
+   * This contains the startup logic moved out of the constructor.
+   * It must never be called directly. Instead call {@link Service#start()} and
+   * let the service decide whether to invoke this method once, and once only.
+   *
+   * Although most of the initialization work has been performed, the
+   * JobTracker does not go live until {@link #offerService()} is called.
+   * Accordingly, JobTracker does not enter the Live state here.
+   * @throws IOException for any startup problems
+   */
+  protected void innerStart() throws IOException {
+    // This is a directory of temporary submission files.  We delete it
+    // on startup, and can delete any files that we're done with
+    JobConf jobConf = new JobConf(conf);
     // Set ports, start RPC servers, setup security policy etc.
     InetSocketAddress addr = getAddress(conf);
     this.localMachine = addr.getHostName();
@@ -1458,15 +1484,19 @@
     trackerIdentifier = getDateFormat().format(new Date());
 
     Class<? extends JobTrackerInstrumentation> metricsInst = getInstrumentationClass(jobConf);
-    try {
-      java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
-        metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
-      this.myInstrumentation = c.newInstance(this, jobConf);
-    } catch(Exception e) {
-      //Reflection can throw lots of exceptions -- handle them all by 
-      //falling back on the default.
-      LOG.error("failed to initialize job tracker metrics", e);
-      this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
+    //this operation is synchronized to stop findbugs warning of inconsistent
+    //access
+    synchronized (this) {
+      try {
+        java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+          metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
+        this.myInstrumentation = c.newInstance(this, jobConf);
+      } catch(Exception e) {
+        //Reflection can throw lots of exceptions -- handle them all by 
+        //falling back on the default.
+        LOG.error("failed to initialize job tracker metrics", e);
+        this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
+      }
     }
  
     
@@ -1488,6 +1518,9 @@
         // if we haven't contacted the namenode go ahead and do it
         if (fs == null) {
           fs = FileSystem.get(conf);
+          if (fs == null) {
+            throw new IllegalStateException("Unable to bind to the filesystem");
+          }
         }
         // clean up the system dir, which will only work if hdfs is out of 
         // safe mode
@@ -1529,9 +1562,14 @@
                 ((RemoteException)ie).getClassName())) {
           throw ie;
         }
-        LOG.info("problem cleaning system directory: " + systemDir, ie);
+        LOG.info("problem cleaning system directory: " + systemDir + ": " + ie, ie);
+      }
+      try {
+        Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted during system directory cleanup ",
+                e);
       }
-      Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
     }
     // Same with 'localDir' except it's always on the local disk.
     jobConf.deleteLocalFiles(SUBDIR);
@@ -1553,7 +1591,11 @@
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
+    //this operation is synchronized to stop findbugs warning of inconsistent
+    //access
+    synchronized(this) {
+      completedJobStatusStore = new CompletedJobStatusStore(conf, fs);
+    }
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -1618,9 +1660,16 @@
   }
 
   /**
-   * Run forever
+   * Run forever.
+   * Change the system state to indicate that we are live
+   * @throws InterruptedException interrupted operations
+   * @throws IOException IO Problems
    */
   public void offerService() throws InterruptedException, IOException {
+    if (!enterLiveState()) {
+      //catch re-entrancy by returning early
+      return;
+    }
     taskScheduler.start();
     
     //  Start the recovery after starting the scheduler
@@ -1637,79 +1686,139 @@
     this.retireJobsThread.start();
     expireLaunchingTaskThread.start();
 
-    if (completedJobStatusStore.isActive()) {
-      completedJobsStoreThread = new Thread(completedJobStatusStore,
-                                            "completedjobsStore-housekeeper");
-      completedJobsStoreThread.start();
+    synchronized (this) {
+      //this is synchronized to stop findbugs warning
+      if (completedJobStatusStore.isActive()) {
+        completedJobsStoreThread = new Thread(completedJobStatusStore,
+                                              "completedjobsStore-housekeeper");
+        completedJobsStoreThread.start();
+      }
     }
 
+    LOG.info("Starting interTrackerServer");
     // start the inter-tracker server once the jt is ready
     this.interTrackerServer.start();
     
-    synchronized (this) {
-      state = State.RUNNING;
-    }
-    LOG.info("Starting RUNNING");
     
+    LOG.info("Starting RUNNING");
     this.interTrackerServer.join();
     LOG.info("Stopped interTrackerServer");
   }
 
-  void close() throws IOException {
-    if (this.infoServer != null) {
+  /////////////////////////////////////////////////////
+  // Service Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param status a status that can be updated with problems
+   * @throws IOException for any problem
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (infoServer == null || !infoServer.isAlive()) {
+      status.addThrowable(
+              new IOException("TaskTracker HttpServer is not running on port "
+                      + infoPort));
+    }
+    if (interTrackerServer == null) {
+      status.addThrowable(
+              new IOException("InterTrackerServer is not running"));
+    }
+  }
+
+  /**
+   * This service shuts down by stopping the
+   * {@link JobEndNotifier} and then closing down the job
+   * tracker
+   *
+   * @throws IOException exceptions which will be logged
+   */
+  @Override
+  protected void innerClose() throws IOException {
+    JobEndNotifier.stopNotifier();
+    closeJobTracker();
+  }
+
+  /**
+   * Close down all the Job tracker threads, and the
+   * task scheduler.
+   * This was package scoped, but has been made private so that
+   * it is not called directly. Callers should use {@link #close()} to
+   * stop a JobTracker.
+   * @throws IOException if problems occur
+   */
+  private void closeJobTracker() throws IOException {
+    if (infoServer != null) {
       LOG.info("Stopping infoServer");
       try {
-        this.infoServer.stop();
+        infoServer.stop();
       } catch (Exception ex) {
         LOG.warn("Exception shutting down JobTracker", ex);
       }
     }
-    if (this.interTrackerServer != null) {
+    if (interTrackerServer != null) {
       LOG.info("Stopping interTrackerServer");
-      this.interTrackerServer.stop();
-    }
-    if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {
-      LOG.info("Stopping expireTrackers");
-      this.expireTrackersThread.interrupt();
-      try {
-        this.expireTrackersThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
-    if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {
-      LOG.info("Stopping retirer");
-      this.retireJobsThread.interrupt();
-      try {
-        this.retireJobsThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
+      interTrackerServer.stop();
     }
+    retireThread("expireTrackersThread", expireTrackersThread);
+    retireThread("retirer", retireJobsThread);
     if (taskScheduler != null) {
       taskScheduler.terminate();
     }
-    if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) {
-      LOG.info("Stopping expireLaunchingTasks");
-      this.expireLaunchingTaskThread.interrupt();
+    retireThread("expireLaunchingTasks", expireLaunchingTaskThread);
+    retireThread("completedJobsStore thread", completedJobsStoreThread);
+    LOG.info("stopped all jobtracker services");
+  }
+
+  /**
+   * Close the filesystem without raising an exception. At the end of this
+   * method, fs==null.
+   * Warning: closing the FS may make it unusable for other clients in the same JVM.
+   */
+  protected synchronized void closeTheFilesystemQuietly() {
+    if (fs != null) {
       try {
-        this.expireLaunchingTaskThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
+        fs.close();
+      } catch (IOException e) {
+        LOG.warn("When closing the filesystem: " + e, e);
       }
+      fs = null;
     }
-    if (this.completedJobsStoreThread != null &&
-        this.completedJobsStoreThread.isAlive()) {
-      LOG.info("Stopping completedJobsStore thread");
-      this.completedJobsStoreThread.interrupt();
+  }
+
+  /**
+   * Retire a named thread if it is not null and still alive. The thread will
+   * be interrupted and then joined.
+   *
+   * @param name   thread name for log messages
+   * @param thread thread to retire; can be null
+   * @return true if the thread was shut down; false implies the calling
+   *         thread was interrupted during the join.
+   */
+  protected boolean retireThread(String name, Thread thread) {
+    if (thread != null && thread.isAlive()) {
+      LOG.info("Stopping " + name);
+      thread.interrupt();
       try {
-        this.completedJobsStoreThread.join();
+        thread.join();
       } catch (InterruptedException ex) {
-        ex.printStackTrace();
+        LOG.info("interruped during " + name + " shutdown", ex);
+        return false;
       }
     }
-    LOG.info("stopped all jobtracker services");
-    return;
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return "JobTracker";
   }
     
   ///////////////////////////////////////////////////////
@@ -1847,7 +1956,7 @@
   }
     
   /**
-   * Call {@link #removeTaskEntry(String)} for each of the
+   * Call {@link #removeTaskEntry(TaskAttemptID)} for each of the
    * job's tasks.
    * When the JobTracker is retiring the long-completed
    * job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
@@ -2230,7 +2339,7 @@
     return numTaskCacheLevels;
   }
   public int getNumResolvedTaskTrackers() {
-    return numResolved;
+    return taskTrackers.size();
   }
   
   public int getNumberOfUniqueHosts() {
@@ -2757,6 +2866,7 @@
    * Allocates a new JobId string.
    */
   public synchronized JobID getNewJobId() throws IOException {
+    verifyServiceState(ServiceState.LIVE);
     return new JobID(getTrackerIdentifier(), nextJobId++);
   }
 
@@ -2769,6 +2879,7 @@
    * the JobTracker alone.
    */
   public synchronized JobStatus submitJob(JobID jobId) throws IOException {
+    verifyServiceState(ServiceState.LIVE);
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
@@ -2851,6 +2962,10 @@
 
   public synchronized ClusterStatus getClusterStatus(boolean detailed) {
     synchronized (taskTrackers) {
+      //map the service state onto the job tracker's State enum
+      State state = getServiceState() == ServiceState.LIVE ?
+              State.RUNNING :
+              State.INITIALIZING;
       if (detailed) {
         List<List<String>> trackerNames = taskTrackerNames();
         return new ClusterStatus(trackerNames.get(0),
@@ -3167,6 +3282,10 @@
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
    */
   public String getSystemDir() {
+    if (fs == null) {
+      throw new IllegalStateException("Filesystem is null; "
+              + "JobTracker is not live: " + this);
+    }
     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
     return fs.makeQualified(sysDir).toString();
   }
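
retireThread() centralizes the interrupt-then-join shutdown that was previously repeated inline for each housekeeping thread. The underlying pattern, as a standalone sketch (the helper name is hypothetical; unlike the JobTracker version, this variant also restores the caller's interrupt status):

    // Generic interrupt-and-join shutdown of a worker thread.
    static boolean retire(Thread thread) {
      if (thread != null && thread.isAlive()) {
        thread.interrupt();                    // request termination
        try {
          thread.join();                       // wait for the thread to exit
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();  // preserve our interrupt status
          return false;
        }
      }
      return true;
    }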

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Mar 12 17:50:03 2009
@@ -81,11 +81,12 @@
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.Service;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
@@ -94,13 +95,19 @@
  * in a networked environment.  It contacts the JobTracker
  * for Task assignments and reporting results.
  *
+ *
+ * The TaskTracker has a complex lifecycle in that it
+ * can be "recycled"; after {@link #closeTaskTracker()} is called,
+ * it can be reset using {@link #initialize()}. This recycling takes
+ * place within the {@link Service} lifecycle.
  *******************************************************/
-public class TaskTracker 
+public class TaskTracker extends Service
              implements MRConstants, TaskUmbilicalProtocol, Runnable {
-  static final long WAIT_FOR_DONE = 3 * 1000;
-  private int httpPort;
+  /** Time to wait for a finished task to be reported as done: {@value} */
+  private static final long WAIT_FOR_DONE = 3 * 1000;
+  int httpPort;
 
-  static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
+  enum State {NORMAL, STALE, INTERRUPTED, DENIED}
 
   static{
     Configuration.addDefaultResource("mapred-default.xml");
@@ -119,7 +126,10 @@
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
-  volatile boolean running = true;
+  /**
+   * Flag used to synchronize running state across threads.
+   */
+  private volatile boolean running = false;
 
   private LocalDirAllocator localDirAllocator;
   String taskTrackerName;
@@ -134,7 +144,7 @@
   // last heartbeat response recieved
   short heartbeatResponseId = -1;
 
-  /*
+  /**
    * This is the last 'status' report sent by this tracker to the JobTracker.
    * 
    * If the rpc call succeeds, this 'status' is cleared-out by this tracker;
@@ -144,14 +154,15 @@
    */
   TaskTrackerStatus status = null;
   
-  // The system-directory on HDFS where job files are stored 
+  /** The system-directory on HDFS where job files are stored */
   Path systemDirectory = null;
   
-  // The filesystem where job files are stored
+  /** The filesystem where job files are stored */
   FileSystem systemFS = null;
   
-  private final HttpServer server;
+  private HttpServer server;
     
+  /** Flag used to synchronize startup across threads. */
   volatile boolean shuttingDown = false;
     
   Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
@@ -344,33 +355,7 @@
   /**
    * A daemon-thread that pulls tips off the list of things to cleanup.
    */
-  private Thread taskCleanupThread = 
-    new Thread(new Runnable() {
-        public void run() {
-          while (true) {
-            try {
-              TaskTrackerAction action = tasksToCleanup.take();
-              if (action instanceof KillJobAction) {
-                purgeJob((KillJobAction) action);
-              } else if (action instanceof KillTaskAction) {
-                TaskInProgress tip;
-                KillTaskAction killAction = (KillTaskAction) action;
-                synchronized (TaskTracker.this) {
-                  tip = tasks.get(killAction.getTaskID());
-                }
-                LOG.info("Received KillTaskAction for task: " + 
-                         killAction.getTaskID());
-                purgeTask(tip, false);
-              } else {
-                LOG.error("Non-delete action given to cleanup thread: "
-                          + action);
-              }
-            } catch (Throwable except) {
-              LOG.warn(StringUtils.stringifyException(except));
-            }
-          }
-        }
-      }, "taskCleanup");
+  private TaskCleanupThread taskCleanupThread;
     
   private RunningJob addTaskToJob(JobID jobId, 
                                   TaskInProgress tip) {
@@ -481,11 +466,24 @@
   }
     
   /**
-   * Do the real constructor work here.  It's in a separate method
+   * Initialize the connection.
+   * This method will block until a job tracker is found.
+   * It's in a separate method
    * so we can call it again and "recycle" the object after calling
    * close().
    */
   synchronized void initialize() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initializing Task Tracker: " + this);
+    }
+    //allow this operation in only two service states: started and live
+    verifyServiceState(ServiceState.STARTED, ServiceState.LIVE);
+
+    //flip the running switch for our inner threads
+    running = true;
+    
     // use configured nameserver & interface to get local hostname
     if (fConf.get("slave.host.name") != null) {
       this.localHostname = fConf.get("slave.host.name");
@@ -572,10 +570,17 @@
     DistributedCache.purgeCache(this.fConf);
     cleanupStorage();
 
+    //mark as just started; this is used in heartbeats
+    this.justStarted = true;
+    int connectTimeout = fConf
+            .getInt("mapred.task.tracker.connect.timeout", 60000);
     this.jobClient = (InterTrackerProtocol) 
       RPC.waitForProxy(InterTrackerProtocol.class,
                        InterTrackerProtocol.versionID, 
-                       jobTrackAddr, this.fConf);
+                       jobTrackAddr, fConf, connectTimeout);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connected to JobTracker at " + jobTrackAddr);
+    }
     this.justInited = true;
     this.running = true;    
     // start the thread that will fetch map task completion events
@@ -611,7 +616,9 @@
    * startup, to remove any leftovers from previous run.
    */
   public void cleanupStorage() throws IOException {
-    this.fConf.deleteLocalFiles();
+    if (fConf != null) {
+      fConf.deleteLocalFiles();
+    }
   }
 
   // Object on wait which MapEventsFetcherThread is going to wait.
@@ -890,25 +897,117 @@
     }
   }
     
-  public synchronized void shutdown() throws IOException {
-    shuttingDown = true;
-    close();
-    if (this.server != null) {
-      try {
-        LOG.info("Shutting down StatusHttpServer");
-        this.server.stop();
+  /////////////////////////////////////////////////////
+  // Service Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IOException for any problem.
+   */
+  @Override
+  protected synchronized void innerStart() throws IOException {
+    JobConf conf = fConf;
+    maxCurrentMapTasks = conf.getInt(
+            "mapred.tasktracker.map.tasks.maximum", 2);
+    maxCurrentReduceTasks = conf.getInt(
+            "mapred.tasktracker.reduce.tasks.maximum", 2);
+    this.jobTrackAddr = JobTracker.getAddress(conf);
+    String infoAddr =
+            NetUtils.getServerAddress(conf,
+                    "tasktracker.http.bindAddress",
+                    "tasktracker.http.port",
+                    "mapred.task.tracker.http.address");
+    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
+    String httpBindAddress = infoSocAddr.getHostName();
+    int port = infoSocAddr.getPort();
+    this.server = new HttpServer("task", httpBindAddress, port,
+            port == 0, conf);
+    workerThreads = conf.getInt("tasktracker.http.threads", 40);
+    this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
+    server.setThreads(1, workerThreads);
+    // let the jsp pages get to the task tracker, config, and other relevant
+    // objects
+    FileSystem local = FileSystem.getLocal(conf);
+    this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
+    server.setAttribute("task.tracker", this);
+    server.setAttribute("local.file.system", local);
+    server.setAttribute("conf", conf);
+    server.setAttribute("log", LOG);
+    server.setAttribute("localDirAllocator", localDirAllocator);
+    server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
+    server.addInternalServlet("mapOutput", "/mapOutput",
+            MapOutputServlet.class);
+    server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
+    server.start();
+    this.httpPort = server.getPort();
+    initialize();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param status a status that can be updated with problems
+   * @throws IOException       for any problem
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (server == null || !server.isAlive()) {
+      status.addThrowable(
+              new IOException("TaskTracker HttpServer is not running on port "
+                      + httpPort));
+    }
+    if (taskReportServer == null) {
+      status.addThrowable(
+              new IOException("TaskTracker Report Server is not running on "
+              + taskReportAddress));
+    }
+  }
+  
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IOException exceptions which will be logged
+   */
+  @Override
+  protected void innerClose() throws IOException {
+    synchronized (this) {
+      shuttingDown = true;
+      closeTaskTracker();
+      if (this.server != null) {
+        try {
+          LOG.info("Shutting down StatusHttpServer");
+          this.server.stop();
       } catch (Exception e) {
         LOG.warn("Exception shutting down TaskTracker", e);
+        }
       }
+      stopCleanupThreads();
     }
   }
+
+  /**
+   * A shutdown request triggers termination
+   * @throws IOException when errors happen during termination
+   */
+  public synchronized void shutdown() throws IOException {
+    close();
+  }
+
   /**
    * Close down the TaskTracker and all its components.  We must also shutdown
    * any running tasks or threads, and cleanup disk space.  A new TaskTracker
    * within the same process space might be restarted, so everything must be
    * clean.
+   * @throws IOException when errors happen during shutdown
    */
-  public synchronized void close() throws IOException {
+  public synchronized void closeTaskTracker() throws IOException {
+    if (!running) {
+      //this operation is a no-op when not already running
+      return;
+    }
+    running = false;
     //
     // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
     // because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -920,27 +1019,37 @@
       tip.jobHasFinished(false);
     }
     
-    this.running = false;
-        
     // Clear local storage
     cleanupStorage();
         
     // Shutdown the fetcher thread
-    this.mapEventsFetcher.interrupt();
+    if (mapEventsFetcher != null) {
+      mapEventsFetcher.interrupt();
+    }
     
     //stop the launchers
-    this.mapLauncher.interrupt();
-    this.reduceLauncher.interrupt();
-    
-    jvmManager.stop();
+    if (mapLauncher != null) {
+      mapLauncher.cleanTaskQueue();
+      mapLauncher.interrupt();
+    }
+    if (reduceLauncher != null) {
+      reduceLauncher.cleanTaskQueue();
+      reduceLauncher.interrupt();
+    }
     
+    if (jvmManager != null) {
+      jvmManager.stop();
+    }
+      
     // shutdown RPC connections
     RPC.stopProxy(jobClient);
 
     // wait for the fetcher thread to exit
     for (boolean done = false; !done; ) {
       try {
-        this.mapEventsFetcher.join();
+        if(mapEventsFetcher != null) {
+          mapEventsFetcher.join();
+        }
         done = true;
       } catch (InterruptedException e) {
       }
@@ -953,52 +1062,54 @@
   }
 
   /**
-   * Start with the local machine name, and the default JobTracker
+   * Create and start a task tracker.
+   * Subclasses must not subclass this constructor, as it may
+   * call their initialisation/startup methods before the constructor
+   * is complete.
+   * It is here for backwards compatibility.
+   * @param conf configuration
+   * @throws IOException for problems on startup
    */
   public TaskTracker(JobConf conf) throws IOException {
+    this(conf, true);
+  }
+
+  /**
+   * Subclasses should extend this constructor and pass start=false to the
+   * superclass to avoid race conditions in constructors and threads.
+   * @param conf configuration
+   * @param start flag to set to true to start the daemon
+   * @throws IOException for problems on startup
+   */
+  protected TaskTracker(JobConf conf, boolean start) throws IOException {
+    super(conf);
     fConf = conf;
-    maxCurrentMapTasks = conf.getInt(
-                  "mapred.tasktracker.map.tasks.maximum", 2);
-    maxCurrentReduceTasks = conf.getInt(
-                  "mapred.tasktracker.reduce.tasks.maximum", 2);
-    this.jobTrackAddr = JobTracker.getAddress(conf);
-    String infoAddr = 
-      NetUtils.getServerAddress(conf,
-                                "tasktracker.http.bindAddress", 
-                                "tasktracker.http.port",
-                                "mapred.task.tracker.http.address");
-    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
-    String httpBindAddress = infoSocAddr.getHostName();
-    int httpPort = infoSocAddr.getPort();
-    this.server = new HttpServer("task", httpBindAddress, httpPort,
-        httpPort == 0, conf);
-    workerThreads = conf.getInt("tasktracker.http.threads", 40);
-    this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
-    server.setThreads(1, workerThreads);
-    // let the jsp pages get to the task tracker, config, and other relevant
-    // objects
-    FileSystem local = FileSystem.getLocal(conf);
-    this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
-    server.setAttribute("task.tracker", this);
-    server.setAttribute("local.file.system", local);
-    server.setAttribute("conf", conf);
-    server.setAttribute("log", LOG);
-    server.setAttribute("localDirAllocator", localDirAllocator);
-    server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
-    server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
-    server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
-    server.start();
-    this.httpPort = server.getPort();
-    initialize();
+    //for backwards compatibility, the task tracker starts up unless told not
+    //to. Subclasses should be very cautious about having their superclass
+    //do that as subclassed methods can be invoked before the class is fully
+    //configured
+    if(start) {
+      deploy(this);
+    }
   }
 
   private void startCleanupThreads() throws IOException {
-    taskCleanupThread.setDaemon(true);
+    taskCleanupThread = new TaskCleanupThread();
     taskCleanupThread.start();
     directoryCleanupThread = new CleanupQueue();
   }
   
   /**
+   * Tell the cleanup threads that they should end themselves 
+   */
+  private void stopCleanupThreads() {
+    if (taskCleanupThread != null) {
+      taskCleanupThread.terminate();
+      taskCleanupThread = null;
+    }
+  }
+  
+  /**
    * The connection to the JobTracker, used by the TaskRunner 
    * for locating remote files.
    */
@@ -1045,6 +1156,7 @@
    */
   State offerService() throws Exception {
     long lastHeartbeat = 0;
+    boolean restartingService = true;
 
     while (running && !shuttingDown) {
       try {
@@ -1060,6 +1172,7 @@
         // 1. Verify the buildVersion
         // 2. Get the system directory & filesystem
         if(justInited) {
+          LOG.debug("Checking build version with JobTracker");
           String jobTrackerBV = jobClient.getBuildVersion();
           if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
             String msg = "Shutting down. Incompatible buildVersion." +
@@ -1069,7 +1182,7 @@
             try {
               jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
             } catch(Exception e ) {
-              LOG.info("Problem reporting to jobtracker: " + e);
+              LOG.info("Problem reporting to jobtracker: " + e, e);
             }
             return State.DENIED;
           }
@@ -1080,6 +1193,9 @@
           }
           systemDirectory = new Path(dir);
           systemFS = systemDirectory.getFileSystem(fConf);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("System directory is " + systemDirectory);
+          }
         }
         
         // Send the heartbeat and process the jobtracker's directives
@@ -1132,6 +1248,15 @@
           return State.STALE;
         }
             
+        //At this point the job tracker is present and compatible,
+        //so the service is coming up.
+        //It is time to declare it as such
+        if (restartingService) {
+          //declare the service as live.
+          enterLiveState();
+          restartingService = false;
+        }
+            
         // resetting heartbeat interval from the response.
         heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
         justStarted = false;
@@ -1157,15 +1282,16 @@
             
         //we've cleaned up, resume normal operation
         if (!acceptNewTasks && isIdle()) {
+          LOG.info("ready to accept new tasks again");
           acceptNewTasks=true;
         }
       } catch (InterruptedException ie) {
         LOG.info("Interrupted. Closing down.");
         return State.INTERRUPTED;
       } catch (DiskErrorException de) {
         String msg = "Exiting task tracker for disk error:\n" +
           StringUtils.stringifyException(de);
-        LOG.error(msg);
+        LOG.error(msg, de);
         synchronized (this) {
           jobClient.reportTaskTrackerError(taskTrackerName, 
                                            "DiskErrorException", msg);
@@ -1177,10 +1303,9 @@
           LOG.info("Tasktracker disallowed by JobTracker.");
           return State.DENIED;
         }
-      } catch (Exception except) {
-        String msg = "Caught exception: " + 
-          StringUtils.stringifyException(except);
-        LOG.error(msg);
+      } catch (IOException except) {
+        String msg = "Caught exception: " + except;
+        LOG.error(msg, except);
       }
     }
 
@@ -1491,6 +1616,7 @@
       localMinSpaceKill = minSpaceKill;  
     }
     if (!enoughFreeSpace(localMinSpaceKill)) {
+      LOG.info("Tasktracker running out of space -not accepting new tasks");
       acceptNewTasks=false; 
       //we give up! do not accept new tasks until
       //all the ones running have finished and they're all cleared up
@@ -1752,7 +1878,7 @@
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
-      LOG.warn(msg);
+      LOG.warn(msg, e);
       tip.reportDiagnosticInfo(msg);
       try {
         tip.kill(true);
@@ -1812,17 +1938,22 @@
               if (!shuttingDown) {
                 LOG.info("Lost connection to JobTracker [" +
                          jobTrackAddr + "].  Retrying...", ex);
+                //enter the started state; we are no longer live
+                enterState(ServiceState.UNDEFINED, ServiceState.STARTED);
                 try {
                   Thread.sleep(5000);
                 } catch (InterruptedException ie) {
+                  LOG.info("Interrupted while waiting for the job tracker", ie);
                 }
               }
             }
           }
         } finally {
-          close();
+          closeTaskTracker();
+        }
+        if (shuttingDown) {
+          return;
         }
-        if (shuttingDown) { return; }
         LOG.warn("Reinitializing local state");
         initialize();
       }
@@ -1830,8 +1961,7 @@
         shutdown();
       }
     } catch (IOException iex) {
-      LOG.error("Got fatal exception while reinitializing TaskTracker: " +
-                StringUtils.stringifyException(iex));
+      LOG.error("Got fatal exception while reinitializing TaskTracker: " + iex, iex);
       return;
     }
   }
@@ -2502,6 +2632,20 @@
           }
           String taskDir = getLocalTaskDir(task.getJobID().toString(),
                              taskId.toString(), task.isTaskCleanupTask());
+          CleanupQueue cleaner = directoryCleanupThread;
+          boolean cleanupThread = cleaner != null;
+          if (!cleanupThread) {
+            LOG.info("Cannot clean up: no directory cleanup thread");
+          }
+          if (taskDir == null) {
+            throw new IOException("taskDir==null");
+          }
+          if (localJobConf == null) {
+            throw new IOException("localJobConf==null");
+          }
+          if (defaultJobConf == null) {
+            throw new IOException("defaultJobConf==null");
+          }
           if (needCleanup) {
             if (runner != null) {
               //cleans up the output directory of the task (where map outputs 
@@ -2785,7 +2929,17 @@
   String getName() {
     return taskTrackerName;
   }
-    
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return taskTrackerName != null ? taskTrackerName : "Task Tracker";
+  }
+
   private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
                                           boolean sendCounters) {
     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
@@ -2902,7 +3056,9 @@
       // enable the server to track time spent waiting on locks
       ReflectionUtils.setContentionTracing
         (conf.getBoolean("tasktracker.contention.tracking", false));
-      new TaskTracker(conf).run();
+      TaskTracker tracker = new TaskTracker(conf, false);
+      Service.deploy(tracker);
+      tracker.run();
     } catch (Throwable e) {
       LOG.error("Can not start task tracker because "+
                 StringUtils.stringifyException(e));
@@ -3220,8 +3376,70 @@
       try {
         purgeTask(tip, wasFailure); // Marking it as failed/killed.
       } catch (IOException ioe) {
-        LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
+        LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe, ioe);
       }
     }
   }
+
+  /**
+   * Cleanup queue that can process actions to kill a job or task
+   */
+  private class TaskCleanupThread extends Daemon {
+
+    /**
+     * flag to halt work
+     */
+    private volatile boolean live = true;
+
+
+    /**
+     * Construct a daemon thread.
+     */
+    private TaskCleanupThread() {
+      setName("Task Tracker Task Cleanup Thread");
+    }
+
+    /**
+     * End the daemon. This is done by setting the live flag to false and
+     * interrupting ourselves.
+     */
+    public void terminate() {
+      live = false;
+      interrupt();
+    }
+
+    /**
+     * process task kill actions until told to stop being live.
+     */
+    public void run() {
+      LOG.debug("Task cleanup thread started");
+      while (live) {
+        try {
+          TaskTrackerAction action = tasksToCleanup.take();
+          if (action instanceof KillJobAction) {
+            purgeJob((KillJobAction) action);
+          } else if (action instanceof KillTaskAction) {
+            TaskInProgress tip;
+            KillTaskAction killAction = (KillTaskAction) action;
+            synchronized (TaskTracker.this) {
+              tip = tasks.get(killAction.getTaskID());
+            }
+            LOG.info("Received KillTaskAction for task: " +
+                    killAction.getTaskID());
+            purgeTask(tip, false);
+          } else {
+            LOG.error("Non-delete action given to cleanup thread: "
+                    + action);
+          }
+        } catch (InterruptedException except) {
+          //interrupted; loop around and recheck the live flag, which terminate() clears
+        } catch (Throwable except) {
+          LOG.warn("Exception in Cleanup thread: " + except,
+                  except);
+        }
+      }
+      LOG.debug("Task cleanup thread ending");
+    }
+  }
+
 }
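
The TaskCleanupThread added above is an instance of a common shape: a daemon drains
a blocking queue until a volatile flag is cleared and the thread is interrupted,
which unblocks a take() that would otherwise park forever. A generic sketch of that
shape (names illustrative):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueConsumer<T> extends Thread {
  private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
  private volatile boolean live = true;

  public QueueConsumer(String name) {
    setDaemon(true);
    setName(name);
  }

  public void put(T item) throws InterruptedException {
    queue.put(item);
  }

  public void terminate() {
    live = false;
    interrupt();   // unblocks a thread parked in queue.take()
  }

  @Override
  public void run() {
    while (live) {
      try {
        process(queue.take());
      } catch (InterruptedException ignored) {
        // loop around; terminate() may have cleared the live flag
      }
    }
  }

  protected void process(T item) {
    // subclass hook; the TaskTracker version dispatches on the action type
  }
}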

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java Thu Mar 12 17:50:03 2009
@@ -107,6 +107,15 @@
     return actionType;
   }
 
+  /**
+   * {@inheritDoc}
+   * @return the action type.
+   */
+  @Override
+  public String toString() {
+    return "TaskTrackerAction: " + actionType;
+  }
+
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeEnum(out, actionType);
   }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Mar 12 17:50:03 2009
@@ -301,6 +301,18 @@
   }  
   
   /**
+   * String value prints the basic status of the task tracker
+   * @return a string value for diagnostics
+   */
+  @Override
+  public String toString() {
+    return trackerName
+            + " at http://" + host + ":" + httpPort + "/"
+            + " current task count: " + taskReports.size()
+            + " failed task count: " + failures;
+  }
+  
+  /**
    * Return the {@link ResourceStatus} object configured with this
    * status.
    * 

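With the toString above, a tracker status renders as one diagnostic line, for
example (values illustrative): tracker_host1:50050 at http://host1:50060/ current
task count: 3 failed task count: 0.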
Modified: hadoop/core/trunk/src/test/hadoop-site.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hadoop-site.xml?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hadoop-site.xml (original)
+++ hadoop/core/trunk/src/test/hadoop-site.xml Thu Mar 12 17:50:03 2009
@@ -11,4 +11,10 @@
 
 
 
+<property>
+  <name>dfs.datanode.ipc.address</name>
+  <value>localhost:50020</value>
+  <description>The datanode IPC address is always localhost; this makes for
+  a very fast test setup.</description>
+</property>
 </configuration>
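
A sketch of how a component might resolve the key set above, assuming the usual
Configuration and NetUtils helpers; the class name is illustrative:

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;

public class DatanodeIpcAddress {
  public static InetSocketAddress lookup(Configuration conf) {
    // picks up the test override above when this hadoop-site.xml is on the classpath
    String address = conf.get("dfs.datanode.ipc.address", "localhost:50020");
    return NetUtils.createSocketAddr(address);
  }
}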

Modified: hadoop/core/trunk/src/test/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/log4j.properties?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/log4j.properties (original)
+++ hadoop/core/trunk/src/test/log4j.properties Thu Mar 12 17:50:03 2009
@@ -4,4 +4,16 @@
 log4j.threshhold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %x- %m%n
+
+#this is an appender that prints line numbers; it is unused but can be
+#switched on if desired
+log4j.appender.linenumbers=org.apache.log4j.ConsoleAppender
+log4j.appender.linenumbers.layout=org.apache.log4j.PatternLayout
+log4j.appender.linenumbers.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) %x - %m%n
+
+#log4j.logger.org.apache.hadoop=ERROR
+#log4j.logger.org.apache.hadoop.util.Service=DEBUG
+#log4j.logger.org.smartfrog.services.hadoop=DEBUG
+#log4j.logger.org.apache.hadoop.mapred=DEBUG
+#log4j.logger.org.apache.hadoop.ipc=DEBUG

Added: hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java?rev=752949&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java Thu Mar 12 17:50:03 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.conf;
+
+import junit.framework.TestCase;
+
+import java.util.Properties;
+
+/**
+ * Tests that a Configuration subclass can expose the loaded properties
+ * and observe reloads.
+ */
+
+public class TestConfigurationSubclass extends TestCase {
+  private static final String EMPTY_CONFIGURATION_XML
+          = "/org/apache/hadoop/conf/empty-configuration.xml";
+
+
+  public void testGetProps() {
+    SubConf conf = new SubConf(true);
+    Properties properties = conf.getProperties();
+    assertNotNull("hadoop.tmp.dir is not set",
+            properties.getProperty("hadoop.tmp.dir"));
+  }
+
+  public void testReload() throws Throwable {
+    SubConf conf = new SubConf(true);
+    assertFalse(conf.isReloaded());
+    Configuration.addDefaultResource(EMPTY_CONFIGURATION_XML);
+    assertTrue(conf.isReloaded());
+    //touch the properties to verify the post-reload load succeeds
+    Properties properties = conf.getProperties();
+    assertNotNull(properties);
+  }
+
+  public void testReloadNotQuiet() throws Throwable {
+    SubConf conf = new SubConf(true);
+    conf.setQuietMode(false);
+    assertFalse(conf.isReloaded());
+    conf.addResource("not-a-valid-resource");
+    assertTrue(conf.isReloaded());
+    try {
+      Properties properties = conf.getProperties();
+      fail("Should not have got here");
+    } catch (RuntimeException e) {
+      assertTrue(e.toString(), e.getMessage().contains("not found"));
+    }
+  }
+
+  private static class SubConf extends Configuration {
+
+    private boolean reloaded;
+
+    /**
+     * A new configuration where the behavior of reading from the default resources
+     * can be turned off.
+     *
+     * If the parameter {@code loadDefaults} is false, the new instance will not
+     * load resources from the default files.
+     *
+     * @param loadDefaults specifies whether to load from the default files
+     */
+    private SubConf(boolean loadDefaults) {
+      super(loadDefaults);
+    }
+
+    public Properties getProperties() {
+      return super.getProps();
+    }
+
+    /**
+     * {@inheritDoc}.
+     * Sets the reloaded flag.
+     */
+    @Override
+    public void reloadConfiguration() {
+      super.reloadConfiguration();
+      reloaded = true;
+    }
+
+    public boolean isReloaded() {
+      return reloaded;
+    }
+
+    public void setReloaded(boolean reloaded) {
+      this.reloaded = reloaded;
+    }
+  }
+
+}
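
The test above relies on Configuration.addDefaultResource() invalidating every live
Configuration, so the next property access triggers reloadConfiguration(). A minimal
sketch of that behavior; the resource name is illustrative and, in the default quiet
mode, may be absent from the classpath without error:

import org.apache.hadoop.conf.Configuration;

public class ReloadDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    String before = conf.get("hadoop.tmp.dir");
    // registering a new default resource marks all instances for reload;
    // the next get() re-reads the whole resource chain
    Configuration.addDefaultResource("my-extra-defaults.xml");
    String after = conf.get("hadoop.tmp.dir");
    System.out.println(before + " -> " + after);
  }
}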

Added: hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestGetServerAddress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestGetServerAddress.java?rev=752949&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestGetServerAddress.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestGetServerAddress.java Thu Mar 12 17:50:03 2009
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.conf;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.net.NetUtils;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Tests for the NetUtils.getServerAddress lookup and its failure modes.
+ */
+
+public class TestGetServerAddress extends TestCase {
+  private Configuration conf;
+  private static final String ADDRESS_TUPLE = "addressTuple";
+  private static final String BIND_ADDRESS_PORT = "bindAddressPort";
+  private static final String BIND_ADDRESS_NAME = "bindAddressName";
+  private static final String NOT_A_HOST_PORT_PAIR = "Not a host:port pair: ";
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    conf = new Configuration();
+  }
+
+  private String lookup() {
+    String address = NetUtils.getServerAddress(conf,
+            BIND_ADDRESS_NAME,
+            BIND_ADDRESS_PORT,
+            ADDRESS_TUPLE);
+    assertNotNull("Null string for the server address", address);
+    return address;
+  }
+
+  private void setAddressTuple(String value) {
+    conf.set(ADDRESS_TUPLE, value);
+  }
+
+  private void setAddressPort(String value) {
+    conf.set(BIND_ADDRESS_PORT, value);
+  }
+
+  private void setAddressName(String value) {
+    conf.set(BIND_ADDRESS_NAME, value);
+  }
+
+  private void assertContains(String expected, Throwable throwable) {
+    assertContains(expected, throwable.getMessage());
+  }
+
+  private void assertContains(String expected, String value) {
+    assertNotNull("Expected " + expected + " got null string", value);
+    assertTrue("No \"" + expected + "\" in \"" + value + "\"",
+            value.contains(expected));
+  }
+
+  private void expectLookupFailure(String exceptionText) {
+    try {
+      String address = lookup();
+      fail("Expected an an exception, got " + address);
+    } catch (IllegalArgumentException expected) {
+      assertContains(exceptionText, expected);
+    }
+  }
+
+  public void testNoValues() {
+    expectLookupFailure(ADDRESS_TUPLE);
+  }
+
+  public void testPortHasPriority() throws Throwable {
+    setAddressTuple("name:8080");
+    String port = "1234";
+    setAddressPort(port);
+    assertContains(port, lookup());
+  }
+
+
+  public void testNameHasPriority() throws Throwable {
+    setAddressTuple("name:8080");
+    String name = "localhost";
+    setAddressName(name);
+    assertContains(name, lookup());
+  }
+
+  public void testNameAndPort() throws Throwable {
+    setAddressName("name");
+    setAddressPort("8080");
+    expectLookupFailure("No value for addressTuple");
+  }
+
+  public void testNameNoPort() throws Throwable {
+    setAddressName("name");
+    expectLookupFailure("No value for addressTuple");
+  }
+
+
+  public void testEmptyString() throws Throwable {
+    setAddressTuple("");
+    String address = lookup();
+    try {
+      InetSocketAddress addr = NetUtils.createSocketAddr(address, -1);
+      fail("Expected an an exception, got " + address + " and hence " + addr);
+    } catch (RuntimeException expected) {
+      assertContains(NOT_A_HOST_PORT_PAIR, expected);
+    }
+  }
+
+
+  public void testAddressIsNotATuple() {
+    setAddressTuple("localhost");
+    String address = lookup();
+    try {
+      InetSocketAddress addr = NetUtils.createSocketAddr(address, -1);
+      fail("Expected an an exception, got " + address + " and hence " + addr);
+    } catch (RuntimeException expected) {
+      assertContains(NOT_A_HOST_PORT_PAIR, expected);
+      assertContains("localhost", expected);
+    }
+  }
+
+  public void testAddressIsATriple() {
+    setAddressTuple("localhost:8080:1234");
+    String address = lookup();
+    try {
+      InetSocketAddress addr = NetUtils.createSocketAddr(address, 0);
+      fail("Expected an an exception, got " + address + " and hence " + addr);
+    } catch (NumberFormatException expected) {
+      assertContains("8080:1234", expected);
+    }
+  }
+
+  public void testBindAddressIsATriple() {
+    setAddressPort("8080:1234");
+    setAddressTuple("localhost:8");
+    //non-tuples are not picked up when the socket is created
+    String address = lookup();
+    try {
+      InetSocketAddress addr = NetUtils.createSocketAddr(address, 0);
+      fail("Expected an an exception, got " + address + " and hence " + addr);
+    } catch (NumberFormatException expected) {
+      assertContains("8080:1234", expected);
+    }
+  }
+
+  public void testAddressPortIsInvalid() {
+    setAddressTuple("localhost:twelve");
+    String address = lookup();
+    try {
+      InetSocketAddress addr = NetUtils.createSocketAddr(address, 0);
+      fail("Expected an an exception, got " + address + " and hence " + addr);
+    } catch (NumberFormatException expected) {
+      assertContains("twelve", expected);
+    }
+  }
+
+  public void testAddressPortIsSigned() {
+    setAddressPort("-23");
+    setAddressTuple("localhost:8");
+    String address = lookup();
+    try {
+      InetSocketAddress addr = NetUtils.createSocketAddr(address, 0);
+      fail("Expected an an exception, got " + address + " and hence " + addr);
+    } catch (IllegalArgumentException expected) {
+      assertContains("port out of range:-23", expected);
+    }
+  }
+
+
+}
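
The tests above pin down the lookup order: when set, the old split keys override the
matching halves of the combined host:port value, and a missing combined key raises
an IllegalArgumentException. A sketch of typical use, with illustrative key names:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;

public class ServerAddressLookup {
  public static String resolve(Configuration conf) {
    // deprecated split keys win over the combined host:port key when present
    return NetUtils.getServerAddress(conf,
        "my.service.bindAddress",   // old host key (illustrative)
        "my.service.port",          // old port key (illustrative)
        "my.service.address");      // combined host:port key (illustrative)
  }
}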

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Thu Mar 12 17:50:03 2009
@@ -79,7 +79,7 @@
 
     FileOutputFormat.setOutputPath(conf, outDir);
 
-    JobClient.runJob(conf);
+    runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(outDir,
@@ -100,4 +100,14 @@
 
   }
 
+  /**
+   * Run a job, getting the timeout from a system property
+   * @param conf job configuration to use
+   * @throws IOException for any problem, including job failure
+   */
+  private void runJob(JobConf conf) throws IOException {
+    long timeout = Long.getLong("test.jobclient.timeout", 0);
+    JobClient.runJob(conf, timeout);
+  }
+
 }
\ No newline at end of file
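
The timeout above comes from Long.getLong, which reads a JVM system property (not an
arbitrary string) and returns the supplied default when the property is unset or
unparseable. A self-contained sketch:

public class TimeoutDemo {
  public static void main(String[] args) {
    // reads the -Dtest.jobclient.timeout=... system property; 0 means no limit here
    long timeout = Long.getLong("test.jobclient.timeout", 0);
    System.out.println("timeout = " + timeout);
  }
}

Running with, for example, java -Dtest.jobclient.timeout=60000 TimeoutDemo shows the
override take effect.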

Added: hadoop/core/trunk/src/test/org/apache/hadoop/conf/empty-configuration.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/empty-configuration.xml?rev=752949&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/empty-configuration.xml (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/empty-configuration.xml Thu Mar 12 17:50:03 2009
@@ -0,0 +1,4 @@
+<?xml version="1.0"?>
+<configuration>
+</configuration>
+

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Thu Mar 12 17:50:03 2009
@@ -64,6 +64,15 @@
   private static String TEST_ROOT_DIR =
     new Path(System.getProperty("test.build.data","/tmp"))
     .toString().replace(' ', '+');
+  private MiniDFSCluster cluster;
+
+  /**
+   * terminate any non-null cluster
+   */
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    MiniDFSCluster.close(cluster);
+  }
 
   /** class MyFile contains enough information to recreate the contents of
    * a single file.
@@ -269,8 +278,6 @@
   /** copy files from dfs file system to dfs file system */
   public void testCopyFromDfsToDfs() throws Exception {
     String namenode = null;
-    MiniDFSCluster cluster = null;
-    try {
       Configuration conf = new Configuration();
       cluster = new MiniDFSCluster(conf, 2, true, null);
       final FileSystem hdfs = cluster.getFileSystem();
@@ -291,15 +298,10 @@
         deldir(hdfs, "/srcdat");
         deldir(hdfs, "/logs");
       }
-    } finally {
-      if (cluster != null) { cluster.shutdown(); }
-    }
   }
   
   /** copy files from local file system to dfs file system */
   public void testCopyFromLocalToDfs() throws Exception {
-    MiniDFSCluster cluster = null;
-    try {
       Configuration conf = new Configuration();
       cluster = new MiniDFSCluster(conf, 1, true, null);
       final FileSystem hdfs = cluster.getFileSystem();
@@ -319,15 +321,10 @@
         deldir(hdfs, "/logs");
         deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat");
       }
-    } finally {
-      if (cluster != null) { cluster.shutdown(); }
-    }
   }
 
   /** copy files from dfs file system to local file system */
   public void testCopyFromDfsToLocal() throws Exception {
-    MiniDFSCluster cluster = null;
-    try {
       Configuration conf = new Configuration();
       final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
       cluster = new MiniDFSCluster(conf, 1, true, null);
@@ -348,16 +345,11 @@
         deldir(hdfs, "/logs");
         deldir(hdfs, "/srcdat");
       }
-    } finally {
-      if (cluster != null) { cluster.shutdown(); }
-    }
   }
 
   public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
-    MiniDFSCluster cluster = null;
-    try {
-      Configuration conf = new Configuration();
-      cluster = new MiniDFSCluster(conf, 2, true, null);
+    Configuration conf = new Configuration();
+    cluster = new MiniDFSCluster(conf, 2, true, null);
       final FileSystem hdfs = cluster.getFileSystem();
       final String namenode = hdfs.getUri().toString();
       if (namenode.startsWith("hdfs://")) {
@@ -408,9 +400,7 @@
         deldir(hdfs, "/srcdat");
         deldir(hdfs, "/logs");
       }
-    } finally {
-      if (cluster != null) { cluster.shutdown(); }
-    }
+
   }
 
   public void testCopyDuplication() throws Exception {
@@ -486,11 +476,9 @@
 
   public void testPreserveOption() throws Exception {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = null;
-    try {
-      cluster = new MiniDFSCluster(conf, 2, true, null);
-      String nnUri = FileSystem.getDefaultUri(conf).toString();
-      FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
+    cluster = new MiniDFSCluster(conf, 2, true, null);
+    String nnUri = FileSystem.getDefaultUri(conf).toString();
+    FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
 
       {//test preserving user
         MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
@@ -551,19 +539,15 @@
         deldir(fs, "/destdat");
         deldir(fs, "/srcdat");
       }
-    } finally {
-      if (cluster != null) { cluster.shutdown(); }
     }
-  }
 
   public void testMapCount() throws Exception {
     String namenode = null;
-    MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     try {
       Configuration conf = new Configuration();
-      dfs = new MiniDFSCluster(conf, 3, true, null);
-      FileSystem fs = dfs.getFileSystem();
+      cluster = new MiniDFSCluster(conf, 3, true, null);
+      FileSystem fs = cluster.getFileSystem();
       final FsShell shell = new FsShell(conf);
       namenode = fs.getUri().toString();
       mr = new MiniMRCluster(3, namenode, 1);
@@ -604,8 +588,7 @@
       assertTrue("Unexpected map count, logs.length=" + logs.length,
           logs.length == 2);
     } finally {
-      if (dfs != null) { dfs.shutdown(); }
-      if (mr != null) { mr.shutdown(); }
+      MiniMRCluster.close(mr);
     }
   }
 
@@ -714,9 +697,7 @@
   }
 
   public void testHftpAccessControl() throws Exception {
-    MiniDFSCluster cluster = null;
-    try {
-      final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true); 
+      final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true);
       final UnixUserGroupInformation USER_UGI = createUGI("user", false); 
 
       //start cluster by DFS_UGI
@@ -750,9 +731,6 @@
         fs.setPermission(srcrootpath, new FsPermission((short)0));
         assertEquals(-3, ToolRunner.run(distcp, args));
       }
-    } finally {
-      if (cluster != null) { cluster.shutdown(); }
-    }
   }
 
   /** test -delete */
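
The recurring change in this file is moving cluster shutdown out of per-test finally
blocks and into tearDown(), via a null-safe static close helper. A sketch of that
pattern for a JUnit 3 test, with Closeable standing in for the Mini*Cluster types
(which are not actually Closeable; the helper mirrors the static
MiniDFSCluster.close() this patch introduces):

import java.io.Closeable;
import java.io.IOException;

import junit.framework.TestCase;

public abstract class ClusterTestCase extends TestCase {
  protected Closeable cluster;   // stands in for the MiniDFSCluster field

  /** Null-safe close, so tests need no per-method finally blocks. */
  static void close(Closeable c) throws IOException {
    if (c != null) {
      c.close();
    }
  }

  @Override
  protected void tearDown() throws Exception {
    super.tearDown();
    close(cluster);   // terminate any non-null cluster
  }
}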

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java Thu Mar 12 17:50:03 2009
@@ -65,25 +65,25 @@
   private static final long MEGA = 1024 * 1024;
   private static final int SEEKS_PER_FILE = 4;
 
-  private static String ROOT = System.getProperty("test.build.data","fs_test");
-  private static Path CONTROL_DIR = new Path(ROOT, "fs_control");
-  private static Path WRITE_DIR = new Path(ROOT, "fs_write");
-  private static Path READ_DIR = new Path(ROOT, "fs_read");
-  private static Path DATA_DIR = new Path(ROOT, "fs_data");
+  private static final String ROOT = System.getProperty("test.build.data","fs_test");
+  private static final Path CONTROL_DIR = new Path(ROOT, "fs_control");
+  private static final Path WRITE_DIR = new Path(ROOT, "fs_write");
+  private static final Path READ_DIR = new Path(ROOT, "fs_read");
+  private static final Path DATA_DIR = new Path(ROOT, "fs_data");
 
   public void testFs() throws Exception {
-    testFs(10 * MEGA, 100, 0);
+    createTestFs(10 * MEGA, 100, 0);
   }
 
-  public static void testFs(long megaBytes, int numFiles, long seed)
+  private static void createTestFs(long megaBytes, int numFiles, long seed)
     throws Exception {
 
     FileSystem fs = FileSystem.get(conf);
 
-    if (seed == 0)
+    if (seed == 0) {
       seed = new Random().nextLong();
-
-    LOG.info("seed = "+seed);
+      LOG.info("seed = " + seed);
+    }
 
     createControlFile(fs, megaBytes, numFiles, seed);
     writeTest(fs, false);
@@ -553,7 +553,7 @@
         }
       }
     } finally {
-      if (cluster != null) cluster.shutdown(); 
+      MiniDFSCluster.close(cluster);
     }
   }
   
@@ -563,30 +563,44 @@
     fileSys.checkPath(new Path("hdfs://" + add.getHostName().toUpperCase() + ":" + add.getPort()));
   }
 
-  public void testFsClose() throws Exception {
-    {
-      Configuration conf = new Configuration();
-      new Path("file:///").getFileSystem(conf);
-      UnixUserGroupInformation.login(conf, true);
-      FileSystem.closeAll();
-    }
+  public void testCloseFileFS() throws Exception {
+    Configuration conf = new Configuration();
+    new Path("file:///").getFileSystem(conf);
+    UnixUserGroupInformation.login(conf, true);
+    FileSystem.closeAll();
+  }
 
-    {
+  public void testCloseHftpFS() throws Exception {
       Configuration conf = new Configuration();
       new Path("hftp://localhost:12345/").getFileSystem(conf);
       UnixUserGroupInformation.login(conf, true);
       FileSystem.closeAll();
-    }
+  }
 
-    {
+  public void testCloseHftpFSAltLogin() throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
+    UnixUserGroupInformation.login(fs.getConf(), true);
+    FileSystem.closeAll();
+  }
+
+
+  public void testCloseHDFS() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster(new Configuration(), 2, true, null);
+      URI uri = cluster.getFileSystem().getUri();
+      FileSystem fs = FileSystem.get(uri, new Configuration());
+      checkPath(cluster, fs);
       Configuration conf = new Configuration();
-      FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
-      UnixUserGroupInformation.login(fs.getConf(), true);
+      new Path(uri.toString()).getFileSystem(conf);
+      UnixUserGroupInformation.login(conf, true);
       FileSystem.closeAll();
+    } finally {
+      MiniDFSCluster.close(cluster);
     }
   }
 
-
   public void testCacheKeysAreCaseInsensitive()
     throws Exception
   {