You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:43:44 UTC

svn commit: r1077127 - in /hadoop/common/branches/branch-0.20-security-patches/src: c++/task-controller/ mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/server/tasktracker/ test/org/apache/hadoop/mapred/ test/org/apache/hadoop/secur...

Author: omalley
Date: Fri Mar  4 03:43:43 2011
New Revision: 1077127

URL: http://svn.apache.org/viewvc?rev=1077127&view=rev
Log:
commit b2c61f3f33b3c2ff8b42efd7473752b9b4bc125c
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date:   Tue Jan 26 15:10:41 2010 +0530

    MAPREDUCE:896 from https://issues.apache.org/jira/secure/attachment/12431413/MR-896.v8-y20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-896. Enhance tasktracker to cleanup files that might have
    +    been created by user tasks with non-writable permissions.
    +    (Ravi Gummadi via yhemanth)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/main.c?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/main.c Fri Mar  4 03:43:43 2011
@@ -58,6 +58,7 @@ int main(int argc, char **argv) {
       NULL, 0 } };
 
   const char* log_file = NULL;
+  char * dir_to_be_deleted = NULL;
 
   //Minimum number of arguments required to run the task-controller
   //command-name user command tt-root
@@ -135,6 +136,13 @@ int main(int argc, char **argv) {
     task_pid = argv[optind++];
     exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
     break;
+  case ENABLE_TASK_FOR_CLEANUP:
+    tt_root = argv[optind++];
+    job_id = argv[optind++];
+    dir_to_be_deleted = argv[optind++];
+    exit_code = enable_task_for_cleanup(tt_root, user_detail->pw_name, job_id,
+                                        dir_to_be_deleted);
+    break;
   default:
     exit_code = INVALID_COMMAND_PROVIDED;
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar  4 03:43:43 2011
@@ -197,6 +197,17 @@ char *get_task_launcher_file(const char 
       attempt_dir);
 }
 
+/*
+ * Full path of localTaskDir or localWorkDir, per TT_LOCAL_TASK_DIR_PATTERN.
+ * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
+ * dir_to_be_deleted : is either taskDir($taskId) OR taskWorkDir($taskId/work)
+ */
+char *get_task_dir_path(const char *tt_root, const char *user,
+                        const char *jobid, const char *dir_to_be_deleted) {
+  return concatenate(TT_LOCAL_TASK_DIR_PATTERN, "task_dir_full_path", 4,
+                     tt_root, user, jobid, dir_to_be_deleted);
+}
+
 /**
  * Get the log directory for the given attempt.
  */
