You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/06/05 06:13:20 UTC

svn commit: r663441 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MRConstants.java src/java/org/apache/hadoop/mapred/TaskRunner.java src/java/org/apache/hadoop/mapred/TaskTracker.java

Author: acmurthy
Date: Wed Jun  4 21:13:19 2008
New Revision: 663441

URL: http://svn.apache.org/viewvc?rev=663441&view=rev
Log:
HADOOP-2427. Ensure that the current working directory (cwd) of completed tasks is cleaned up correctly on task completion. Contributed by Amareshwari Sri Ramadasu.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663441&r1=663440&r2=663441&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jun  4 21:13:19 2008
@@ -456,6 +456,9 @@
     HADOOP-3476. Code cleanup in fuse-dfs.
     (Peter Wyckoff via dhruba)
 
+    HADOOP-2427. Ensure that the cwd of completed tasks is cleaned-up
+    correctly on task-completion. (Amareshwari Sri Ramadasu via acmurthy) 
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java?rev=663441&r1=663440&r2=663441&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java Wed Jun  4 21:13:19 2008
@@ -65,4 +65,6 @@
    * Temporary directory name 
    */
   public static final String TEMP_DIR_NAME = "_temporary";
+  
+  public static final String WORKDIR = "work";
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=663441&r1=663440&r2=663441&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Wed Jun  4 21:13:19 2008
@@ -107,7 +107,7 @@
                                 TaskTracker.getJobCacheSubdir() 
                                 + Path.SEPARATOR + t.getJobID() 
                                 + Path.SEPARATOR + t.getTaskID()
-                                + Path.SEPARATOR + "work",
+                                + Path.SEPARATOR + MRConstants.WORKDIR,
                                 conf). toString());
 
       URI[] archives = DistributedCache.getCacheArchives(conf);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=663441&r1=663440&r2=663441&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun  4 21:13:19 2008
@@ -729,7 +729,7 @@
       } catch (Throwable ie) {
         tip.taskStatus.setRunState(TaskStatus.State.FAILED);
         try {
-          tip.cleanup();
+          tip.cleanup(true);
         } catch (Throwable ie2) {
           // Ignore it, we are just trying to cleanup.
         }
@@ -1417,7 +1417,7 @@
                          TaskTracker.getJobCacheSubdir() 
                          + Path.SEPARATOR + task.getJobID() 
                          + Path.SEPARATOR + task.getTaskID()
-                         + Path.SEPARATOR + "work",
+                         + Path.SEPARATOR + MRConstants.WORKDIR,
                          defaultJobConf);
       if (!localFs.mkdirs(cwd)) {
         throw new IOException("Mkdirs failed to create " 
@@ -1652,7 +1652,7 @@
                                      TaskTracker.getJobCacheSubdir() 
                                      + Path.SEPARATOR + task.getJobID() 
                                      + Path.SEPARATOR + task.getTaskID()
-                                     + Path.SEPARATOR + "work",
+                                     + Path.SEPARATOR + MRConstants.WORKDIR,
                                      localJobConf). toString());
               } catch (IOException e) {
                 LOG.warn("Working Directory of the task " + task.getTaskID() +
@@ -1721,12 +1721,13 @@
       // later on to downstream job processing.
       //
       if (needCleanup) {
-        try {
-          removeTaskFromJob(task.getJobID(), this);
-          cleanup();
-        } catch (IOException ie) {
-        }
+        removeTaskFromJob(task.getJobID(), this);
+      }
+      try {
+        cleanup(needCleanup);
+      } catch (IOException ie) {
       }
+
     }
   
 
@@ -1824,7 +1825,7 @@
       }
       
       // Cleanup on the finished task
-      cleanup();
+      cleanup(true);
     }
 
     /**
@@ -1878,12 +1879,18 @@
      * Any calls to cleanup should not lock the tip first.
      * cleanup does the right thing- updates tasks in Tasktracker
      * by locking tasktracker first and then locks the tip.
+     * 
+     * if needCleanup is true, the whole task directory is cleaned up.
+     * otherwise the current working directory of the task 
+     * i.e. <taskid>/work is cleaned up.
      */
-    void cleanup() throws IOException {
+    void cleanup(boolean needCleanup) throws IOException {
       TaskAttemptID taskId = task.getTaskID();
       LOG.debug("Cleaning up " + taskId);
       synchronized (TaskTracker.this) {
-        tasks.remove(taskId);
+        if (needCleanup) {
+          tasks.remove(taskId);
+        }
         synchronized (this){
           if (alwaysKeepTaskFiles ||
               (taskStatus.getRunState() == TaskStatus.State.FAILED && 
@@ -1894,13 +1901,17 @@
       }
       synchronized (this) {
         try {
-          if (runner != null) {
-            runner.close();
+          String taskDir = SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR
+                           + task.getJobID() + Path.SEPARATOR + taskId;
+          if (needCleanup) {
+            if (runner != null) {
+              runner.close();
+            }
+            defaultJobConf.deleteLocalFiles(taskDir);
+          } else {
+            defaultJobConf.deleteLocalFiles(taskDir + Path.SEPARATOR + 
+                                            MRConstants.WORKDIR);
           }
-          defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
-                                          JOBCACHE + Path.SEPARATOR + 
-                                          task.getJobID() + 
-                                          Path.SEPARATOR + taskId);
         } catch (Throwable ie) {
           LOG.info("Error cleaning up task runner: " + 
                    StringUtils.stringifyException(ie));