Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/12/28 14:08:05 UTC

svn commit: r894165 - in /hadoop/mapreduce/trunk: ./ src/c++/task-controller/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/server/tasktracker/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/securi...

Author: yhemanth
Date: Mon Dec 28 13:08:04 2009
New Revision: 894165

URL: http://svn.apache.org/viewvc?rev=894165&view=rev
Log:
MAPREDUCE-896. Enhance tasktracker to cleanup files that might have been created by user tasks with non-writable permissions. Contributed by Ravi Gummadi.
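
The failure mode this change addresses is easy to reproduce: a user task can chmod a directory under its work dir to 000, after which the tasktracker (running as a different, non-root user under LinuxTaskController) can neither recurse into it nor unlink its children. Below is a minimal, stand-alone Java sketch of the problem and of the chmod-before-delete remedy the patch applies; the path is hypothetical and a non-root POSIX user is assumed.

    import java.io.File;

    public class NonWritableCleanupSketch {
      public static void main(String[] args) throws Exception {
        File work = new File("/tmp/sketch-workdir");   // hypothetical path
        File locked = new File(work, "locked");
        locked.mkdirs();
        new File(locked, "data").createNewFile();
        // simulate a task doing chmod 000 on one of its directories
        locked.setReadable(false, false);
        locked.setWritable(false, false);
        locked.setExecutable(false, false);

        // unlinking the child now fails: the parent is not writable/searchable
        System.out.println(new File(locked, "data").delete());   // false

        // the remedy: restore permissions first, then delete
        locked.setWritable(true);
        locked.setExecutable(true);
        System.out.println(new File(locked, "data").delete());   // true
        locked.delete();
        work.delete();
      }
    }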

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/c++/task-controller/main.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CleanupQueue.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Dec 28 13:08:04 2009
@@ -93,6 +93,10 @@
     MAPREDUCE-1250. Refactors the JobToken to use Common's Token interface.
     (Kan Zhang via ddas)
 
+    MAPREDUCE-896. Enhance tasktracker to cleanup files that might have
+    been created by user tasks with non-writable permissions.
+    (Ravi Gummadi via yhemanth)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/main.c?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/main.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/main.c Mon Dec 28 13:08:04 2009
@@ -58,6 +58,7 @@
       NULL, 0 } };
 
   const char* log_file = NULL;
+  char * dir_to_be_deleted = NULL;
   int conf_dir_len = 0;
 
   //Minimum number of arguments required to run the task-controller
@@ -154,6 +155,13 @@
     task_pid = argv[optind++];
     exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGQUIT);
     break;
+  case ENABLE_TASK_FOR_CLEANUP:
+    tt_root = argv[optind++];
+    job_id = argv[optind++];
+    dir_to_be_deleted = argv[optind++];
+    exit_code = enable_task_for_cleanup(tt_root, user_detail->pw_name, job_id,
+                                        dir_to_be_deleted);
+    break;
   default:
     exit_code = INVALID_COMMAND_PROVIDED;
   }
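
The new branch consumes three positional arguments, in the order tt_root, job_id, dir_to_be_deleted, after the user argument that all commands take. As a hedged sketch of that contract, not taken from this patch, the Java side would assemble a command line roughly like the following; the binary path and the numeric command value are illustrative assumptions.

    import java.util.ArrayList;
    import java.util.List;

    public class CleanupCommandSketch {
      public static void main(String[] args) {
        List<String> cmd = new ArrayList<String>();
        cmd.add("/usr/libexec/task-controller");  // assumed install path
        cmd.add("user1");                         // user the task ran as
        cmd.add("6");                             // ENABLE_TASK_FOR_CLEANUP value (assumed)
        cmd.add("/grid/0/mapred/local");          // tt_root
        cmd.add("job_200912281200_0001");         // job_id
        cmd.add("attempt_200912281200_0001_m_000001_0/work"); // dir_to_be_deleted
        System.out.println(cmd);
      }
    }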

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Mon Dec 28 13:08:04 2009
@@ -198,6 +198,17 @@
       attempt_dir);
 }
 
+/*
+ * Builds the full path of the dir (localTaskDir or localWorkDir)
+ * tt_root : the base path (i.e. a mapred-local-dir) sent to task-controller
+ * dir_to_be_deleted : is either taskDir($taskId) OR taskWorkDir($taskId/work)
+ */
+char *get_task_dir_path(const char *tt_root, const char *user,
+                        const char *jobid, const char *dir_to_be_deleted) {
+  return concatenate(TT_LOCAL_TASK_DIR_PATTERN, "task_dir_full_path", 4,
+                     tt_root, user, jobid, dir_to_be_deleted);
+}
+
 /**
  * Get the log directory for the given attempt.
  */
@@ -219,17 +230,17 @@
  * launcher file resolve to one and same. This is done so as to avoid
  * security pitfalls because of relative path components in the file name.
  */
-int check_task_launcher_path(char *path) {
+int check_path_for_relative_components(char *path) {
   char * resolved_path = (char *) canonicalize_file_name(path);
   if (resolved_path == NULL) {
     fprintf(LOGFILE,
-        "Error resolving the task launcher file path: %s. Passed path: %s\n",
+        "Error resolving the path: %s. Passed path: %s\n",
         strerror(errno), path);
     return ERROR_RESOLVING_FILE_PATH;
   }
   if (strcmp(resolved_path, path) != 0) {
     fprintf(LOGFILE,
-        "Relative path components in the file path: %s. Resolved path: %s\n",
+        "Relative path components in the path: %s. Resolved path: %s\n",
         path, resolved_path);
     free(resolved_path);
     return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH;
@@ -256,20 +267,23 @@
 static int change_mode(const char *path, mode_t mode) {
   int exit_code = chmod(path, mode);
   if (exit_code != 0) {
-    fprintf(LOGFILE, "chown %d of path %s failed: %s.\n", mode, path,
+    fprintf(LOGFILE, "chmod %d of path %s failed: %s.\n", mode, path,
         strerror(errno));
   }
   return exit_code;
 }
 
 /**
- * Function to secure the given path. It does the following recursively:
+ * Function to change permissions of the given path. It does the following
+ * recursively:
  *    1) changes the owner/group of the paths to the passed owner/group
  *    2) changes the file permission to the passed file_mode and directory
  *       permission to the passed dir_mode
+ *
+ * should_check_ownership : boolean to enable checking of ownership of each path
  */
 static int secure_path(const char *path, uid_t uid, gid_t gid,
-    mode_t file_mode, mode_t dir_mode) {
+    mode_t file_mode, mode_t dir_mode, int should_check_ownership) {
   FTS *tree = NULL; // the file hierarchy
   FTSENT *entry = NULL; // a file in the hierarchy
   char *paths[] = { (char *) path, NULL };//array needs to be NULL-terminated
@@ -362,7 +376,8 @@
     if (!process_path) {
       continue;
     }
-    if (compare_ownership(uid, gid, entry->fts_path) == 0) {
+    if (should_check_ownership &&
+          (compare_ownership(uid, gid, entry->fts_path) == 0)) {
       // already set proper permissions.
       // This might happen with distributed cache.
 #ifdef DEBUG
@@ -374,7 +389,7 @@
       continue;
     }
 
-    if (check_ownership(entry->fts_path) != 0) {
+    if (should_check_ownership && (check_ownership(entry->fts_path) != 0)) {
       fprintf(LOGFILE,
           "Invalid file path. %s not user/group owned by the tasktracker.\n",
           entry->fts_path);
@@ -467,8 +482,9 @@
         free(job_dir);
         break;
       }
-    } else if (secure_path(attempt_dir, user_detail->pw_uid, tasktracker_gid,
-        S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
+    } else if (secure_path(attempt_dir, user_detail->pw_uid,
+               tasktracker_gid, S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG,
+               1) != 0) {
       // No setgid on files and setgid on dirs, 770
       fprintf(LOGFILE, "Failed to secure the attempt_dir %s\n", attempt_dir);
       failed = 1;
@@ -527,8 +543,8 @@
   }
 
   gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
-  if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid, S_IRWXU
-      | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
+  if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid,
+      S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 1) != 0) {
     // setgid on dirs but not files, 770. As of now, there are no files though
     fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir);
     return -1;
@@ -640,9 +656,9 @@
         free(user_dir);
         break;
       }
-    } else if (secure_path(user_dir, user_detail->pw_uid, tasktracker_gid,
-        S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG)
-        != 0) {
+    } else if (secure_path(user_dir, user_detail->pw_uid,
+        tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR |
+                         S_IXUSR | S_IRWXG, 1) != 0) {
       // No setgid on files and setgid on dirs, 570
       fprintf(LOGFILE, "Failed to secure the user_dir %s\n",
           user_dir);
@@ -722,7 +738,7 @@
         break;
       }
     } else if (secure_path(job_dir, user_detail->pw_uid, tasktracker_gid,
-        S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG)
+        S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG, 1)
         != 0) {
       // No setgid on files and setgid on dirs, 570
       fprintf(LOGFILE, "Failed to secure the job_dir %s\n", job_dir);
@@ -848,7 +864,7 @@
       }
     } else if (secure_path(distcache_dir, user_detail->pw_uid,
         tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
-            | S_IXUSR | S_IRWXG) != 0) {
+            | S_IXUSR | S_IRWXG, 1) != 0) {
       // No setgid on files and setgid on dirs, 570
       fprintf(LOGFILE, "Failed to secure the distcache_dir %s\n",
           distcache_dir);
@@ -981,7 +997,7 @@
   }
 
   errno = 0;
-  exit_code = check_task_launcher_path(task_script_path);
+  exit_code = check_path_for_relative_components(task_script_path);
   if(exit_code != 0) {
     goto cleanup;
   }
@@ -1078,3 +1094,60 @@
   cleanup();
   return 0;
 }
+
+/**
+ * Enables the path for deletion by changing the owner, group and permissions
+ * of the specified path and all the files/directories in the path recursively.
+ *     *  sudo chown -R user:mapred full_path
+ *     *  sudo chmod -R 2770 full_path
+ * Before changing permissions, makes sure that the given path doesn't contain
+ * any relative components.
+ * tt_root : the base path (i.e. a mapred-local-dir) sent to task-controller
+ * dir_to_be_deleted : is either taskDir OR taskWorkDir that is to be deleted
+ */
+int enable_task_for_cleanup(const char *tt_root, const char *user,
+           const char *jobid, const char *dir_to_be_deleted) {
+  int exit_code = 0;
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+
+  char * full_path = NULL;
+  if (check_tt_root(tt_root) < 0) {
+    fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
+    cleanup();
+    return INVALID_TT_ROOT;
+  }
+ 
+  full_path = get_task_dir_path(tt_root, user, jobid, dir_to_be_deleted);
+  if (full_path == NULL) {
+    fprintf(LOGFILE,
+            "Could not build the full path. Not deleting the dir %s\n",
+            dir_to_be_deleted);
+    exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed
+  }
+  // Make sure that the given path does not contain any relative components
+  else if ((exit_code = check_path_for_relative_components(full_path)) != 0) {
+    fprintf(LOGFILE,
+        "Not changing permissions of %s. Path may contain relative components.\n",
+        full_path);
+  }
+  else if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
+    exit_code = INVALID_USER_NAME;
+  }
+  else if ((exit_code = secure_path(full_path, user_detail->pw_uid,
+               tasktracker_gid,
+               S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 0)) != 0) {
+    // No setgid on files and setgid on dirs, 770.
+    // set 770 permissions for the user and TT group on all files/directories in
+    // 'full_path' recursively so that deletion of the path by the TaskTracker succeeds.
+
+    fprintf(LOGFILE, "Failed to set permissions for %s\n", full_path);
+  }
+
+  if (full_path != NULL) {
+    free(full_path);
+  }
+  // free configurations
+  cleanup();
+  return exit_code;
+}
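
enable_task_for_cleanup thus reduces to secure_path() with ownership checks disabled: an unconditional recursive chown/chmod to user:mapred with 770 (2770 on directories). The DefaultTaskController hunk further down performs the Java analogue of the unlock step with Hadoop's FileUtil.chmod; here is a hedged, stand-alone sketch of that call. The path is hypothetical, and the chown and setgid parts, which require the setuid binary, are not attempted.

    import org.apache.hadoop.fs.FileUtil;

    public class ChmodSketch {
      public static void main(String[] args) throws Exception {
        String attemptDir = "/grid/0/mapred/local/taskTracker/user1/jobcache/"
            + "job_200912281200_0001/attempt_200912281200_0001_m_000001_0";
        // recursively grant user+group rwx so the tasktracker can delete the tree
        int ret = FileUtil.chmod(attemptDir, "ug+rwx", true /* recursive */);
        if (ret != 0) {
          System.err.println("chmod failed with exit status " + ret);
        }
      }
    }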

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.h?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h Mon Dec 28 13:08:04 2009
@@ -46,6 +46,7 @@
   KILL_TASK_JVM,
   RUN_DEBUG_SCRIPT,
   SIGQUIT_TASK_JVM,
+  ENABLE_TASK_FOR_CLEANUP
 };
 
 enum errorcodes {
@@ -71,6 +72,7 @@
   INITIALIZE_USER_FAILED, //20
   UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21
   INVALID_CONF_DIR, //22
+  UNABLE_TO_BUILD_PATH //23
 };
 
 #define USER_DIR_PATTERN "%s/taskTracker/%s"
@@ -87,6 +89,8 @@
 
 #define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
 
+#define TT_LOCAL_TASK_DIR_PATTERN    "%s/taskTracker/%s/jobcache/%s/%s"
+
 #define TT_SYS_DIR_KEY "mapreduce.cluster.local.dir"
 
 #define TT_LOG_DIR_KEY "hadoop.log.dir"
@@ -116,6 +120,9 @@
 
 int kill_user_task(const char *user, const char *task_pid, int sig);
 
+int enable_task_for_cleanup(const char *tt_root, const char *user,
+                            const char *jobid, const char *dir_to_be_deleted);
+
 int prepare_attempt_directory(const char *attempt_dir, const char *user);
 
 // The following functions are exposed for testing
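
For reference, TT_LOCAL_TASK_DIR_PATTERN expands the same way as the Java-side getLocalTaskDir/getTaskWorkDir construction in TaskTracker. A small sketch with hypothetical values:

    public class PatternSketch {
      public static void main(String[] args) {
        String fullPath = String.format("%s/taskTracker/%s/jobcache/%s/%s",
            "/grid/0/mapred/local",                        // tt_root
            "user1",                                       // user
            "job_200912281200_0001",                       // jobid
            "attempt_200912281200_0001_m_000001_0/work");  // dir_to_be_deleted
        System.out.println(fullPath);
        // -> /grid/0/mapred/local/taskTracker/user1/jobcache/
        //        job_200912281200_0001/attempt_200912281200_0001_m_000001_0/work
      }
    }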

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CleanupQueue.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CleanupQueue.java Mon Dec 28 13:08:04 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -38,7 +39,7 @@
    * paths(directories/files) in a separate thread. This constructor creates a
    * clean-up thread and also starts it as a daemon. Callers can instantiate one
    * CleanupQueue per JVM and can use it for deleting paths. Use
-   * {@link CleanupQueue#addToQueue(FileSystem, Path...)} to add paths for
+   * {@link CleanupQueue#addToQueue(PathDeletionContext...)} to add paths for
    * deletion.
    */
   public CleanupQueue() {
@@ -49,22 +50,61 @@
     }
   }
   
-  public void addToQueue(FileSystem fs, Path...paths) {
-    cleanupThread.addToQueue(fs, paths);
+  /**
+   * Contains info related to the path of the file/dir to be deleted
+   */
+  static class PathDeletionContext {
+    String fullPath;// full path of file or dir
+    FileSystem fs;
+
+    public PathDeletionContext(FileSystem fs, String fullPath) {
+      this.fs = fs;
+      this.fullPath = fullPath;
+    }
+    
+    protected String getPathForCleanup() {
+      return fullPath;
+    }
+
+    /**
+     * Makes the path (and its subdirectories, recursively) fully deletable
+     */
+    protected void enablePathForCleanup() throws IOException {
+      // Do nothing by default.
+      // Subclasses can override to provide enabling for deletion.
+    }
   }
 
-  private static class PathCleanupThread extends Thread {
+  /**
+   * Adds the paths to the queue of paths to be deleted by cleanupThread.
+   */
+  void addToQueue(PathDeletionContext... contexts) {
+    cleanupThread.addToQueue(contexts);
+  }
 
-    static class PathAndFS {
-      FileSystem fs;
-      Path path;
-      PathAndFS(FileSystem fs, Path path) {
-        this.fs = fs;
-        this.path = path;
-      }
+  protected static boolean deletePath(PathDeletionContext context)
+            throws IOException {
+    context.enablePathForCleanup();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to delete " + context.fullPath);
+    }
+    if (context.fs.exists(new Path(context.fullPath))) {
+      return context.fs.delete(new Path(context.fullPath), true);
     }
+    return true;
+  }
+
+  // currently used by tests only
+  protected boolean isQueueEmpty() {
+    return (cleanupThread.queue.size() == 0);
+  }
+
+  private static class PathCleanupThread extends Thread {
+
     // cleanup queue which deletes files/directories of the paths queued up.
-    private LinkedBlockingQueue<PathAndFS> queue = new LinkedBlockingQueue<PathAndFS>();
+    private LinkedBlockingQueue<PathDeletionContext> queue =
+      new LinkedBlockingQueue<PathDeletionContext>();
 
     public PathCleanupThread() {
       setName("Directory/File cleanup thread");
@@ -72,27 +112,34 @@
       start();
     }
 
-    public void addToQueue(FileSystem fs, Path... paths) {
-      for (Path p : paths) {
+    void addToQueue(PathDeletionContext[] contexts) {
+      for (PathDeletionContext context : contexts) {
         try {
-          queue.put(new PathAndFS(fs, p));
-        } catch (InterruptedException ie) {}
+          queue.put(context);
+        } catch(InterruptedException ie) {}
       }
     }
 
     public void run() {
-      LOG.debug(getName() + " started.");
-      PathAndFS pathAndFS = null;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + " started.");
+      }
+      PathDeletionContext context = null;
       while (true) {
         try {
-          pathAndFS = queue.take();
+          context = queue.take();
           // delete the path.
-          pathAndFS.fs.delete(pathAndFS.path, true);
-          LOG.debug("DELETED " + pathAndFS.path);
+          if (!deletePath(context)) {
+            LOG.warn("CleanupThread:Unable to delete path " + context.fullPath);
+          }
+          else if (LOG.isDebugEnabled()) {
+            LOG.debug("DELETED " + context.fullPath);
+          }
         } catch (InterruptedException t) {
+          LOG.warn("Interrupted deletion of " + context.fullPath);
           return;
         } catch (Exception e) {
-          LOG.warn("Error deleting path" + pathAndFS.path);
+          LOG.warn("Error deleting path " + context.fullPath + ": " + e);
         } 
       }
     }
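
The shape of this refactoring is worth spelling out: the queue no longer holds bare (FileSystem, Path) pairs but context objects carrying an overridable hook that runs just before deletion, which is what lets a task-controller chmod a locked path before the daemon thread unlinks it. A condensed, self-contained sketch of the pattern, with simplified names and java.io.File standing in for Hadoop's FileSystem:

    import java.util.concurrent.LinkedBlockingQueue;

    class DeletionContext {
      final String path;
      DeletionContext(String path) { this.path = path; }
      void enableForCleanup() throws Exception { /* default: no-op */ }
      boolean delete() throws Exception {
        enableForCleanup();               // e.g. chmod via a task-controller
        return new java.io.File(path).delete();
      }
    }

    class CleanupThreadSketch extends Thread {
      private final LinkedBlockingQueue<DeletionContext> queue =
          new LinkedBlockingQueue<DeletionContext>();

      CleanupThreadSketch() { setDaemon(true); start(); }

      void addToQueue(DeletionContext... contexts) {
        for (DeletionContext c : contexts) {
          queue.offer(c);                 // unbounded queue, never blocks
        }
      }

      public void run() {
        while (true) {
          try {
            queue.take().delete();        // block until a path arrives
          } catch (InterruptedException ie) {
            return;                       // shut down on interrupt
          } catch (Exception e) {
            // log and keep the thread alive, as the real queue does
          }
        }
      }
    }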

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Mon Dec 28 13:08:04 2009
@@ -21,6 +21,8 @@
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
@@ -177,4 +179,21 @@
           + exitCode + ".");
     }
   }
+
+  /**
+   * Enables the task for cleanup by changing permissions of the specified path
+   * in the local filesystem
+   */
+  @Override
+  void enableTaskForCleanup(PathDeletionContext context)
+         throws IOException {
+    try {
+      FileUtil.chmod(context.fullPath, "ug+rwx", true);
+    } catch(InterruptedException e) {
+      LOG.warn("Interrupted while setting permissions for " + context.fullPath +
+          " for deletion.");
+    } catch(IOException ioe) {
+      LOG.warn("Unable to change permissions of " + context.fullPath);
+    }
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Dec 28 13:08:04 2009
@@ -44,6 +44,7 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
@@ -3226,7 +3227,8 @@
       }
 
       Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
-      new CleanupQueue().addToQueue(jobtracker.getFileSystem(tempDir), tempDir); 
+      new CleanupQueue().addToQueue(new PathDeletionContext(
+          jobtracker.getFileSystem(tempDir), tempDir.toUri().getPath())); 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java Mon Dec 28 13:08:04 2009
@@ -30,7 +30,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@@ -153,6 +152,22 @@
     }
   }  
 
+  /**
+   * Adds the task's work dir to the cleanup queue of taskTracker for
+   * asynchronous deletion of work dir.
+   * @param tracker taskTracker
+   * @param task    the task whose work dir needs to be deleted
+   * @throws IOException
+   */
+  static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException {
+    tracker.getCleanupThread().addToQueue(
+        TaskTracker.buildTaskControllerPathDeletionContexts(
+          tracker.getLocalFileSystem(),
+          tracker.getLocalFiles(tracker.getJobConf(), ""),
+          task, true /* workDir */,
+          tracker.getTaskController()));
+  }
+
   private static class JvmManagerForType {
     //Mapping from the JVM IDs to running Tasks
     Map <JVMId,TaskRunner> jvmToRunningTask = 
@@ -447,7 +462,7 @@
             //task at the beginning of each task in the task JVM.
             //For the last task, we do it here.
             if (env.conf.getNumTasksToExecutePerJvm() != 1) {
-              FileUtil.fullyDelete(env.workDir);
+              deleteWorkDir(tracker, initalContext.task);
             }
           } catch (IOException ie){}
         }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Mon Dec 28 13:08:04 2009
@@ -29,7 +29,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -90,6 +92,7 @@
     KILL_TASK_JVM,
     RUN_DEBUG_SCRIPT,
     SIGQUIT_TASK_JVM,
+    ENABLE_TASK_FOR_CLEANUP
   }
 
   /**
@@ -229,12 +232,75 @@
   @Override
   void initializeTask(TaskControllerContext context)
       throws IOException {
-    LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
-        + " for " + context.task.getTaskID().toString());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
+                + " for " + context.task.getTaskID().toString());
+    }
     runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(),
         buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
   }
 
+  /**
+   * Builds the args to be passed to task-controller for enabling of task for
+   * cleanup. Last arg in this List is either $attemptId or $attemptId/work
+   */
+  private List<String> buildTaskCleanupArgs(
+      TaskControllerPathDeletionContext context) {
+    List<String> commandArgs = new ArrayList<String>(3);
+    commandArgs.add(context.mapredLocalDir.toUri().getPath());
+    commandArgs.add(context.task.getJobID().toString());
+
+    String workDir = "";
+    if (context.isWorkDir) {
+      workDir = "/work";
+    }
+    if (context.task.isTaskCleanupTask()) {
+      commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
+                      + workDir);
+    } else {
+      commandArgs.add(context.task.getTaskID() + workDir);
+    }
+
+    return commandArgs;
+  }
+
+  /**
+   * Enables the task for cleanup by changing permissions of the specified path
+   * in the local filesystem
+   */
+  @Override
+  void enableTaskForCleanup(PathDeletionContext context)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to do " + TaskCommands.ENABLE_TASK_FOR_CLEANUP.toString()
+                + " for " + context.fullPath);
+    }
+
+    if (context instanceof TaskControllerPathDeletionContext) {
+      TaskControllerPathDeletionContext tContext =
+        (TaskControllerPathDeletionContext) context;
+    
+      if (tContext.task.getUser() != null &&
+          tContext.fs instanceof LocalFileSystem) {
+        try {
+          runCommand(TaskCommands.ENABLE_TASK_FOR_CLEANUP,
+                   tContext.task.getUser(),
+                   buildTaskCleanupArgs(tContext), null, null);
+        } catch(IOException e) {
+          LOG.warn("Unable to change permissions for " + tContext.fullPath);
+        }
+      }
+      else {
+        throw new IllegalArgumentException("Either user is null or the " +
+                               "file system is not the local file system.");
+      }
+    }
+    else {
+      throw new IllegalArgumentException("PathDeletionContext provided is not "
+          + "TaskControllerPathDeletionContext.");
+    }
+  }
+
   private void logOutput(String output) {
     String shExecOutput = output;
     if (shExecOutput != null) {
@@ -458,7 +524,8 @@
     }
     ShellCommandExecutor shExec = buildTaskControllerExecutor(
         command, context.env.conf.getUser(), 
-        buildKillTaskCommandArgs(context), context.env.workDir, context.env.env);
+        buildKillTaskCommandArgs(context), context.env.workDir,
+        context.env.env);
     try {
       shExec.execute();
     } catch (Exception e) {
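
Concretely, the last argument produced by buildTaskCleanupArgs() takes one of four forms depending on isWorkDir and isTaskCleanupTask(). A sketch with a hypothetical attempt id, assuming TaskTracker.TASK_CLEANUP_SUFFIX is ".cleanup":

    public class LastArgSketch {
      public static void main(String[] args) {
        String taskId = "attempt_200912281200_0001_m_000001_0";
        for (boolean isCleanupAttempt : new boolean[] { false, true }) {
          for (boolean isWorkDir : new boolean[] { false, true }) {
            String lastArg = taskId
                + (isCleanupAttempt ? ".cleanup" : "")  // TASK_CLEANUP_SUFFIX (assumed)
                + (isWorkDir ? "/work" : "");
            System.out.println(lastArg);
          }
        }
      }
    }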

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java Mon Dec 28 13:08:04 2009
@@ -25,6 +25,9 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -191,6 +194,67 @@
   }
 
   /**
+   * Contains info related to the path of the file/dir to be deleted. This info
+   * is needed by task-controller to build the full path of the file/dir
+   */
+  static class TaskControllerPathDeletionContext extends PathDeletionContext {
+    Task task;
+    boolean isWorkDir;
+    TaskController taskController;
+
+    /**
+     * mapredLocalDir is the base dir under which to-be-deleted taskWorkDir or
+     * taskAttemptDir exists. fullPath of taskAttemptDir or taskWorkDir
+     * is built using mapredLocalDir, jobId, taskId, etc.
+     */
+    Path mapredLocalDir;
+
+    public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
+        Task task, boolean isWorkDir, TaskController taskController) {
+      super(fs, null);
+      this.task = task;
+      this.isWorkDir = isWorkDir;
+      this.taskController = taskController;
+      this.mapredLocalDir = mapredLocalDir;
+    }
+
+    @Override
+    protected String getPathForCleanup() {
+      if (fullPath == null) {
+        fullPath = buildPathForDeletion();
+      }
+      return fullPath;
+    }
+
+    /**
+     * Builds the path of taskAttemptDir OR taskWorkDir based on
+     * mapredLocalDir, jobId, taskId, etc
+     */
+    String buildPathForDeletion() {
+      String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(),
+          task.getJobID().toString(), task.getTaskID().toString(),
+          task.isTaskCleanupTask())
+        : TaskTracker.getLocalTaskDir(task.getUser(),
+          task.getJobID().toString(), task.getTaskID().toString(),
+          task.isTaskCleanupTask());
+
+      return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + subDir;
+    }
+
+    /**
+     * Makes the path (and its subdirectories, recursively) fully deletable by
+     * setting proper permissions (770) via the task-controller
+     */
+    @Override
+    protected void enablePathForCleanup() throws IOException {
+      getPathForCleanup();// allow init of fullPath, if not inited already
+      if (fs.exists(new Path(fullPath))) {
+        taskController.enableTaskForCleanup(this);
+      }
+    }
+  }
+
+  /**
    * NOTE: This class is internal only class and not intended for users!!
    * 
    */
@@ -251,4 +315,11 @@
   abstract void runDebugScript(DebugScriptContext context) 
       throws IOException;
   
+  /**
+   * Enable the task for cleanup by changing permissions of the path
+   * @param context   path deletion context
+   * @throws IOException
+   */
+  abstract void enableTaskForCleanup(PathDeletionContext context)
+      throws IOException;
 }
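
Two details of TaskControllerPathDeletionContext are easy to miss: fullPath is built lazily on first use, and enablePathForCleanup() only invokes the task-controller when the path actually exists on that disk, so the setuid binary is not spawned for every configured mapred-local-dir when the attempt's dirs live on a single disk. A trimmed sketch of that control flow, with simplified names and java.io.File standing in for the FileSystem check:

    abstract class LazyPathContextSketch {
      protected String fullPath;                  // built on first use

      protected String getPathForCleanup() {
        if (fullPath == null) {
          fullPath = buildPathForDeletion();      // local-dir + job + task parts
        }
        return fullPath;
      }

      abstract String buildPathForDeletion();

      protected void enablePathForCleanup() throws java.io.IOException {
        getPathForCleanup();
        if (new java.io.File(fullPath).exists()) {
          // the real code calls taskController.enableTaskForCleanup(this) here
        }
      }
    }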

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Mon Dec 28 13:08:04 2009
@@ -654,6 +654,39 @@
   }
   
   /**
+   * Sets permissions recursively and then deletes the contents of dir.
+   * Makes dir an empty directory (does not delete dir itself).
+   */
+  static void deleteDirContents(JobConf conf, File dir) throws IOException {
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (fs.exists(new Path(dir.getAbsolutePath()))) {
+      File contents[] = dir.listFiles();
+      if (contents != null) {
+        for (int i = 0; i < contents.length; i++) {
+          try {
+            int ret = 0;
+            if ((ret = FileUtil.chmod(contents[i].getAbsolutePath(),
+                                      "ug+rwx", true)) != 0) {
+              LOG.warn("Unable to chmod for " + contents[i] + 
+                  "; chmod exit status = " + ret);
+            }
+          } catch(InterruptedException e) {
+            LOG.warn("Interrupted while setting permissions for contents of " +
+                "workDir. Not deleting the remaining contents of workDir.");
+            return;
+          }
+          if (!fs.delete(new Path(contents[i].getAbsolutePath()), true)) {
+            LOG.warn("Unable to delete "+ contents[i]);
+          }
+        }
+      }
+    }
+    else {
+      LOG.warn(dir + " does not exist.");
+    }
+  }
+  
+  /**
    * Creates distributed cache symlinks and tmp directory, as appropriate.
    * Note that when we setup the distributed
    * cache, we didn't create the symlinks. This is done on a per task basis
@@ -663,11 +696,14 @@
    * @param workDir Working directory, which is completely deleted.
    */
   public static void setupWorkDir(JobConf conf, File workDir) throws IOException {
-    LOG.debug("Fully deleting and re-creating" + workDir);
-    FileUtil.fullyDelete(workDir);
-    if (!workDir.mkdir()) {
-      LOG.debug("Did not recreate " + workDir);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Fully deleting contents of " + workDir);
     }
+
+    /** delete only the contents of workDir leaving the directory empty. We
+     * can't delete the workDir as it is the current working directory.
+     */
+    deleteDirContents(conf, workDir);
     
     if (DistributedCache.getSymlink(conf)) {
       URI[] archives = DistributedCache.getCacheArchives(conf);
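
The new TestSetupWorkDir (added further down) exercises this helper directly. Since deleteDirContents is package-private, a caller must live in org.apache.hadoop.mapred, as the test does; this hedged usage sketch empties the directory while keeping the directory itself, because it is the task JVM's current working directory:

    package org.apache.hadoop.mapred;   // deleteDirContents is package-private

    import java.io.File;

    public class EmptyWorkDirSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        File workDir = new File("/tmp/sketch-work");  // hypothetical work dir
        // suppose a task left workDir/subDir behind with mode 555
        TaskRunner.deleteDirContents(conf, workDir);
        // workDir still exists and is now empty
      }
    }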

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Dec 28 13:08:04 2009
@@ -38,7 +38,6 @@
 import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.TreeMap;
-import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
@@ -68,6 +67,8 @@
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.TaskController.DebugScriptContext;
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -103,7 +104,6 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 
 /*******************************************************
@@ -232,7 +232,7 @@
   static final String JOB_LOCAL_DIR = JobContext.JOB_LOCAL_DIR;
 
   private JobConf fConf;
-  FileSystem localFs;
+  private FileSystem localFs;
 
   private Localizer localizer;
 
@@ -398,6 +398,11 @@
     return taskController;
   }
   
+  // Currently this is used only by tests
+  void setTaskController(TaskController t) {
+    taskController = t;
+  }
+  
   private RunningJob addTaskToJob(JobID jobId, 
                                   TaskInProgress tip) {
     synchronized (runningJobs) {
@@ -521,10 +526,7 @@
 
   static String getTaskWorkDir(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
-    String dir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid;
-    if (isCleanupAttempt) {
-      dir = dir + TASK_CLEANUP_SUFFIX;
-    }
+    String dir = getLocalTaskDir(user, jobid, taskid, isCleanupAttempt);
     return dir + Path.SEPARATOR + MRConstants.WORKDIR;
   }
 
@@ -1189,7 +1191,16 @@
     taskCleanupThread.start();
     directoryCleanupThread = new CleanupQueue();
   }
+
+  // only used by tests
+  void setCleanupThread(CleanupQueue c) {
+    directoryCleanupThread = c;
+  }
   
+  CleanupQueue getCleanupThread() {
+    return directoryCleanupThread;
+  }
+
   /**
    * The connection to the JobTracker, used by the TaskRunner 
    * for locating remote files.
@@ -1557,6 +1568,44 @@
   }
 
   /**
+   * Builds list of PathDeletionContext objects for the given paths
+   */
+  private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
+      Path[] paths) {
+    int i = 0;
+    PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
+
+    for (Path p : paths) {
+      contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
+    }
+    return contexts;
+  }
+
+  /**
+   * Builds list of TaskControllerPathDeletionContext objects for a task
+   * @param fs    : FileSystem in which the dirs are to be deleted
+   * @param paths : mapred-local-dirs
+   * @param task  : the task whose taskDir or taskWorkDir is going to be deleted
+   * @param isWorkDir : the dir to be deleted is workDir or taskDir
+   * @param taskController : the task-controller to be used for deletion of
+   *                         taskDir or taskWorkDir
+   */
+  static PathDeletionContext[] buildTaskControllerPathDeletionContexts(
+      FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
+      TaskController taskController)
+      throws IOException {
+    int i = 0;
+    PathDeletionContext[] contexts =
+                          new TaskControllerPathDeletionContext[paths.length];
+
+    for (Path p : paths) {
+      contexts[i++] = new TaskControllerPathDeletionContext(fs, p, task,
+                          isWorkDir, taskController);
+    }
+    return contexts;
+  }
+
+  /**
    * Send a signal to a stuck task commanding it to dump stack traces
    * to stderr before we kill it with purgeTask().
    *
@@ -1621,8 +1670,9 @@
    */
   void removeJobFiles(String user, String jobId)
       throws IOException {
-    directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf,
-        getLocalJobDir(user, jobId)));
+    PathDeletionContext[] contexts = buildPathDeletionContexts(localFs,
+        getLocalFiles(fConf, getLocalJobDir(user, jobId)));
+    directoryCleanupThread.addToQueue(contexts);
   }
 
   /**
@@ -2691,29 +2741,33 @@
           runner.close();
         }
 
-        String localTaskDir =
-            getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
-                .toString(), task.isTaskCleanupTask());
         if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
           // No jvm reuse, remove everything
-          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
-              defaultJobConf, localTaskDir));
+          PathDeletionContext[] contexts =
+            buildTaskControllerPathDeletionContexts(localFs,
+                getLocalFiles(fConf, ""), task, false/* not workDir */,
+                taskController);
+          directoryCleanupThread.addToQueue(contexts);
         } else {
           // Jvm reuse. We don't delete the workdir since some other task
           // (running in the same JVM) might be using the dir. The JVM
           // running the tasks would clean the workdir per a task in the
           // task process itself.
-          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
-              defaultJobConf, localTaskDir + Path.SEPARATOR
-                  + TaskTracker.JOBFILE));
+          String localTaskDir =
+            getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
+                .toString(), task.isTaskCleanupTask());
+          PathDeletionContext[] contexts = buildPathDeletionContexts(
+              localFs, getLocalFiles(defaultJobConf, localTaskDir +
+                         Path.SEPARATOR + TaskTracker.JOBFILE));
+          directoryCleanupThread.addToQueue(contexts);
         }
       } else {
         if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-          String taskWorkDir =
-              getTaskWorkDir(task.getUser(), task.getJobID().toString(),
-                  taskId.toString(), task.isTaskCleanupTask());
-          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
-              defaultJobConf, taskWorkDir));
+          PathDeletionContext[] contexts =
+            buildTaskControllerPathDeletionContexts(localFs,
+              getLocalFiles(fConf, ""), task, true /* workDir */,
+              taskController);
+          directoryCleanupThread.addToQueue(contexts);
         }
       }
     }
@@ -3351,19 +3405,29 @@
     }
   }
   
-
   // get the full paths of the directory in all the local disks.
-  private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
+  Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
     String[] localDirs = conf.getLocalDirs();
     Path[] paths = new Path[localDirs.length];
     FileSystem localFs = FileSystem.getLocal(conf);
+    boolean subdirNeeded = (subdir != null) && (subdir.length() > 0);
     for (int i = 0; i < localDirs.length; i++) {
-      paths[i] = new Path(localDirs[i], subdir);
+      paths[i] = (subdirNeeded) ? new Path(localDirs[i], subdir)
+                                : new Path(localDirs[i]);
       paths[i] = paths[i].makeQualified(localFs);
     }
     return paths;
   }
 
+  FileSystem getLocalFileSystem(){
+    return localFs;
+  }
+
+  // only used by tests
+  void setLocalFileSystem(FileSystem fs){
+    localFs = fs;
+  }
+
   int getMaxCurrentMapTasks() {
     return maxMapSlots;
   }
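
Putting the TaskTracker pieces together: with an empty subdir, getLocalFiles() returns one qualified Path per configured mapred-local-dir, and the helper fans out one deletion context per disk, since an attempt's dirs may exist on any of them. This fragment mirrors JvmManager.deleteWorkDir() from the hunk above; tracker, conf and task (a TaskTracker, JobConf and Task) are assumed to be in scope.

    // one context per mapred-local-dir; the queue deletes whichever exist
    Path[] localDirs = tracker.getLocalFiles(conf, "");
    PathDeletionContext[] contexts =
        TaskTracker.buildTaskControllerPathDeletionContexts(
            tracker.getLocalFileSystem(), localDirs, task,
            true /* workDir */, tracker.getTaskController());
    tracker.getCleanupThread().addToQueue(contexts);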

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Mon Dec 28 13:08:04 2009
@@ -193,7 +193,7 @@
     synchronized (localizedUser) {
 
       if (localizedUser.get()) {
-        // User-directories are already localized for his user.
+        // User-directories are already localized for this user.
         LOG.info("User-directories for the user " + user
             + " are already initialized on this TT. Not doing anything.");
         return;

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Mon Dec 28 13:08:04 2009
@@ -268,7 +268,18 @@
   public int getNumTaskTrackers() {
     return taskTrackerList.size();
   }
-    
+  
+  /**
+   * Sets inline cleanup threads on all task trackers so that deletion of
+   * temporary files/dirs happens inline
+   */
+  public void setInlineCleanupThreads() {
+    for (int i = 0; i < getNumTaskTrackers(); i++) {
+      getTaskTrackerRunner(i).getTaskTracker().setCleanupThread(
+          new UtilsForTests.InlineCleanupQueue());
+    }
+  }
+
   /**
    * Wait until the system is idle.
    */
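
UtilsForTests.InlineCleanupQueue itself is not part of this diff (the old copy being removed from TestTaskTrackerLocalization appears further down), but it presumably now overrides the varargs addToQueue and deletes synchronously, roughly like this hedged sketch; inline deletion lets a test assert on directory state immediately after the call returns.

    package org.apache.hadoop.mapred;   // CleanupQueue members are package-private

    import java.io.IOException;
    import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;

    class InlineCleanupQueueSketch extends CleanupQueue {
      @Override
      void addToQueue(PathDeletionContext... contexts) {
        for (PathDeletionContext context : contexts) {
          try {
            deletePath(context);        // delete in-line instead of queueing
          } catch (IOException e) {
            // the real test utility records such paths as stale
          }
        }
      }
    }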

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java Mon Dec 28 13:08:04 2009
@@ -58,6 +58,9 @@
       namenode = fileSys.getUri().toString();
       mr = new MiniMRCluster(10, namenode, 3, 
           null, null, mrConf);
+      // make cleanup inline so that validation of existence of these directories
+      // can be done
+      mr.setInlineCleanupThreads();
       final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
       JobConf jobConf = mr.createJobConf();
       runSleepJob(jobConf);
@@ -78,19 +81,8 @@
       "/taskTracker/jobcache";
       File jobDir = new File(jobDirStr);
       String[] contents = jobDir.list();
-      if (contents == null || contents.length == 0) {
-        return;
-      }
-      while (contents.length > 0) {
-        try {
-          Thread.sleep(1000);
-          LOG.warn(jobDir +" not empty yet, contents are");
-          for (String s: contents) {
-            LOG.info(s);
-          }
-          contents = jobDir.list();
-        } catch (InterruptedException ie){}
-      }
+      assertTrue("Contents of " + jobDir + " not cleaned up.",
+                 (contents == null || contents.length == 0));
     }
   }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Mon Dec 28 13:08:04 2009
@@ -45,10 +45,15 @@
   private static String taskTrackerSpecialGroup;
 
   @Override
+  protected boolean canRun() {
+    return ClusterWithLinuxTaskController.shouldRun();
+  }
+
+  @Override
   protected void setUp()
       throws Exception {
 
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
+    if (!canRun()) {
       return;
     }
 
@@ -66,7 +71,8 @@
     taskController.setConf(trackerFConf);
     taskController.setup();
 
-    tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+    tracker.setTaskController(taskController);
+    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
         taskController));
 
     // Rewrite conf so as to reflect task's correct user name.
@@ -81,7 +87,7 @@
   @Override
   protected void tearDown()
       throws Exception {
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
+    if (!canRun()) {
       return;
     }
     super.tearDown();
@@ -96,21 +102,6 @@
     // Do nothing.
   }
 
-  /**
-   * Test the localization of a user on the TT when {@link LinuxTaskController}
-   * is in use.
-   */
-  @Override
-  public void testUserLocalization()
-      throws IOException {
-
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-
-    super.testJobLocalization();
-  }
-
   @Override
   protected void checkUserLocalization()
       throws IOException {
@@ -148,21 +139,6 @@
     }
   }
 
-  /**
-   * Test job localization with {@link LinuxTaskController}. Also check the
-   * permissions and file ownership of the job related files.
-   */
-  @Override
-  public void testJobLocalization()
-      throws IOException {
-
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-
-    super.testJobLocalization();
-  }
-
   @Override
   protected void checkJobLocalization()
       throws IOException {
@@ -208,21 +184,6 @@
     }
   }
 
-  /**
-   * Test task localization with {@link LinuxTaskController}. Also check the
-   * permissions and file ownership of task related files.
-   */
-  @Override
-  public void testTaskLocalization()
-      throws IOException {
-
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-
-    super.testTaskLocalization();
-  }
-
   @Override
   protected void checkTaskLocalization()
       throws IOException {
@@ -248,16 +209,4 @@
           .getUser(), taskTrackerSpecialGroup);
     }
   }
-
-  /**
-   * Test cleanup of task files with {@link LinuxTaskController}.
-   */
-  @Override
-  public void testTaskCleanup()
-      throws IOException {
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-    super.testTaskCleanup();
-  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Mon Dec 28 13:08:04 2009
@@ -61,6 +61,10 @@
     MiniMRCluster mr = null;
     try {
       mr = new MiniMRCluster(2, "file:///", 3);
+      // make cleanup inline so that validation of existence of these directories
+      // can be done
+      mr.setInlineCleanupThreads();
+
       TestMiniMRWithDFS.runPI(mr, mr.createJobConf());
 
       // run the wordcount example with caching

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Dec 28 13:08:04 2009
@@ -329,6 +329,9 @@
       dfs = new MiniDFSCluster(conf, 4, true, null);
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+      // make cleanup inline so that validation of existence of these directories
+      // can be done
+      mr.setInlineCleanupThreads();
 
       runPI(mr, mr.createJobConf());
       runWordCount(mr, mr.createJobConf());

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java?rev=894165&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java Mon Dec 28 13:08:04 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+public class TestSetupWorkDir extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestSetupWorkDir.class);
+
+  /**
+   * Create a file in the given dir and set permissions r_xr_xr_x so that no one
+   * can delete it directly (without doing chmod).
+   * Creates dir/subDir and dir/subDir/file
+   */
+  static void createFileAndSetPermissions(JobConf jobConf, Path dir)
+       throws IOException {
+    Path subDir = new Path(dir, "subDir");
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    fs.mkdirs(subDir);
+    Path p = new Path(subDir, "file");
+    DataOutputStream out = fs.create(p);
+    out.writeBytes("dummy input");
+    out.close();
+    // no write permission for subDir and subDir/file
+    try {
+      int ret = 0;
+      if((ret = FileUtil.chmod(subDir.toUri().getPath(), "a=rx", true)) != 0) {
+        LOG.warn("chmod failed for " + subDir + ";retVal=" + ret);
+      }
+    } catch(InterruptedException e) {
+      LOG.warn("Interrupted while doing chmod for " + subDir);
+    }
+  }
+
+  /**
+   * Validates if setupWorkDir is properly cleaning up contents of workDir.
+   */
+  public void testSetupWorkDir() throws IOException {
+    Path rootDir = new Path(System.getProperty("test.build.data",  "/tmp"),
+                            "testSetupWorkDir");
+    Path myWorkDir = new Path(rootDir, "./work");
+    JobConf jConf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(jConf);
+    if (fs.exists(myWorkDir)) {
+      fs.delete(myWorkDir, true);
+    }
+    if (!fs.mkdirs(myWorkDir)) {
+      throw new IOException("Unable to create workDir " + myWorkDir);
+    }
+
+    // create {myWorkDir}/subDir/file and set 555 perms for subDir and file
+    createFileAndSetPermissions(jConf, myWorkDir);
+
+    TaskRunner.deleteDirContents(jConf, new File(myWorkDir.toUri().getPath()));
+    
+    assertTrue("Contents of " + myWorkDir + " are not cleaned up properly.",
+        fs.listStatus(myWorkDir).length == 0);
+    
+    // cleanup
+    fs.delete(rootDir, true);
+  }
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Mon Dec 28 13:08:04 2009
@@ -107,6 +107,10 @@
         mrCluster =   new MiniMRCluster(0, 0,
             numTT, dfs.getFileSystem().getUri().toString(), 
             1, null, null, MR_UGI, new JobConf());
+        // make cleanup inline so that validation of existence of these directories
+        // can be done
+        mrCluster.setInlineCleanupThreads();
+
         mrCluster.getJobTrackerRunner().getJobTracker()
         .addJobInProgressListener(myListener);
       }
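
setInlineCleanupThreads() is new in this commit (the MiniMRCluster change
itself is elsewhere in the patch). For the existence checks above to be
reliable, it has to replace each tracker's asynchronous CleanupQueue with
the InlineCleanupQueue from UtilsForTests (shown further below). A hedged
sketch of the shape such a method could take -- the tracker-enumeration
calls and the numTaskTrackers field are assumptions for illustration, not
the actual MiniMRCluster code:

    // Sketch only: swap every TaskTracker's cleanup thread for an inline
    // queue; setCleanupThread() is the setter the localization test
    // below also uses.
    public void setInlineCleanupThreads() {
      for (int i = 0; i < numTaskTrackers; i++) {
        getTaskTrackerRunner(i).getTaskTracker().setCleanupThread(
            new UtilsForTests.InlineCleanupQueue());
      }
    }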

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Mon Dec 28 13:08:04 2009
@@ -48,6 +48,7 @@
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
 
 import junit.framework.TestCase;
 
@@ -86,36 +87,20 @@
   protected File[] attemptLogFiles;
   protected JobConf localizedTaskConf;
 
-  class InlineCleanupQueue extends CleanupQueue {
-    List<Path> stalePaths = new ArrayList<Path>();
-
-    public InlineCleanupQueue() {
-      // do nothing
-    }
-
-    @Override
-    public void addToQueue(FileSystem fs, Path... paths) {
-      // delete in-line
-      for (Path p : paths) {
-        try {
-          LOG.info("Trying to delete the path " + p);
-          if (!fs.delete(p, true)) {
-            LOG.warn("Stale path " + p.toUri().getPath());
-            stalePaths.add(p);
-          }
-        } catch (IOException e) {
-          LOG.warn("Caught exception while deleting path "
-              + p.toUri().getPath());
-          LOG.info(StringUtils.stringifyException(e));
-          stalePaths.add(p);
-        }
-      }
-    }
+  /**
+   * Trivially returns true in this base class. Derived classes override
+   * this method to check whether a test can run in the current environment.
+   */
+  protected boolean canRun() {
+    return true;
   }
 
   @Override
   protected void setUp()
       throws Exception {
+    if (!canRun()) {
+      return;
+    }
     TEST_ROOT_DIR =
         new File(System.getProperty("test.build.data", "/tmp"), getClass()
             .getSimpleName());
@@ -155,7 +140,8 @@
     tracker.setConf(trackerFConf);
 
     // for test case system FS is the local FS
-    tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf);
+    tracker.systemFS = FileSystem.getLocal(trackerFConf);
+    tracker.setLocalFileSystem(tracker.systemFS);
     tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
     
     taskTrackerUGI = UserGroupInformation.login(trackerFConf);
@@ -177,7 +163,8 @@
     taskController.setConf(trackerFConf);
     taskController.setup();
 
-    tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+    tracker.setTaskController(taskController);
+    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
         taskController));
   }
 
@@ -269,6 +256,9 @@
   @Override
   protected void tearDown()
       throws Exception {
+    if (!canRun()) {
+      return;
+    }
     FileUtil.fullyDelete(TEST_ROOT_DIR);
   }
 
@@ -313,6 +303,9 @@
    */
   public void testTaskControllerSetup()
       throws IOException {
+    if (!canRun()) {
+      return;
+    }
     // Task-controller is already set up in the test's setup method. Now verify.
     for (String localDir : localDirs) {
 
@@ -336,7 +329,9 @@
    */
   public void testUserLocalization()
       throws IOException {
-
+    if (!canRun()) {
+      return;
+    }
     // /////////// The main method being tested
     tracker.getLocalizer().initializeUserDirs(task.getUser());
     // ///////////
@@ -407,7 +402,9 @@
    */
   public void testJobLocalization()
       throws IOException {
-
+    if (!canRun()) {
+      return;
+    }
     tracker.getLocalizer().initializeUserDirs(task.getUser());
 
     // /////////// The main method being tested
@@ -501,7 +498,9 @@
    */
   public void testTaskLocalization()
       throws IOException {
-
+    if (!canRun()) {
+      return;
+    }
     tracker.getLocalizer().initializeUserDirs(task.getUser());
     localizedJobConf = tracker.localizeJobFiles(task);
 
@@ -617,14 +616,102 @@
   }
 
   /**
+   * Validates the removal of $taskid and $taskid/work under mapred-local-dir
+   * in cases where those directories cannot be deleted without first adding
+   * write permission to the newly created directories under $taskid and
+   * $taskid/work.
+   * Also see TestSetupWorkDir.createFileAndSetPermissions for details.
+   */
+  void validateRemoveFiles(boolean needCleanup, boolean jvmReuse,
+                           TaskInProgress tip) throws IOException {
+    // create files and set permissions 555. Verify that the task controller
+    // sets the permissions for the TT to delete the taskDir or workDir
+    String dir = (!needCleanup || jvmReuse) ?
+        TaskTracker.getTaskWorkDir(task.getUser(), task.getJobID().toString(),
+          taskId.toString(), task.isTaskCleanupTask())
+      : TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID().toString(),
+          taskId.toString(), task.isTaskCleanupTask());
+
+    Path[] paths = tracker.getLocalFiles(localizedJobConf, dir);
+    for (Path p : paths) {
+      if (tracker.getLocalFileSystem().exists(p)) {
+        TestSetupWorkDir.createFileAndSetPermissions(localizedJobConf, p);
+      }
+    }
+
+    InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
+    tracker.setCleanupThread(cleanupQueue);
+
+    tip.removeTaskFiles(needCleanup, taskId);
+
+    if (jvmReuse) {
+      // work dir should still exist and cleanup queue should be empty
+      assertTrue("cleanup queue is not empty after removeTaskFiles() in case "
+          + "of jvm reuse.", cleanupQueue.isQueueEmpty());
+      boolean workDirExists = false;
+      for (Path p : paths) {
+        if (tracker.getLocalFileSystem().exists(p)) {
+          workDirExists = true;
+        }
+      }
+      assertTrue("work dir does not exist in case of jvm reuse", workDirExists);
+
+      // now try to delete the work dir and verify that there are no stale paths
+      JvmManager.deleteWorkDir(tracker, task);
+    }
+    tracker.removeJobFiles(task.getUser(), jobId.toString());
+
+    assertTrue("Some task files are not deleted!! Number of stale paths is "
+        + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
+  }
+
+  /**
+   * Validates if task cleanup is done properly for a succeeded task
    * @throws IOException
    */
   public void testTaskCleanup()
       throws IOException {
+    if (!canRun()) {
+      return;
+    }
+    testTaskCleanup(false, false);// no needCleanup; no jvmReuse
+  }
+
+  /**
+   * Validates if task cleanup is done properly for a task that did not succeed
+   * @throws IOException
+   */
+  public void testFailedTaskCleanup()
+  throws IOException {
+    if (!canRun()) {
+      return;
+    }
+    testTaskCleanup(true, false);// needCleanup; no jvmReuse
+  }
 
+  /**
+   * Validates if task cleanup is done properly for a succeeded task when
+   * JVM reuse is enabled
+   * @throws IOException
+   */
+  public void testTaskCleanupWithJvmUse()
+      throws IOException {
+    if (!canRun()) {
+      return;
+    }
+    testTaskCleanup(false, true);// no needCleanup; jvmReuse
+  }
+
+  /**
+   * Validates if task cleanup is done properly
+   */
+  private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
+      throws IOException {
     // Localize job and localize task.
     tracker.getLocalizer().initializeUserDirs(task.getUser());
     localizedJobConf = tracker.localizeJobFiles(task);
+    if (jvmReuse) {
+      localizedJobConf.setNumTasksToExecutePerJvm(2);
+    }
     // Now initialize the job via task-controller so as to set
     // ownership/permissions of jars, job-work-dir
     JobInitializationContext jobContext = new JobInitializationContext();
@@ -663,18 +750,9 @@
 
     // TODO: Let the task run and create files.
 
-    InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
-    tracker.directoryCleanupThread = cleanupQueue;
-
-    // ////////// The central methods being tested
-    tip.removeTaskFiles(true, taskId);
-    tracker.removeJobFiles(task.getUser(), jobId.toString());
-    // //////////
-
-    // TODO: make sure that all files intended to be deleted are deleted.
-
-    assertTrue("Some task files are not deleted!! Number of stale paths is "
-        + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
+    // create files and set permissions 555. Verify that the task controller
+    // sets the permissions for the TT to delete the task dir or work dir
+    validateRemoveFiles(needCleanup, jvmReuse, tip);
 
     // Check that the empty $mapreduce.cluster.local.dir/taskTracker/$user dirs are still
     // there.
@@ -682,7 +760,7 @@
       Path userDir =
           new Path(localDir, TaskTracker.getUserDir(task.getUser()));
       assertTrue("User directory " + userDir + " is not present!!",
-          tracker.localFs.exists(userDir));
+          tracker.getLocalFileSystem().exists(userDir));
     }
 
     // Test userlogs cleanup.
@@ -702,7 +780,7 @@
 
     // Logs should be there before cleanup.
     assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
-        tracker.localFs.exists(logDir));
+        tracker.getLocalFileSystem().exists(logDir));
 
     // ////////// Another being tested
     TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file
@@ -711,6 +789,6 @@
 
     // Logs should be gone after cleanup.
     assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
-        tracker.localFs.exists(logDir));
+        tracker.getLocalFileSystem().exists(logDir));
   }
 }
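
The canRun() hook introduced above is the JUnit 3 stand-in for test
assumptions: TestCase has no skip mechanism, so setUp(), tearDown() and
every test method return early when the environment cannot support the
test. Purely as an illustration of how a derived class would use it -- the
class name and system property below are hypothetical, not part of this
commit:

    // Hypothetical subclass that only runs when a setuid task-controller
    // binary has been configured via a system property.
    public class TestLocalizationWithNativeTaskController
        extends TestTaskTrackerLocalization {
      @Override
      protected boolean canRun() {
        String path = System.getProperty("taskcontroller-path");
        return path != null && new java.io.File(path).exists();
      }
    }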

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Mon Dec 28 13:08:04 2009
@@ -25,9 +25,11 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.text.DecimalFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.logging.LogFactory;
@@ -48,6 +50,7 @@
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.util.StringUtils;
 
 import org.apache.commons.logging.Log;
 
@@ -650,6 +653,37 @@
 
     return job;
   }
+  
+  /**
+   * Cleans up files/dirs inline, unlike CleanupQueue, which deletes in a
+   * separate thread asynchronously.
+   */
+  public static class InlineCleanupQueue extends CleanupQueue {
+    List<String> stalePaths = new ArrayList<String>();
+
+    public InlineCleanupQueue() {
+      // do nothing
+    }
+
+    @Override
+    public void addToQueue(PathDeletionContext... contexts) {
+      // delete paths in-line
+      for (PathDeletionContext context : contexts) {
+        try {
+          if (!deletePath(context)) {
+            LOG.warn("Stale path " + context.fullPath);
+            stalePaths.add(context.fullPath);
+          }
+        } catch (IOException e) {
+          LOG.warn("Caught exception while deleting path "
+              + context.fullPath);
+          LOG.info(StringUtils.stringifyException(e));
+          stalePaths.add(context.fullPath);
+        }
+      }
+    }
+  }
+  
   static class FakeClock extends Clock {
     long time = 0;
     

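Because the stock CleanupQueue deletes paths on a separate thread, a test
that queues a deletion and immediately asserts on the path can race that
thread. The inline variant above deletes synchronously and records anything
it could not remove in stalePaths, so a test injects it and then asserts,
mirroring TestTaskTrackerLocalization.validateRemoveFiles() earlier in this
commit:

    // Usage as in validateRemoveFiles(): replace the async queue, let the
    // deletions run inline, then check that nothing was left behind.
    InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
    tracker.setCleanupThread(cleanupQueue);
    tip.removeTaskFiles(needCleanup, taskId);
    assertTrue("Some task files are not deleted!! Number of stale paths is "
        + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
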
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java?rev=894165&r1=894164&r2=894165&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java Mon Dec 28 13:08:04 2009
@@ -58,7 +58,10 @@
       JobConf mrConf = new JobConf(conf);
       mr = new MiniMRCluster(slaves, fileSys.getUri().toString(), 1, 
                              null, null, mrConf);
-
+      // make cleanup inline so that the existence of these directories
+      // can be validated
+      mr.setInlineCleanupThreads();
+      
       // Run examples
       TestMiniMRWithDFS.runPI(mr, mr.createJobConf(mrConf));
       TestMiniMRWithDFS.runWordCount(mr, mr.createJobConf(mrConf));