@@ -218,17 +229,17 @@ int check_tt_root(const char *tt_root) {
  * launcher file resolve to one and same. This is done so as to avoid
  * security pitfalls because of relative path components in the file name.
  */
-int check_task_launcher_path(char *path) {
+int check_path_for_relative_components(char *path) {
   char * resolved_path = (char *) canonicalize_file_name(path);
   if (resolved_path == NULL) {
     fprintf(LOGFILE,
-        "Error resolving the task launcher file path: %s. Passed path: %s\n",
+        "Error resolving the path: %s. Passed path: %s\n",
         strerror(errno), path);
     return ERROR_RESOLVING_FILE_PATH;
   }
   if (strcmp(resolved_path, path) != 0) {
     fprintf(LOGFILE,
-        "Relative path components in the file path: %s. Resolved path: %s\n",
+        "Relative path components in the path: %s. Resolved path: %s\n",
         path, resolved_path);
     free(resolved_path);
     return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH;
@@ -255,20 +266,23 @@ static int change_owner(const char *path
 static int change_mode(const char *path, mode_t mode) {
   int exit_code = chmod(path, mode);
   if (exit_code != 0) {
-    fprintf(LOGFILE, "chown %d of path %s failed: %s.\n", mode, path,
+    fprintf(LOGFILE, "chmod %d of path %s failed: %s.\n", mode, path,
         strerror(errno));
   }
   return exit_code;
 }
 
 /**
- * Function to secure the given path. It does the following recursively:
+ * Function to change permissions of the given path. It does the following
+ * recursively:
  *    1) changes the owner/group of the paths to the passed owner/group
  *    2) changes the file permission to the passed file_mode and directory
  *       permission to the passed dir_mode
+ *
+ * should_check_ownership : boolean to enable checking of ownership of each path
  */
 static int secure_path(const char *path, uid_t uid, gid_t gid,
-    mode_t file_mode, mode_t dir_mode) {
+    mode_t file_mode, mode_t dir_mode, int should_check_ownership) {
   FTS *tree = NULL; // the file hierarchy
   FTSENT *entry = NULL; // a file in the hierarchy
   char *paths[] = { (char *) path, NULL };//array needs to be NULL-terminated
@@ -361,7 +375,8 @@ static int secure_path(const char *path,
     if (!process_path) {
       continue;
     }
-    if (compare_ownership(uid, gid, entry->fts_path) == 0) {
+    if (should_check_ownership &&
+          (compare_ownership(uid, gid, entry->fts_path) == 0)) {
       // already set proper permissions.
       // This might happen with distributed cache.
 #ifdef DEBUG
@@ -373,7 +388,7 @@ static int secure_path(const char *path,
       continue;
     }
 
-    if (check_ownership(entry->fts_path) != 0) {
+    if (should_check_ownership && (check_ownership(entry->fts_path) != 0)) {
       fprintf(LOGFILE,
           "Invalid file path. %s not user/group owned by the tasktracker.\n",
           entry->fts_path);
@@ -466,8 +481,9 @@ int prepare_attempt_directories(const ch
         free(job_dir);
         break;
       }
-    } else if (secure_path(attempt_dir, user_detail->pw_uid, tasktracker_gid,
-        S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
+    } else if (secure_path(attempt_dir, user_detail->pw_uid,
+               tasktracker_gid, S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG,
+               1) != 0) {
       // No setgid on files and setgid on dirs, 770
       fprintf(LOGFILE, "Failed to secure the attempt_dir %s\n", attempt_dir);
       failed = 1;
@@ -526,8 +542,8 @@ int prepare_task_logs(const char *log_di
   }
 
   gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
-  if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid, S_IRWXU
-      | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
+  if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid,
+      S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 1) != 0) {
     // setgid on dirs but not files, 770. As of now, there are no files though
     fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir);
     return -1;
@@ -640,9 +656,9 @@ int initialize_user(const char *user) {
         free(user_dir);
         break;
       }
-    } else if (secure_path(user_dir, user_detail->pw_uid, tasktracker_gid,
-        S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG)
-        != 0) {
+    } else if (secure_path(user_dir, user_detail->pw_uid,
+        tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR |
+                         S_IXUSR | S_IRWXG, 1) != 0) {
       // No setgid on files and setgid on dirs, 570
       fprintf(LOGFILE, "Failed to secure the user_dir %s\n",
           user_dir);
@@ -722,7 +738,7 @@ int initialize_job(const char *jobid, co
         break;
       }
     } else if (secure_path(job_dir, user_detail->pw_uid, tasktracker_gid,
-        S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG)
+        S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG, 1)
         != 0) {
       // No setgid on files and setgid on dirs, 570
       fprintf(LOGFILE, "Failed to secure the job_dir %s\n", job_dir);
@@ -848,7 +864,7 @@ int initialize_distributed_cache(const c
       }
     } else if (secure_path(distcache_dir, user_detail->pw_uid,
         tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
-            | S_IXUSR | S_IRWXG) != 0) {
+            | S_IXUSR | S_IRWXG, 1) != 0) {
       // No setgid on files and setgid on dirs, 570
       fprintf(LOGFILE, "Failed to secure the distcache_dir %s\n",
           distcache_dir);
@@ -963,7 +979,7 @@ int run_task_as_user(const char * user, 
   }
 
   errno = 0;
-  exit_code = check_task_launcher_path(task_script_path);
+  exit_code = check_path_for_relative_components(task_script_path);
   if(exit_code != 0) {
     goto cleanup;
   }
@@ -1048,3 +1064,60 @@ int kill_user_task(const char *user, con
   cleanup();
   return 0;
 }
+
+/**
+ * Enables the path for deletion by changing the owner, group and permissions
+ * of the specified path and all the files/directories in the path recursively.
+ *     *  sudo chown user:mapred -R full_path
+ *     *  sudo chmod 2770 -R full_path
+ * Before changing permissions, makes sure that the given path doesn't contain
+ * any relative components.
+ * tt_root : is the base path(i.e. mapred-local-dir) sent to task-controller
+ * dir_to_be_deleted : is either taskDir OR taskWorkDir that is to be deleted
+ * Returns 0 on success, otherwise the error code of the step that failed.
+ */
+int enable_task_for_cleanup(const char *tt_root, const char *user,
+           const char *jobid, const char *dir_to_be_deleted) {
+  int exit_code = 0;
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+
+  char * full_path = NULL;
+  if (check_tt_root(tt_root) < 0) {
+    fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
+    cleanup();
+    return INVALID_TT_ROOT;
+  }
+
+  full_path = get_task_dir_path(tt_root, user, jobid, dir_to_be_deleted);
+  if (full_path == NULL) {
+    fprintf(LOGFILE,
+            "Could not build the full path. Not deleting the dir %s\n",
+            dir_to_be_deleted);
+    exit_code = UNABLE_TO_BUILD_PATH; // may be malloc failed
+  }
+     // Make sure that the path given is not having any relative components
+  else if ((exit_code = check_path_for_relative_components(full_path)) != 0) {
+    fprintf(LOGFILE,
+    "Not changing permissions. Path %s may contain relative components.\n",
+         full_path);
+  }
+  else if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
+    exit_code = INVALID_USER_NAME;
+  }
+  // Parenthesized so that exit_code holds secure_path()'s return value and not
+  // the 0/1 result of the comparison ('!=' binds tighter than '=').
+  else if ((exit_code = secure_path(full_path, user_detail->pw_uid,
+               tasktracker_gid,
+               S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 0)) != 0) {
+    // No setgid on files and setgid on dirs, 770.
+    // set 770 permissions for user, TTgroup for all files/directories in
+    // 'full_path' recursively so that deletion of path by TaskTracker succeeds.
+    fprintf(LOGFILE, "Failed to set permissions for %s\n", full_path);
+  }
+
+  free(full_path); // free(NULL) is a no-op, so no guard is needed
+  // free configurations
+  cleanup();
+  return exit_code;
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.h?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.h Fri Mar  4 03:43:43 2011
@@ -44,6 +44,7 @@ enum command {
   INITIALIZE_TASK,
   TERMINATE_TASK_JVM,
   KILL_TASK_JVM,
+  ENABLE_TASK_FOR_CLEANUP
 };
 
 enum errorcodes {
@@ -67,6 +68,7 @@ enum errorcodes {
   OUT_OF_MEMORY, //18
   INITIALIZE_DISTCACHE_FAILED, //19
   INITIALIZE_USER_FAILED, //20
+  UNABLE_TO_BUILD_PATH //21
 };
 
 #define USER_DIR_PATTERN "%s/taskTracker/%s"
@@ -83,6 +85,8 @@ enum errorcodes {
 
 #define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
 
+#define TT_LOCAL_TASK_DIR_PATTERN    "%s/taskTracker/%s/jobcache/%s/%s"
+
 #define TT_SYS_DIR_KEY "mapred.local.dir"
 
 #define TT_LOG_DIR_KEY "hadoop.log.dir"
@@ -109,6 +113,9 @@ int initialize_distributed_cache(const c
 
 int kill_user_task(const char *user, const char *task_pid, int sig);
 
+int enable_task_for_cleanup(const char *tt_root, const char *user,
+                            const char *jobid, const char *dir_to_be_deleted);
+
 int prepare_attempt_directory(const char *attempt_dir, const char *user);
 
 // The following functions are exposed for testing

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Fri Mar  4 03:43:43 2011
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -38,7 +39,7 @@ class CleanupQueue {
    * paths(directories/files) in a separate thread. This constructor creates a
    * clean-up thread and also starts it as a daemon. Callers can instantiate one
    * CleanupQueue per JVM and can use it for deleting paths. Use
-   * {@link CleanupQueue#addToQueue(FileSystem, Path...)} to add paths for
+   * {@link CleanupQueue#addToQueue(PathDeletionContext...)} to add paths for
    * deletion.
    */
   public CleanupQueue() {
@@ -49,22 +50,61 @@ class CleanupQueue {
     }
   }
   
-  public void addToQueue(FileSystem fs, Path...paths) {
-    cleanupThread.addToQueue(fs, paths);
+  /**
+   * Describes one file/dir to be deleted: its full path and its file system.
+   */
+  static class PathDeletionContext {
+    String fullPath;// full path of file or dir; deletePath() wraps it in a Path
+    FileSystem fs; // file system on which deletePath() checks and deletes
+
+    public PathDeletionContext(FileSystem fs, String fullPath) {
+      this.fs = fs;
+      this.fullPath = fullPath;
+    }
+    // Path to be cleaned up; this base version simply returns fullPath.
+    protected String getPathForCleanup() {
+      return fullPath;
+    }
+
+    /**
+     * Makes the path(and its subdirectories recursively) fully deletable.
+     */
+    protected void enablePathForCleanup() throws IOException {
+      // Do nothing by default.
+      // Subclasses can override to provide enabling for deletion.
+    }
   }
 
-  private static class PathCleanupThread extends Thread {
+  /**
+   * Queues the given contexts; cleanupThread deletes their paths later.
+   */
+  void addToQueue(PathDeletionContext... contexts) {
+    cleanupThread.addToQueue(contexts);
+  }
 
-    static class PathAndFS {
-      FileSystem fs;
-      Path path;
-      PathAndFS(FileSystem fs, Path path) {
-        this.fs = fs;
-        this.path = path;
-      }
+  protected static boolean deletePath(PathDeletionContext context)
+            throws IOException {
+    context.enablePathForCleanup();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to delete " + context.fullPath);
+    }
+    if (context.fs.exists(new Path(context.fullPath))) {
+      return context.fs.delete(new Path(context.fullPath), true);
     }
+    return true;
+  }
+
+  // true if no context is waiting in the queue; currently used by tests only
+  protected boolean isQueueEmpty() {
+    return (cleanupThread.queue.size() == 0);
+  }
+
+  private static class PathCleanupThread extends Thread {
+
     // cleanup queue which deletes files/directories of the paths queued up.
-    private LinkedBlockingQueue<PathAndFS> queue = new LinkedBlockingQueue<PathAndFS>();
+    private LinkedBlockingQueue<PathDeletionContext> queue =
+      new LinkedBlockingQueue<PathDeletionContext>();
 
     public PathCleanupThread() {
       setName("Directory/File cleanup thread");
@@ -72,27 +112,34 @@ class CleanupQueue {
       start();
     }
 
-    public void addToQueue(FileSystem fs, Path... paths) {
-      for (Path p : paths) {
+    void addToQueue(PathDeletionContext[] contexts) {
+      for (PathDeletionContext context : contexts) {
         try {
-          queue.put(new PathAndFS(fs, p));
-        } catch (InterruptedException ie) {}
+          queue.put(context);
+        } catch(InterruptedException ie) {}
       }
     }
 
     public void run() {
-      LOG.debug(getName() + " started.");
-      PathAndFS pathAndFS = null;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + " started.");
+      }
+      PathDeletionContext context = null;
       while (true) {
         try {
-          pathAndFS = queue.take();
+          context = queue.take();
           // delete the path.
-          pathAndFS.fs.delete(pathAndFS.path, true);
-          LOG.debug("DELETED " + pathAndFS.path);
+          if (!deletePath(context)) {
+            LOG.warn("CleanupThread:Unable to delete path " + context.fullPath);
+          }
+          else if (LOG.isDebugEnabled()) {
+            LOG.debug("DELETED " + context.fullPath);
+          }
         } catch (InterruptedException t) {
+          LOG.warn("Interrupted deletion of " + context.fullPath);
           return;
         } catch (Exception e) {
-          LOG.warn("Error deleting path" + pathAndFS.path);
+          LOG.warn("Error deleting path " + context.fullPath + ": " + e);
         } 
       }
     }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar  4 03:43:43 2011
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.List;
 
 
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
@@ -133,6 +135,23 @@ public class DefaultTaskController exten
     }
   }
 
+  /**
+   * Enables the task for cleanup by changing permissions of the specified path
+   * in the local filesystem. Failures are logged and not rethrown.
+   */
+  @Override
+  void enableTaskForCleanup(PathDeletionContext context) throws IOException {
+    try {
+      FileUtil.chmod(context.fullPath, "ug+rwx", true);
+    } catch(InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore interrupt status for caller
+      LOG.warn("Interrupted while setting permissions for " + context.fullPath +
+          " for deletion.");
+    } catch(IOException ioe) {
+      LOG.warn("Unable to change permissions of " + context.fullPath, ioe);
+    }
+  }
+
   @Override
   public void initializeDistributedCache(InitializationContext context) {
     // Do nothing.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:43:43 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
@@ -2850,7 +2851,8 @@ class JobInProgress {
       }
 
       Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
-      new CleanupQueue().addToQueue(jobtracker.getFileSystem(tempDir), tempDir); 
+      new CleanupQueue().addToQueue(new PathDeletionContext(
+          jobtracker.getFileSystem(tempDir), tempDir.toUri().getPath())); 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar  4 03:43:43 2011
@@ -30,7 +30,6 @@ import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.util.ProcessTree;
@@ -144,6 +143,22 @@ class JvmManager {
     }
   }  
 
+  /**
+   * Queues the task's work dir on the taskTracker's cleanup thread so that it
+   * is deleted asynchronously.
+   * @param tracker taskTracker whose cleanup thread and local dirs are used
+   * @param task    the task whose work dir needs to be deleted
+   * @throws IOException if the tracker calls used to build the contexts fail
+   */
+  static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException {
+    tracker.getCleanupThread().addToQueue(
+        TaskTracker.buildTaskControllerPathDeletionContexts(
+          tracker.getLocalFileSystem(),
+          tracker.getLocalFiles(tracker.getJobConf(), ""),
+          task, true /* workDir */,
+          tracker.getTaskController()));
+  }
+
   private static class JvmManagerForType {
     //Mapping from the JVM IDs to running Tasks
     Map <JVMId,TaskRunner> jvmToRunningTask = 
@@ -438,7 +453,7 @@ class JvmManager {
             //task at the beginning of each task in the task JVM.
             //For the last task, we do it here.
             if (env.conf.getNumTasksToExecutePerJvm() != 1) {
-              FileUtil.fullyDelete(env.workDir);
+              deleteWorkDir(tracker, initalContext.task);
             }
           } catch (IOException ie){}
         }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar  4 03:43:43 2011
@@ -29,6 +29,9 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -87,6 +90,7 @@ class LinuxTaskController extends TaskCo
     INITIALIZE_TASK,
     TERMINATE_TASK_JVM,
     KILL_TASK_JVM,
+    ENABLE_TASK_FOR_CLEANUP
   }
 
   /**
@@ -208,12 +212,75 @@ class LinuxTaskController extends TaskCo
   @Override
   void initializeTask(TaskControllerContext context)
       throws IOException {
-    LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
-        + " for " + context.task.getTaskID().toString());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
+                + " for " + context.task.getTaskID().toString());
+    }
     runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(),
         buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
   }
 
+  /**
+   * Assembles the task-controller arguments used to enable a task for cleanup:
+   * the mapred local dir path, the job id and, last, the directory to clean
+   * ($attemptId or $attemptId/work, with the cleanup suffix for cleanup tasks).
+   */
+  private List<String> buildTaskCleanupArgs(
+      TaskControllerPathDeletionContext context) {
+    List<String> args = new ArrayList<String>(3);
+    args.add(context.mapredLocalDir.toUri().getPath());
+    args.add(context.task.getJobID().toString());
+
+    // "/work" is appended only when the work dir (not the attempt dir) is meant
+    String workSuffix = context.isWorkDir ? "/work" : "";
+
+    // cleanup tasks get TASK_CLEANUP_SUFFIX right after the attempt id
+    String cleanupSuffix = "";
+    if (context.task.isTaskCleanupTask()) {
+      cleanupSuffix = TaskTracker.TASK_CLEANUP_SUFFIX;
+    }
+
+    args.add(context.task.getTaskID() + cleanupSuffix + workSuffix);
+    return args;
+  }
+
+  /**
+   * Enables the task for cleanup by changing permissions of the specified path
+   * in the local filesystem. A failure of the task-controller command is only
+   * logged. Throws IllegalArgumentException if context is not a
+   * TaskControllerPathDeletionContext, its user is null or its fs is not local.
+   */
+  @Override
+  void enableTaskForCleanup(PathDeletionContext context)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Going to do " + TaskCommands.ENABLE_TASK_FOR_CLEANUP.toString()
+                + " for " + context.fullPath);
+    }
+
+    if (context instanceof TaskControllerPathDeletionContext) {
+      TaskControllerPathDeletionContext tContext =
+        (TaskControllerPathDeletionContext) context;
+
+      if (tContext.task.getUser() != null &&
+          tContext.fs instanceof LocalFileSystem) {
+        try {
+          runCommand(TaskCommands.ENABLE_TASK_FOR_CLEANUP,
+                   tContext.task.getUser(),
+                   buildTaskCleanupArgs(tContext), null, null);
+        } catch(IOException e) {
+          LOG.warn("Unable to change permissions for " + tContext.fullPath, e);
+        }
+      } else {
+        throw new IllegalArgumentException("Either user is null or the "  +
+                               "file system is not local file system.");
+      }
+    } else {
+      throw new IllegalArgumentException("PathDeletionContext provided is not "
+          + "TaskControllerPathDeletionContext.");
+    }
+  }
+
   private void logOutput(String output) {
     String shExecOutput = output;
     if (shExecOutput != null) {
@@ -436,7 +503,8 @@ class LinuxTaskController extends TaskCo
     }
     ShellCommandExecutor shExec = buildTaskControllerExecutor(
         command, context.env.conf.getUser(), 
-        buildKillTaskCommandArgs(context), context.env.workDir, context.env.env);
+        buildKillTaskCommandArgs(context), context.env.workDir,
+        context.env.env);
     try {
       shExec.execute();
     } catch (Exception e) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar  4 03:43:43 2011
@@ -24,6 +24,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.util.StringUtils;
@@ -187,6 +190,67 @@ public abstract class TaskController imp
   }
 
   /**
+   * Contains info related to the path of the file/dir to be deleted. This info
+   * is needed by task-controller to build the full path of the file/dir
+   */
+  static class TaskControllerPathDeletionContext extends PathDeletionContext {
+    Task task;
+    boolean isWorkDir;
+    TaskController taskController;
+
+    /**
+     * mapredLocalDir is the base dir under which to-be-deleted taskWorkDir or
+     * taskAttemptDir exists. fullPath of taskAttemptDir or taskWorkDir
+     * is built using mapredLocalDir, jobId, taskId, etc.
+     */
+    Path mapredLocalDir;
+
+    public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
+        Task task, boolean isWorkDir, TaskController taskController) {
+      super(fs, null);
+      this.task = task;
+      this.isWorkDir = isWorkDir;
+      this.taskController = taskController;
+      this.mapredLocalDir = mapredLocalDir;
+    }
+
+    @Override
+    protected String getPathForCleanup() {
+      if (fullPath == null) {
+        fullPath = buildPathForDeletion();
+      }
+      return fullPath;
+    }
+
+    /**
+     * Builds the path of taskAttemptDir OR taskWorkDir based on
+     * mapredLocalDir, jobId, taskId, etc
+     */
+    String buildPathForDeletion() {
+      String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(),
+          task.getJobID().toString(), task.getTaskID().toString(),
+          task.isTaskCleanupTask())
+        : TaskTracker.getLocalTaskDir(task.getUser(),
+          task.getJobID().toString(), task.getTaskID().toString(),
+          task.isTaskCleanupTask());
+
+      return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + subDir;
+    }
+
+    /**
+     * Makes the path (and its subdirectories, recursively) fully deletable by
+     * having the task-controller set proper permissions (770) on it.
+     */
+    @Override
+    protected void enablePathForCleanup() throws IOException {
+      getPathForCleanup();// allow init of fullPath, if not inited already
+      if (fs.exists(new Path(fullPath))) {
+        taskController.enableTaskForCleanup(this);
+      }
+    }
+  }
+
+  /**
    * NOTE: This class is internal only class and not intended for users!!
    * 
    */
@@ -207,6 +271,13 @@ public abstract class TaskController imp
   abstract void terminateTask(TaskControllerContext context);
   
   /**
+   * Enable the task for cleanup by changing permissions of the path
+   * @param context   path deletion context
+   * @throws IOException
+   */
+  abstract void enableTaskForCleanup(PathDeletionContext context)
+      throws IOException;
+  /**
    * Sends a KILL signal to forcefully terminate the taskJVM and its
    * sub-processes.
    * 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 03:43:43 2011
@@ -638,6 +638,39 @@ abstract class TaskRunner extends Thread
   }
    
   /**
+   * Sets permissions recursively and then deletes the contents of dir.
+   * Makes dir an empty directory (does not delete dir itself).
+   */
+  static void deleteDirContents(JobConf conf, File dir) throws IOException {
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (fs.exists(new Path(dir.getAbsolutePath()))) {
+      File contents[] = dir.listFiles();
+      if (contents != null) {
+        for (int i = 0; i < contents.length; i++) {
+          try {
+            int ret = 0;
+            if ((ret = FileUtil.chmod(contents[i].getAbsolutePath(),
+                                      "ug+rwx", true)) != 0) {
+              LOG.warn("Unable to chmod for " + contents[i] + 
+                  "; chmod exit status = " + ret);
+            }
+          } catch(InterruptedException e) {
+            LOG.warn("Interrupted while setting permissions for contents of " +
+                "workDir. Not deleting the remaining contents of workDir.");
+            return;
+          }
+          if (!fs.delete(new Path(contents[i].getAbsolutePath()), true)) {
+            LOG.warn("Unable to delete "+ contents[i]);
+          }
+        }
+      }
+    }
+    else {
+      LOG.warn(dir + " does not exist.");
+    }
+  }
+  
+  /**
    * Creates distributed cache symlinks and tmp directory, as appropriate.
    * Note that when we setup the distributed
    * cache, we didn't create the symlinks. This is done on a per task basis
@@ -647,11 +680,14 @@ abstract class TaskRunner extends Thread
    * @param workDir Working directory, which is completely deleted.
    */
   public static void setupWorkDir(JobConf conf, File workDir) throws IOException {
-    LOG.debug("Fully deleting and re-creating" + workDir);
-    FileUtil.fullyDelete(workDir);
-    if (!workDir.mkdir()) {
-      LOG.debug("Did not recreate " + workDir);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Fully deleting contents of " + workDir);
     }
+
+    /** delete only the contents of workDir leaving the directory empty. We
+     * can't delete the workDir as it is the current working directory.
+     */
+    deleteDirContents(conf, workDir);
     
     if (DistributedCache.getSymlink(conf)) {
       URI[] archives = DistributedCache.getCacheArchives(conf);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:43:43 2011
@@ -69,6 +69,8 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
 import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
 import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
@@ -186,7 +188,7 @@ public class TaskTracker 
   
   // The filesystem where job files are stored
   FileSystem systemFS = null;
-  FileSystem localFs = null;
+  private FileSystem localFs = null;
   private final HttpServer server;
     
   volatile boolean shuttingDown = false;
@@ -401,6 +403,11 @@ public class TaskTracker 
     return taskController;
   }
   
+  // Currently this is used only by tests
+  void setTaskController(TaskController t) {
+    taskController = t;
+  }
+  
   private RunningJob addTaskToJob(JobID jobId, 
                                   TaskInProgress tip) {
     synchronized (runningJobs) {
@@ -518,10 +525,7 @@ public class TaskTracker 
   
   static String getTaskWorkDir(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
-    String dir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid;
-    if (isCleanupAttempt) {
-      dir = dir + TASK_CLEANUP_SUFFIX;
-    }
+    String dir = getLocalTaskDir(user, jobid, taskid, isCleanupAttempt);
     return dir + Path.SEPARATOR + MRConstants.WORKDIR;
   }
 
@@ -1204,7 +1208,16 @@ public class TaskTracker 
     taskCleanupThread.start();
     directoryCleanupThread = new CleanupQueue();
   }
+
+  // only used by tests
+  void setCleanupThread(CleanupQueue c) {
+    directoryCleanupThread = c;
+  }
   
+  CleanupQueue getCleanupThread() {
+    return directoryCleanupThread;
+  }
+
   /**
    * The connection to the JobTracker, used by the TaskRunner 
    * for locating remote files.
@@ -1620,6 +1633,44 @@ public class TaskTracker 
   }
 
   /**
+   * Builds list of PathDeletionContext objects for the given paths
+   */
+  private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
+      Path[] paths) {
+    int i = 0;
+    PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
+
+    for (Path p : paths) {
+      contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
+    }
+    return contexts;
+  }
+
+  /**
+   * Builds list of TaskControllerPathDeletionContext objects for a task
+   * @param fs    : FileSystem in which the dirs to be deleted reside
+   * @param paths : mapred-local-dirs
+   * @param task  : the task whose taskDir or taskWorkDir is going to be deleted
+   * @param isWorkDir : true to delete the workDir, false to delete the taskDir
+   * @param taskController : the task-controller to be used for deletion of
+   *                         taskDir or taskWorkDir
+   */
+  static PathDeletionContext[] buildTaskControllerPathDeletionContexts(
+      FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
+      TaskController taskController)
+      throws IOException {
+    int i = 0;
+    PathDeletionContext[] contexts =
+                          new TaskControllerPathDeletionContext[paths.length];
+
+    for (Path p : paths) {
+      contexts[i++] = new TaskControllerPathDeletionContext(fs, p, task,
+                          isWorkDir, taskController);
+    }
+    return contexts;
+  }
+
+  /**
    * The task tracker is done with this job, so we need to clean up.
    * @param action The action with the job
    * @throws IOException
@@ -1668,8 +1719,9 @@ public class TaskTracker 
    */
   void removeJobFiles(String user, String jobId)
   throws IOException {
-    directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf,
-        getLocalJobDir(user, jobId)));
+    PathDeletionContext[] contexts = buildPathDeletionContexts(localFs,
+        getLocalFiles(fConf, getLocalJobDir(user, jobId)));
+    directoryCleanupThread.addToQueue(contexts);
   }
 
   /**
@@ -2766,29 +2818,33 @@ public class TaskTracker 
           runner.close();
         }
 
-        String localTaskDir =
-            getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
-                .toString(), task.isTaskCleanupTask());
         if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
           // No jvm reuse, remove everything
-          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
-              defaultJobConf, localTaskDir));
+          PathDeletionContext[] contexts =
+            buildTaskControllerPathDeletionContexts(localFs,
+                getLocalFiles(fConf, ""), task, false/* not workDir */,
+                taskController);
+          directoryCleanupThread.addToQueue(contexts);
         } else {
           // Jvm reuse. We don't delete the workdir since some other task
           // (running in the same JVM) might be using the dir. The JVM
           // running the tasks would clean the workdir per a task in the
           // task process itself.
-          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
-              defaultJobConf, localTaskDir + Path.SEPARATOR
-                  + TaskTracker.JOBFILE));
+          String localTaskDir =
+            getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
+                .toString(), task.isTaskCleanupTask());
+          PathDeletionContext[] contexts = buildPathDeletionContexts(
+              localFs, getLocalFiles(defaultJobConf, localTaskDir +
+                         Path.SEPARATOR + TaskTracker.JOBFILE));
+          directoryCleanupThread.addToQueue(contexts);
         }
       } else {
         if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-          String taskWorkDir =
-              getTaskWorkDir(task.getUser(), task.getJobID().toString(),
-                  taskId.toString(), task.isTaskCleanupTask());
-          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
-              defaultJobConf, taskWorkDir));
+          PathDeletionContext[] contexts =
+            buildTaskControllerPathDeletionContexts(localFs,
+              getLocalFiles(fConf, ""), task, true /* workDir */,
+              taskController);
+          directoryCleanupThread.addToQueue(contexts);
         }
       }
     }
