You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/06/19 18:38:45 UTC

svn commit: r669549 - in /hadoop/core/branches/branch-0.18: CHANGES.txt src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Author: ddas
Date: Thu Jun 19 09:38:45 2008
New Revision: 669549

URL: http://svn.apache.org/viewvc?rev=669549&view=rev
Log:
Merge -r 669546:669547 from trunk onto 0.18 branch. Fixes HADOOP-3546.

Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=669549&r1=669548&r2=669549&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Thu Jun 19 09:38:45 2008
@@ -636,6 +636,9 @@
     HADOOP-3534. Log IOExceptions that happen in closing the name
     system when the NameNode shuts down. (Tsz Wo (Nicholas) Sze via omalley)
 
+    HADOOP-3546. TaskTracker re-initialization gets stuck in cleaning up.
+    (Amareshwari Sriramadasu via ddas)
+
 Release 0.17.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=669549&r1=669548&r2=669549&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Jun 19 09:38:45 2008
@@ -297,9 +297,6 @@
         public void run() {
           while (true) {
             try {
-              if (tasksToCleanup.isEmpty() && !isRunning()) {
-                break;
-              }
               TaskTrackerAction action = tasksToCleanup.take();
               if (action instanceof KillJobAction) {
                 purgeJob((KillJobAction) action);
@@ -322,10 +319,6 @@
           }
         }
       }, "taskCleanup");
-  {
-    taskCleanupThread.setDaemon(true);
-    taskCleanupThread.start();
-  }
     
   private RunningJob addTaskToJob(JobID jobId, 
                                   Path localJobFile,
@@ -398,12 +391,9 @@
        fConf.get("mapred.tasktracker.dns.nameserver","default"));
     }
  
-    directoryCleanupThread = new CleanupQueue(fConf);
-    directoryCleanupThread.start();
-
     //check local disk
     checkLocalDirs(this.fConf.getLocalDirs());
-    directoryCleanupThread.addToQueue(getLocalFiles(fConf, SUBDIR));
+    fConf.deleteLocalFiles(SUBDIR);
 
     // Clear out state tables
     this.tasks.clear();
@@ -458,7 +448,6 @@
                        InterTrackerProtocol.versionID, 
                        jobTrackAddr, this.fConf);
         
-    this.running = true;
     // start the thread that will fetch map task completion events
     this.mapEventsFetcher = new MapEventsFetcherThread();
     mapEventsFetcher.setDaemon(true);
@@ -801,28 +790,6 @@
     // Shutdown the fetcher thread
     this.mapEventsFetcher.interrupt();
     
-    // shutdown cleanup threads.
-    if (this.taskCleanupThread != null 
-         && this.taskCleanupThread.isAlive()) {
-      LOG.info("Stopping task cleanup thread");
-      this.taskCleanupThread.interrupt();
-      try {
-        this.taskCleanupThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
-    if (this.directoryCleanupThread != null 
-         && this.directoryCleanupThread.isAlive()) {
-      LOG.info("Stopping directory cleanup thread");
-      this.directoryCleanupThread.interrupt();
-      try {
-        this.directoryCleanupThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
-
     // shutdown RPC connections
     RPC.stopProxy(jobClient);
   }
@@ -867,6 +834,14 @@
     initialize();
   }
 
+  private void startCleanupThreads() throws IOException {
+    taskCleanupThread.setDaemon(true);
+    taskCleanupThread.start();
+    directoryCleanupThread = new CleanupQueue(originalConf);
+    directoryCleanupThread.setDaemon(true);
+    directoryCleanupThread.start();
+  }
+  
   /**
    * The connection to the JobTracker, used by the TaskRunner 
    * for locating remote files.
@@ -1355,6 +1330,8 @@
    */
   public void run() {
     try {
+      startCleanupThreads();
+      this.running = true;
       boolean denied = false;
       while (running && !shuttingDown && !denied) {
         boolean staleState = false;
@@ -2398,14 +2375,6 @@
   }
 
   /**
-   * True if task tracker is not shutting down.
-   * @return running
-   */
-  public boolean isRunning() {
-    return !shuttingDown;
-  }
-  
-  /**
    * This class is used in TaskTracker's Jetty to serve the map outputs
    * to other nodes.
    */
@@ -2561,7 +2530,7 @@
   }
 
   // cleanup queue which deletes files/directories of the paths queued up.
-  private class CleanupQueue extends Thread {
+  private static class CleanupQueue extends Thread {
     private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
     private JobConf conf;
     
@@ -2585,9 +2554,6 @@
       Path path = null;
       while (true) {
         try {
-          if (queue.isEmpty() && !isRunning()) {
-            break;
-          }
           path = queue.take();
           // delete the path.
           FileSystem fs = path.getFileSystem(conf);