@@ -3380,17 +3436,28 @@ public class TaskTracker 
   
 
   // get the full paths of the directory in all the local disks.
-  private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
+  Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
     String[] localDirs = conf.getLocalDirs();
     Path[] paths = new Path[localDirs.length];
     FileSystem localFs = FileSystem.getLocal(conf);
+    boolean subdirNeeded = (subdir != null) && (subdir.length() > 0);
     for (int i = 0; i < localDirs.length; i++) {
-      paths[i] = new Path(localDirs[i], subdir);
+      paths[i] = (subdirNeeded) ? new Path(localDirs[i], subdir)
+                                : new Path(localDirs[i]);
       paths[i] = paths[i].makeQualified(localFs);
     }
     return paths;
   }
 
+  FileSystem getLocalFileSystem(){
+    return localFs;
+  }
+
+  // only used by tests
+  void setLocalFileSystem(FileSystem fs){
+    localFs = fs;
+  }
+
   int getMaxCurrentMapTasks() {
     return maxMapSlots;
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Mar  4 03:43:43 2011
@@ -193,7 +193,7 @@ public class Localizer {
     synchronized (localizedUser) {
 
       if (localizedUser.get()) {
-        // User-directories are already localized for his user.
+        // User-directories are already localized for this user.
         LOG.info("User-directories for the user " + user
             + " are already initialized on this TT. Not doing anything.");
         return;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar  4 03:43:43 2011
@@ -257,7 +257,18 @@ public class MiniMRCluster {
   public int getNumTaskTrackers() {
     return taskTrackerList.size();
   }
-    
+  
+  /**
+   * Sets inline cleanup threads on all task trackers so that deletion of
+   * temporary files/dirs happens inline
+   */
+  public void setInlineCleanupThreads() {
+    for (int i = 0; i < getNumTaskTrackers(); i++) {
+      getTaskTrackerRunner(i).getTaskTracker().setCleanupThread(
+          new UtilsForTests.InlineCleanupQueue());
+    }
+  }
+
   /**
    * Wait until the system is idle.
    */

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java Fri Mar  4 03:43:43 2011
@@ -58,6 +58,9 @@ public class TestJobDirCleanup extends T
       namenode = fileSys.getUri().toString();
       mr = new MiniMRCluster(10, namenode, 3, 
           null, null, mrConf);
+      // make cleanup inline so that the existence of these directories can be
+      // validated
+      mr.setInlineCleanupThreads();
       final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
       JobConf jobConf = mr.createJobConf();
       runSleepJob(jobConf);
@@ -66,13 +69,8 @@ public class TestJobDirCleanup extends T
                            "/taskTracker/jobcache";
         File jobDir = new File(jobDirStr);
         String[] contents = jobDir.list();
-        while (contents.length > 0) {
-          try {
-            Thread.sleep(1000);
-            LOG.warn(jobDir +" not empty yet");
-            contents = jobDir.list();
-          } catch (InterruptedException ie){}
-        }
+        assertTrue("Contents of " + jobDir + " not cleanup.",
+                   (contents == null || contents.length == 0));
       }
     } catch (Exception ee){
     } finally {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Mar  4 03:43:43 2011
@@ -44,10 +44,15 @@ public class TestLocalizationWithLinuxTa
   private static String taskTrackerSpecialGroup;
 
   @Override
+  protected boolean canRun() {
+    return ClusterWithLinuxTaskController.shouldRun();
+  }
+
+  @Override
   protected void setUp()
       throws Exception {
 
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
+    if (!canRun()) {
       return;
     }
 
@@ -65,7 +70,8 @@ public class TestLocalizationWithLinuxTa
     taskController.setConf(trackerFConf);
     taskController.setup();
 
-    tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+    tracker.setTaskController(taskController);
+    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
         taskController));
 
     // Rewrite conf so as to reflect task's correct user name.
@@ -83,7 +89,7 @@ public class TestLocalizationWithLinuxTa
   @Override
   protected void tearDown()
       throws Exception {
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
+    if (!canRun()) {
       return;
     }
     super.tearDown();
@@ -98,21 +104,6 @@ public class TestLocalizationWithLinuxTa
     // Do nothing.
   }
 
-  /**
-   * Test the localization of a user on the TT when {@link LinuxTaskController}
-   * is in use.
-   */
-  @Override
-  public void testUserLocalization()
-      throws IOException {
-
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-
-    super.testJobLocalization();
-  }
-
   @Override
   protected void checkUserLocalization()
       throws IOException {
@@ -150,21 +141,6 @@ public class TestLocalizationWithLinuxTa
     }
   }
 
-  /**
-   * Test job localization with {@link LinuxTaskController}. Also check the
-   * permissions and file ownership of the job related files.
-   */
-  @Override
-  public void testJobLocalization()
-      throws IOException {
-
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-
-    super.testJobLocalization();
-  }
-
   @Override
   protected void checkJobLocalization()
       throws IOException {
@@ -210,21 +186,6 @@ public class TestLocalizationWithLinuxTa
     }
   }
 
-  /**
-   * Test task localization with {@link LinuxTaskController}. Also check the
-   * permissions and file ownership of task related files.
-   */
-  @Override
-  public void testTaskLocalization()
-      throws IOException {
-
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-
-    super.testTaskLocalization();
-  }
-
   @Override
   protected void checkTaskLocalization()
       throws IOException {
@@ -250,16 +211,4 @@ public class TestLocalizationWithLinuxTa
           .getUser(), taskTrackerSpecialGroup);
     }
   }
-
-  /**
-   * Test cleanup of task files with {@link LinuxTaskController}.
-   */
-  @Override
-  public void testTaskCleanup()
-      throws IOException {
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-    super.testTaskCleanup();
-  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Fri Mar  4 03:43:43 2011
@@ -60,6 +60,10 @@ public class TestMiniMRLocalFS extends T
     MiniMRCluster mr = null;
     try {
       mr = new MiniMRCluster(2, "file:///", 3);
+      // make cleanup inline so that the existence of these directories can be
+      // validated
+      mr.setInlineCleanupThreads();
+
       TestMiniMRWithDFS.runPI(mr, mr.createJobConf());
 
       // run the wordcount example with caching

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Mar  4 03:43:43 2011
@@ -322,6 +322,9 @@ public class TestMiniMRWithDFS extends T
       dfs = new MiniDFSCluster(conf, 4, true, null);
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+      // make cleanup inline so that the existence of these directories can be
+      // validated
+      mr.setInlineCleanupThreads();
 
       runPI(mr, mr.createJobConf());
       runWordCount(mr, mr.createJobConf());

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java?rev=1077127&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupWorkDir.java Fri Mar  4 03:43:43 2011
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+public class TestSetupWorkDir extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestSetupWorkDir.class);
+
+  /**
+   * Create a file in the given dir and set permissions r_xr_xr_x so that no
+   * one can delete it directly (without doing chmod).
+   * Creates dir/subDir and dir/subDir/file
+   */
+  static void createFileAndSetPermissions(JobConf jobConf, Path dir)
+       throws IOException {
+    Path subDir = new Path(dir, "subDir");
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    fs.mkdirs(subDir);
+    Path p = new Path(subDir, "file");
+    DataOutputStream out = fs.create(p);
+    out.writeBytes("dummy input");
+    out.close();
+    // no write permission for subDir and subDir/file
+    try {
+      int ret = 0;
+      if((ret = FileUtil.chmod(subDir.toUri().getPath(), "a=rx", true)) != 0) {
+        LOG.warn("chmod failed for " + subDir + ";retVal=" + ret);
+      }
+    } catch(InterruptedException e) {
+      LOG.warn("Interrupted while doing chmod for " + subDir);
+    }
+  }
+
+  /**
+   * Validates if setupWorkDir is properly cleaning up contents of workDir.
+   */
+  public void testSetupWorkDir() throws IOException {
+    Path rootDir = new Path(System.getProperty("test.build.data",  "/tmp"),
+                            "testSetupWorkDir");
+    Path myWorkDir = new Path(rootDir, "./work");
+    JobConf jConf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(jConf);
+    if (fs.exists(myWorkDir)) {
+      fs.delete(myWorkDir, true);
+    }
+    if (!fs.mkdirs(myWorkDir)) {
+      throw new IOException("Unable to create workDir " + myWorkDir);
+    }
+
+    // create {myWorkDir}/subDir/file and set 555 perms for subDir and file
+    createFileAndSetPermissions(jConf, myWorkDir);
+
+    TaskRunner.deleteDirContents(jConf, new File(myWorkDir.toUri().getPath()));
+    
+    assertTrue("Contents of " + myWorkDir + " are not cleaned up properly.",
+        fs.listStatus(myWorkDir).length == 0);
+    
+    // cleanup
+    fs.delete(rootDir, true);
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar  4 03:43:43 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.JvmManag
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
 
 import junit.framework.TestCase;
 
@@ -73,36 +74,20 @@ public class TestTaskTrackerLocalization
   protected File[] attemptLogFiles;
   protected JobConf localizedTaskConf;
 
-  class InlineCleanupQueue extends CleanupQueue {
-    List<Path> stalePaths = new ArrayList<Path>();
-
-    public InlineCleanupQueue() {
-      // do nothing
-    }
-
-    @Override
-    public void addToQueue(FileSystem fs, Path... paths) {
-      // delete in-line
-      for (Path p : paths) {
-        try {
-          LOG.info("Trying to delete the path " + p);
-          if (!fs.delete(p, true)) {
-            LOG.warn("Stale path " + p.toUri().getPath());
-            stalePaths.add(p);
-          }
-        } catch (IOException e) {
-          LOG.warn("Caught exception while deleting path "
-              + p.toUri().getPath());
-          LOG.info(StringUtils.stringifyException(e));
-          stalePaths.add(p);
-        }
-      }
-    }
+  /**
+   * Dummy method in this base class. Only derived classes will define this
+   * method for checking if a test can be run.
+   */
+  protected boolean canRun() {
+    return true;
   }
 
   @Override
   protected void setUp()
       throws Exception {
+    if (!canRun()) {
+      return;
+    }
     TEST_ROOT_DIR =
         new File(System.getProperty("test.build.data", "/tmp"), getClass()
             .getSimpleName());
@@ -143,8 +128,9 @@ public class TestTaskTrackerLocalization
     tracker.setConf(trackerFConf);
 
     // for test case system FS is the local FS
-    tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf);
-
+    tracker.systemFS = FileSystem.getLocal(trackerFConf);
+    tracker.setLocalFileSystem(tracker.systemFS);
+    
     taskTrackerUGI = UserGroupInformation.login(trackerFConf);
 
     // Set up the task to be localized
@@ -159,8 +145,10 @@ public class TestTaskTrackerLocalization
     taskController = new DefaultTaskController();
     taskController.setConf(trackerFConf);
     taskController.setup();
-    tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
-    taskController));
+
+    tracker.setTaskController(taskController);
+    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
+                                       taskController));
   }
 
   /**
@@ -204,6 +192,9 @@ public class TestTaskTrackerLocalization
   @Override
   protected void tearDown()
       throws Exception {
+    if (!canRun()) {
+      return;
+    }
     FileUtil.fullyDelete(TEST_ROOT_DIR);
   }
 
@@ -235,6 +226,9 @@ public class TestTaskTrackerLocalization
    */
   public void testTaskControllerSetup()
       throws IOException {
+    if (!canRun()) {
+      return;
+    }
     // Task-controller is already set up in the test's setup method. Now verify.
     for (String localDir : localDirs) {
 
@@ -258,7 +252,9 @@ public class TestTaskTrackerLocalization
    */
   public void testUserLocalization()
       throws IOException {
-
+    if (!canRun()) {
+      return;
+    }
     // /////////// The main method being tested
     tracker.getLocalizer().initializeUserDirs(task.getUser());
     // ///////////
@@ -329,7 +325,9 @@ public class TestTaskTrackerLocalization
    */
   public void testJobLocalization()
       throws IOException {
-
+    if (!canRun()) {
+      return;
+    }
     tracker.getLocalizer().initializeUserDirs(task.getUser());
 
     // /////////// The main method being tested
@@ -423,7 +421,9 @@ public class TestTaskTrackerLocalization
    */
   public void testTaskLocalization()
       throws IOException {
-
+    if (!canRun()) {
+      return;
+    }
     tracker.getLocalizer().initializeUserDirs(task.getUser());
     localizedJobConf = tracker.localizeJobFiles(task);
 
@@ -539,14 +539,102 @@ public class TestTaskTrackerLocalization
   }
 
   /**
+   * Validates the removal of $taskid and $taskid/work under mapred-local-dir
+   * in cases where those directories cannot be deleted without adding
+   * write permission to the newly created directories under $taskid and
+   * $taskid/work
+   * Also see TestSetupWorkDir.createFileAndSetPermissions for details
+   */
+  void validateRemoveFiles(boolean needCleanup, boolean jvmReuse,
+                           TaskInProgress tip) throws IOException {
+    // create files and set permissions 555. Verify if task controller sets
+    // the permissions for TT to delete the taskDir or workDir
+    String dir = (!needCleanup || jvmReuse) ?
+        TaskTracker.getTaskWorkDir(task.getUser(), task.getJobID().toString(),
+          taskId.toString(), task.isTaskCleanupTask())
+      : TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID().toString(),
+          taskId.toString(), task.isTaskCleanupTask());
+
+    Path[] paths = tracker.getLocalFiles(localizedJobConf, dir);
+    for (Path p : paths) {
+      if (tracker.getLocalFileSystem().exists(p)) {
+        TestSetupWorkDir.createFileAndSetPermissions(localizedJobConf, p);
+      }
+    }
+
+    InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
+    tracker.setCleanupThread(cleanupQueue);
+
+    tip.removeTaskFiles(needCleanup, taskId);
+
+    if (jvmReuse) {
+      // work dir should still exist and cleanup queue should be empty
+      assertTrue("cleanup queue is not empty after removeTaskFiles() in case "
+          + "of jvm reuse.", cleanupQueue.isQueueEmpty());
+      boolean workDirExists = false;
+      for (Path p : paths) {
+        if (tracker.getLocalFileSystem().exists(p)) {
+          workDirExists = true;
+        }
+      }
+      assertTrue("work dir does not exist in case of jvm reuse", workDirExists);
+
+      // now try to delete the work dir and verify that there are no stale paths
+      JvmManager.deleteWorkDir(tracker, task);
+    }
+    tracker.removeJobFiles(task.getUser(), jobId.toString());
+
+    assertTrue("Some task files are not deleted!! Number of stale paths is "
+        + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
+  }
+
+  /**
+   * Validates if task cleanup is done properly for a succeeded task
    * @throws IOException
    */
   public void testTaskCleanup()
       throws IOException {
+    if (!canRun()) {
+      return;
+    }
+    testTaskCleanup(false, false);// no needCleanup; no jvmReuse
+  }
 
+  /**
+   * Validates if task cleanup is done properly for a task that did not succeed
+   * @throws IOException
+   */
+  public void testFailedTaskCleanup()
+  throws IOException {
+    if (!canRun()) {
+      return;
+    }
+    testTaskCleanup(true, false);// needCleanup; no jvmReuse
+  }
+
+  /**
+   * Validates if task cleanup is done properly for a succeeded task with JVM reuse
+   * @throws IOException
+   */
+  public void testTaskCleanupWithJvmUse()
+      throws IOException {
+    if (!canRun()) {
+      return;
+    }
+    testTaskCleanup(false, true);// no needCleanup; jvmReuse
+  }
+
+  /**
+   * Validates if task cleanup is done properly
+   */
+  private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
+      throws IOException {
     // Localize job and localize task.
     tracker.getLocalizer().initializeUserDirs(task.getUser());
     localizedJobConf = tracker.localizeJobFiles(task);
+    if (jvmReuse) {
+      localizedJobConf.setNumTasksToExecutePerJvm(2);
+    }
     // Now initialize the job via task-controller so as to set
     // ownership/permissions of jars, job-work-dir
     JobInitializationContext jobContext = new JobInitializationContext();
@@ -585,18 +673,9 @@ public class TestTaskTrackerLocalization
 
     // TODO: Let the task run and create files.
 
-    InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
-    tracker.directoryCleanupThread = cleanupQueue;
-
-    // ////////// The central methods being tested
-    tip.removeTaskFiles(true, taskId);
-    tracker.removeJobFiles(task.getUser(), jobId.toString());
-    // //////////
-
-    // TODO: make sure that all files intended to be deleted are deleted.
-
-    assertTrue("Some task files are not deleted!! Number of stale paths is "
-        + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
+    // create files and set permissions 555. Verify if task controller sets
+    // the permissions for TT to delete the task dir or work dir properly
+    validateRemoveFiles(needCleanup, jvmReuse, tip);
 
     // Check that the empty $mapred.local.dir/taskTracker/$user dirs are still
     // there.
@@ -604,7 +683,7 @@ public class TestTaskTrackerLocalization
       Path userDir =
           new Path(localDir, TaskTracker.getUserDir(task.getUser()));
       assertTrue("User directory " + userDir + " is not present!!",
-          tracker.localFs.exists(userDir));
+          tracker.getLocalFileSystem().exists(userDir));
     }
 
     // Test userlogs cleanup.
@@ -624,7 +703,7 @@ public class TestTaskTrackerLocalization
 
     // Logs should be there before cleanup.
     assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
-        tracker.localFs.exists(logDir));
+        tracker.getLocalFileSystem().exists(logDir));
 
     // ////////// Another being tested
     TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file
@@ -633,6 +712,6 @@ public class TestTaskTrackerLocalization
 
     // Logs should be gone after cleanup.
     assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
-        tracker.localFs.exists(logDir));
+        tracker.getLocalFileSystem().exists(logDir));
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar  4 03:43:43 2011
@@ -19,9 +19,11 @@
 package org.apache.hadoop.mapred;
 
 import java.text.DecimalFormat;
+import java.util.ArrayList;
 import java.io.*;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Enumeration;
 import java.util.Properties;
 
@@ -46,6 +48,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.StringUtils;
 
 /** 
  * Utilities used in unit test.
@@ -426,7 +429,37 @@ public class UtilsForTests {
       }
     }
   }
-  
+
+  /**
+   * Cleans up files/dirs inline. CleanupQueue deletes in a separate thread
+   * asynchronously.
+   */
+  public static class InlineCleanupQueue extends CleanupQueue {
+    List<String> stalePaths = new ArrayList<String>();
+
+    public InlineCleanupQueue() {
+      // do nothing
+    }
+
+    @Override
+    public void addToQueue(PathDeletionContext... contexts) {
+      // delete paths in-line
+      for (PathDeletionContext context : contexts) {
+        try {
+          if (!deletePath(context)) {
+            LOG.warn("Stale path " + context.fullPath);
+            stalePaths.add(context.fullPath);
+          }
+        } catch (IOException e) {
+          LOG.warn("Caught exception while deleting path "
+              + context.fullPath);
+          LOG.info(StringUtils.stringifyException(e));
+          stalePaths.add(context.fullPath);
+        }
+      }
+    }
+  }
+
   static String getTaskSignalParameter(boolean isMap) {
     return isMap 
            ? "test.mapred.map.waiting.target" 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java?rev=1077127&r1=1077126&r2=1077127&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java Fri Mar  4 03:43:43 2011
@@ -58,7 +58,10 @@ public class TestServiceLevelAuthorizati
       JobConf mrConf = new JobConf(conf);
       mr = new MiniMRCluster(slaves, fileSys.getUri().toString(), 1, 
                              null, null, mrConf);
-
+      // make cleanup inline so that validation of existence of these directories
+      // can be done
+      mr.setInlineCleanupThreads();
+      
       // Run examples
       TestMiniMRWithDFS.runPI(mr, mr.createJobConf(mrConf));
       TestMiniMRWithDFS.runWordCount(mr, mr.createJobConf(mrConf));