Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/04/23 00:56:59 UTC

svn commit: r1096075 - in /hadoop/common/branches/branch-0.20-security: ./ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/filecache/ src/test/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Apr 22 22:56:58 2011
New Revision: 1096075

URL: http://svn.apache.org/viewvc?rev=1096075&view=rev
Log:
MAPREDUCE-2413. TaskTracker should handle disk failures by reinitializing 
itself. (Ravi Gummadi and Jagane Sundar via omalley)

Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.c
    hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.h
    hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c
    hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c
    hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h
    hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MRConstants.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJvmManager.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Apr 22 22:56:58 2011
@@ -60,6 +60,9 @@ Release 0.20.204.0 - unreleased
     HDFS-1767. Namenode ignores non-initial block report from datanodes
     when in safemode during startup. (Matt Foley via suresh)
 
+    MAPREDUCE-2413. TaskTracker should handle disk failures by reinitializing 
+    itself. (Ravi Gummadi and Jagane Sundar via omalley)
+
 Release 0.20.203.0 - unreleased
 
     HADOOP-7190. Add metrics v1 back for backwards compatibility. (omalley)

Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/configuration.c?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.c Fri Apr 22 22:56:58 2011
@@ -241,8 +241,9 @@ void read_config(const char* file_name) 
 /*
  * function used to get a configuration value.
  * The function for the first time populates the configuration details into
- * array, next time onwards used the populated array.
+ * array, next time onwards uses the populated array.
  *
+ * Memory returned here should be freed using free.
  */
 char * get_value(const char* key) {
   int count;
@@ -259,8 +260,15 @@ char * get_value(const char* key) {
  * Value delimiter is assumed to be a comma.
  */
 char ** get_values(const char * key) {
-  char ** toPass = NULL;
   char *value = get_value(key);
+  return extract_values(value);
+}
+
+/**
+ * Extracts array of values from the comma separated list of values.
+ */
+char ** extract_values(char * value) {
+  char ** toPass = NULL;
   char *tempTok = NULL;
   char *tempstr = NULL;
   int size = 0;
@@ -286,9 +294,13 @@ char ** get_values(const char * key) {
   return toPass;
 }
 
-// free an entry set of values
+/**
+ * Free an entry set of values.
+ */
 void free_values(char** values) {
   if (*values != NULL) {
+    // the values were tokenized from the same malloc, so freeing the first
+    // frees the entire block.
     free(*values);
   }
   if (values != NULL) {

Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/configuration.h?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.h (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/configuration.h Fri Apr 22 22:56:58 2011
@@ -34,6 +34,9 @@ char *get_value(const char* key);
 //comma separated strings.
 char ** get_values(const char* key);
 
+// Extracts array of values from the comma separated list of values.
+char ** extract_values(char * value);
+
 // free the memory returned by get_values
 void free_values(char** values);
 

Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/main.c?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c Fri Apr 22 22:56:58 2011
@@ -35,7 +35,9 @@
 
 void display_usage(FILE *stream) {
   fprintf(stream,
-      "Usage: task-controller user command command-args\n");
+   "Usage: task-controller user good-local-dirs command command-args\n");
+  fprintf(stream, " where good-local-dirs is a comma separated list of " \
+          "good mapred local directories.\n");
   fprintf(stream, "Commands:\n");
   fprintf(stream, "   initialize job:       %2d jobid credentials cmd args\n",
 	  INITIALIZE_JOB);
@@ -53,13 +55,14 @@ void display_usage(FILE *stream) {
 
 int main(int argc, char **argv) {
   //Minimum number of arguments required to run the task-controller
-  if (argc < 4) {
+  if (argc < 5) {
     display_usage(stdout);
     return INVALID_ARGUMENT_NUMBER;
   }
 
   LOGFILE = stdout;
   int command;
+  const char * good_local_dirs = NULL;
   const char * job_id = NULL;
   const char * task_id = NULL;
   const char * cred_file = NULL;
@@ -124,41 +127,46 @@ int main(int argc, char **argv) {
   }
 
   optind = optind + 1;
+  good_local_dirs = argv[optind];
+  if (good_local_dirs == NULL) {
+    return INVALID_TT_ROOT;
+  }
+
+  optind = optind + 1;
   command = atoi(argv[optind++]);
 
   fprintf(LOGFILE, "main : command provided %d\n",command);
   fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
+  fprintf(LOGFILE, "Good mapred-local-dirs are %s\n", good_local_dirs);
 
   switch (command) {
   case INITIALIZE_JOB:
-    if (argc < 7) {
-      fprintf(LOGFILE, "Too few arguments (%d vs 7) for initialize job\n",
-	      argc);
+    if (argc < 8) {
+      fprintf(LOGFILE, "Too few arguments (%d vs 8) for initialize job\n",
+              argc);
       return INVALID_ARGUMENT_NUMBER;
     }
     job_id = argv[optind++];
     cred_file = argv[optind++];
     job_xml = argv[optind++];
-    exit_code = initialize_job(user_detail->pw_name, job_id, cred_file,
-                               job_xml, argv + optind);
+    exit_code = initialize_job(user_detail->pw_name, good_local_dirs, job_id,
+                               cred_file, job_xml, argv + optind);
     break;
   case LAUNCH_TASK_JVM:
-    if (argc < 7) {
-      fprintf(LOGFILE, "Too few arguments (%d vs 7) for launch task\n",
-	      argc);
+    if (argc < 8) {
+      fprintf(LOGFILE, "Too few arguments (%d vs 8) for launch task\n", argc);
       return INVALID_ARGUMENT_NUMBER;
     }
     job_id = argv[optind++];
     task_id = argv[optind++];
     current_dir = argv[optind++];
     script_file = argv[optind++];
-    exit_code = run_task_as_user(user_detail->pw_name, job_id, task_id, 
-                                 current_dir, script_file);
+    exit_code = run_task_as_user(user_detail->pw_name, good_local_dirs, job_id,
+                                 task_id, current_dir, script_file);
     break;
   case SIGNAL_TASK:
-    if (argc < 5) {
-      fprintf(LOGFILE, "Too few arguments (%d vs 5) for signal task\n",
-	      argc);
+    if (argc < 6) {
+      fprintf(LOGFILE, "Too few arguments (%d vs 6) for signal task\n", argc);
       return INVALID_ARGUMENT_NUMBER;
     } else {
       char* end_ptr = NULL;
@@ -179,7 +187,8 @@ int main(int argc, char **argv) {
     break;
   case DELETE_AS_USER:
     dir_to_be_deleted = argv[optind++];
-    exit_code= delete_as_user(user_detail->pw_name, dir_to_be_deleted);
+    exit_code= delete_as_user(user_detail->pw_name, good_local_dirs,
+                              dir_to_be_deleted);
     break;
   case DELETE_LOG_AS_USER:
     dir_to_be_deleted = argv[optind++];

Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/task-controller.c?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c Fri Apr 22 22:56:58 2011
@@ -42,8 +42,6 @@
 
 #define TT_LOCAL_TASK_DIR_PATTERN    "%s/taskTracker/%s/jobcache/%s/%s"
 
-#define TT_SYS_DIR_KEY "mapred.local.dir"
-
 #define TT_LOG_DIR_KEY "hadoop.log.dir"
 
 #define JOB_FILENAME "job.xml"
@@ -210,6 +208,14 @@ int change_user(uid_t user, gid_t group)
 }
 
 /**
+ * Get the array of mapred local dirs from the given comma separated list of
+ * paths. Memory allocated using strdup here is freed up in free_values().
+ */
+char ** get_mapred_local_dirs(const char * good_local_dirs) {
+  return extract_values(strdup(good_local_dirs));
+}
+
+/**
  * Utility function to concatenate argB to argA using the concat_pattern.
  */
 char *concatenate(char *concat_pattern, char *return_path_name, 
@@ -353,8 +359,8 @@ int mkdirs(const char* path, mode_t perm
  * Function to prepare the attempt directories for the task JVM.
  * It creates the task work and log directories.
  */
-static int create_attempt_directories(const char* user, const char *job_id, 
-					const char *task_id) {
+static int create_attempt_directories(const char* user,
+    const char * good_local_dirs, const char *job_id, const char *task_id) {
   // create dirs as 0750
   const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
   if (job_id == NULL || task_id == NULL || user == NULL) {
@@ -364,11 +370,11 @@ static int create_attempt_directories(co
   }
   int result = 0;
 
-  char **local_dir = get_values(TT_SYS_DIR_KEY);
+  char **local_dir = get_mapred_local_dirs(good_local_dirs);
 
   if (local_dir == NULL) {
-    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
-    return -1;
+    fprintf(LOGFILE, "Good mapred local directories could not be obtained.\n");
+    return INVALID_TT_ROOT;
   }
 
   char **local_dir_ptr;
@@ -635,10 +641,10 @@ static int copy_file(int input, const ch
 /**
  * Function to initialize the user directories of a user.
  */
-int initialize_user(const char *user) {
-  char **local_dir = get_values(TT_SYS_DIR_KEY);
+int initialize_user(const char *user, const char * good_local_dirs) {
+  char **local_dir = get_mapred_local_dirs(good_local_dirs);
   if (local_dir == NULL) {
-    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    fprintf(LOGFILE, "Good mapred local directories could not be obtained.\n");
     return INVALID_TT_ROOT;
   }
 
@@ -664,16 +670,16 @@ int initialize_user(const char *user) {
 /**
  * Function to prepare the job directories for the task JVM.
  */
-int initialize_job(const char *user, const char *jobid, 
-		   const char* credentials, const char* job_xml,
-                   char* const* args) {
+int initialize_job(const char *user, const char * good_local_dirs,
+    const char *jobid, const char* credentials, const char* job_xml,
+    char* const* args) {
   if (jobid == NULL || user == NULL) {
     fprintf(LOGFILE, "Either jobid is null or the user passed is null.\n");
     return INVALID_ARGUMENT_NUMBER;
   }
 
   // create the user directory
-  int result = initialize_user(user);
+  int result = initialize_user(user, good_local_dirs);
   if (result != 0) {
     return result;
   }
@@ -707,10 +713,10 @@ int initialize_job(const char *user, con
 
   // 750
   mode_t permissions = S_IRWXU | S_IRGRP | S_IXGRP;
-  char **tt_roots = get_values(TT_SYS_DIR_KEY);
+  char **tt_roots = get_mapred_local_dirs(good_local_dirs);
 
   if (tt_roots == NULL) {
-    return INVALID_CONFIG_FILE;
+    return INVALID_TT_ROOT;
   }
 
   char **tt_root;
@@ -771,12 +777,12 @@ int initialize_job(const char *user, con
  * 4) Does an execlp on the same in order to replace the current image with
  *    task image.
  */
-int run_task_as_user(const char *user, const char *job_id, 
-                     const char *task_id, const char *work_dir,
-                     const char *script_name) {
+int run_task_as_user(const char *user, const char * good_local_dirs,
+                     const char *job_id, const char *task_id,
+                     const char *work_dir, const char *script_name) {
   int exit_code = -1;
   char *task_script_path = NULL;
-  if (create_attempt_directories(user, job_id, task_id) != 0) {
+  if (create_attempt_directories(user, good_local_dirs, job_id, task_id) != 0) {
     goto cleanup;
   }
   int task_file_source = open_file_as_task_tracker(script_name);
@@ -1002,15 +1008,15 @@ static int delete_path(const char *full_
  * user: the user doing the delete
  * subdir: the subdir to delete
  */
-int delete_as_user(const char *user,
+int delete_as_user(const char *user, const char * good_local_dirs,
                    const char *subdir) {
   int ret = 0;
 
-  char** tt_roots = get_values(TT_SYS_DIR_KEY);
+  char** tt_roots = get_mapred_local_dirs(good_local_dirs);
   char** ptr;
   if (tt_roots == NULL || *tt_roots == NULL) {
-    fprintf(LOGFILE, "No %s defined in the configuration\n", TT_SYS_DIR_KEY);
-    return INVALID_CONFIG_FILE;
+    fprintf(LOGFILE, "Good mapred local directories could not be obtained.\n");
+    return INVALID_TT_ROOT;
   }
 
   // do the delete

Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/task-controller.h?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h Fri Apr 22 22:56:58 2011
@@ -74,19 +74,20 @@ int check_taskcontroller_permissions(cha
 int delete_log_directory(const char *log_dir);
 
 // initialize the job directory
-int initialize_job(const char *user, const char *jobid,
+int initialize_job(const char *user, const char * good_local_dirs, const char *jobid,
                    const char *credentials, 
                    const char *job_xml, char* const* args);
 
 // run the task as the user
-int run_task_as_user(const char * user, const char *jobid, const char *taskid,
+int run_task_as_user(const char * user, const char * good_local_dirs,
+                     const char *jobid, const char *taskid,
                      const char *work_dir, const char *script_name);
 
 // send a signal as the user
 int signal_user_task(const char *user, int pid, int sig);
 
 // delete a directory (or file) recursively as the user.
-int delete_as_user(const char *user,
+int delete_as_user(const char *user, const char * good_local_dirs,
                    const char *dir_to_be_deleted);
 
 // run a command as the user
@@ -140,7 +141,7 @@ int mkdirs(const char* path, mode_t perm
 /**
  * Function to initialize the user directories of a user.
  */
-int initialize_user(const char *user);
+int initialize_user(const char *user, const char * good_local_dirs);
 
 /**
  * Create a top level directory for the user.

Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/test/test-task-controller.c?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c Fri Apr 22 22:56:58 2011
@@ -637,7 +637,7 @@ void test_run_task() {
 	   strerror(errno));
     exit(1);
   } else if (child == 0) {
-    if (run_task_as_user(username, "job_4", "task_1", 
+    if (run_task_as_user(username, "", "job_4", "task_1", 
                          task_dir, script_name) != 0) {
       printf("FAIL: failed in child\n");
       exit(42);

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Apr 22 22:56:58 2011
@@ -22,7 +22,6 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,9 +31,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
 import org.apache.hadoop.util.ProcessTree.Signal;
 import org.apache.hadoop.util.ProcessTree;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 import org.apache.commons.logging.Log;
@@ -249,8 +248,9 @@ public class DefaultTaskController exten
   }
 
   @Override
-  public void setup(LocalDirAllocator allocator) {
+  public void setup(LocalDirAllocator allocator, LocalStorage l) {
     this.allocator = allocator;
+    this.localStorage = l;
   }
   
 }

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Apr 22 22:56:58 2011
@@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +33,7 @@ import org.apache.hadoop.fs.LocalDirAllo
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
 import org.apache.hadoop.util.ProcessTree.Signal;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -48,8 +48,10 @@ import org.apache.hadoop.util.StringUtil
  * JVM and killing it when needed, and also initializing and
  * finalizing the task environment. 
  * <p> The setuid executable is launched using the command line:</p>
- * <p>task-controller user-name command command-args, where</p>
+ * <p>task-controller user-name good-local-dirs command command-args,
+ * where</p>
  * <p>user-name is the name of the owner who submits the job</p>
+ * <p>good-local-dirs is a comma separated list of good mapred local dirs</p>
  * <p>command is one of the cardinal value of the 
  * {@link LinuxTaskController.TaskControllerCommands} enumeration</p>
  * <p>command-args depends on the command being launched.</p>
@@ -122,7 +124,8 @@ class LinuxTaskController extends TaskCo
   }
 
   @Override
-  public void setup(LocalDirAllocator allocator) throws IOException {
+  public void setup(LocalDirAllocator allocator, LocalStorage localStorage)
+      throws IOException {
 
     // Check the permissions of the task-controller binary by running it plainly.
     // If permissions are correct, it returns an error code 1, else it returns
@@ -142,8 +145,8 @@ class LinuxTaskController extends TaskCo
       }
     }
     this.allocator = allocator;
+    this.localStorage = localStorage;
   }
-  
 
   @Override
   public void initializeJob(String user, String jobid, Path credentials,
@@ -152,7 +155,8 @@ class LinuxTaskController extends TaskCo
                             ) throws IOException {
     List<String> command = new ArrayList<String>(
       Arrays.asList(taskControllerExe, 
-                    user, 
+                    user,
+                    localStorage.getGoodLocalDirsString(),
                     Integer.toString(Commands.INITIALIZE_JOB.getValue()),
                     jobid,
                     credentials.toUri().getPath().toString(),
@@ -216,6 +220,7 @@ class LinuxTaskController extends TaskCo
       String[] command = 
         new String[]{taskControllerExe, 
           user,
+          localStorage.getGoodLocalDirsString(),
           Integer.toString(Commands.LAUNCH_TASK_JVM.getValue()),
           jobId,
           attemptId,
@@ -256,6 +261,7 @@ class LinuxTaskController extends TaskCo
     String[] command = 
       new String[]{taskControllerExe, 
                    user,
+                   localStorage.getGoodLocalDirsString(),
                    Integer.toString(Commands.DELETE_AS_USER.getValue()),
                    subDir};
     ShellCommandExecutor shExec = new ShellCommandExecutor(command);
@@ -270,6 +276,7 @@ class LinuxTaskController extends TaskCo
     String[] command = 
       new String[]{taskControllerExe, 
                    user,
+                   localStorage.getGoodLocalDirsString(),
                    Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()),
                    subDir};
     ShellCommandExecutor shExec = new ShellCommandExecutor(command);
@@ -285,6 +292,7 @@ class LinuxTaskController extends TaskCo
     String[] command = 
       new String[]{taskControllerExe, 
                    user,
+                   localStorage.getGoodLocalDirsString(),
                    Integer.toString(Commands.SIGNAL_TASK.getValue()),
                    Integer.toString(taskPid),
                    Integer.toString(signal.getValue())};
@@ -316,9 +324,10 @@ class LinuxTaskController extends TaskCo
     Task firstTask = allAttempts.get(0);
     String taskid = firstTask.getTaskID().toString();
     
-    LocalDirAllocator ldirAlloc = new LocalDirAllocator("mapred.local.dir");
+    LocalDirAllocator ldirAlloc =
+        new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
     String taskRanFile = TaskTracker.TT_LOG_TMP_DIR + Path.SEPARATOR + taskid;
-    Configuration conf = new Configuration();
+    Configuration conf = getConf();
     
     //write the serialized task information to a file to pass to the truncater
     Path taskRanFilePath = 
@@ -348,12 +357,13 @@ class LinuxTaskController extends TaskCo
     command.add(TaskLogsTruncater.class.getName()); 
     command.add(taskRanFilePath.toString());
 
-    String[] taskControllerCmd = new String[3 + command.size()];
+    String[] taskControllerCmd = new String[4 + command.size()];
     taskControllerCmd[0] = taskControllerExe;
     taskControllerCmd[1] = user;
-    taskControllerCmd[2] = Integer.toString(
+    taskControllerCmd[2] = localStorage.getGoodLocalDirsString();
+    taskControllerCmd[3] = Integer.toString(
         Commands.RUN_COMMAND_AS_USER.getValue());
-    int i = 3;
+    int i = 4;
     for (String cmdArg : command) {
       taskControllerCmd[i++] = cmdArg;
     }

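The Javadoc hunk above documents the revised launcher protocol: the setuid binary is now invoked as task-controller user-name good-local-dirs command command-args, so every command array built in this class gains the comma separated list of healthy mapred local dirs right after the user name. A minimal illustration of that layout, not part of the patch, with a made-up binary path, user, job id and directories, and a stand-in ordinal for Commands.DELETE_AS_USER.getValue():

    import java.util.Arrays;

    // Sketch only: mirrors the argv layout that deleteAsUser() above hands to
    // ShellCommandExecutor; all concrete values here are hypothetical.
    public class TaskControllerArgvSketch {
      public static void main(String[] args) {
        String taskControllerExe = "/usr/local/hadoop/bin/task-controller"; // assumed install path
        String user = "alice";                                              // job owner
        String goodLocalDirs = "/data/1/mapred/local,/data/2/mapred/local"; // healthy dirs only
        String deleteAsUserOrdinal = "6";   // stand-in for Commands.DELETE_AS_USER.getValue()
        String subDir = "taskTracker/alice/jobcache/job_201104220001_0001"; // hypothetical

        String[] command = new String[] {
            taskControllerExe,
            user,
            goodLocalDirs,                  // argument added by this patch
            deleteAsUserOrdinal,
            subDir };
        System.out.println(Arrays.toString(command));
      }
    }
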
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MRConstants.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MRConstants.java Fri Apr 22 22:56:58 2011
@@ -29,6 +29,12 @@ interface MRConstants {
   
   public static final long COUNTER_UPDATE_INTERVAL = 60 * 1000;
 
+  /**
+   * How often TaskTracker needs to check the health of its disks, if not
+   * configured using mapred.disk.healthChecker.interval
+   */
+  public static final long DEFAULT_DISK_HEALTH_CHECK_INTERVAL = 60 * 1000;
+
   //
   // Result codes
   //

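The new constant is the fallback the TaskTracker uses when mapred.disk.healthChecker.interval is not set (see the TaskTracker hunk further below), so disks are checked roughly once a minute by default. A hedged sketch of that lookup, with the literal 60 * 1000L standing in for DEFAULT_DISK_HEALTH_CHECK_INTERVAL:

    import org.apache.hadoop.conf.Configuration;

    // Sketch only: shows the default/override behaviour of the new interval;
    // the property name and default come from this commit, the override value
    // is an arbitrary example.
    public class DiskHealthIntervalSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // optional override: check every two minutes instead of the 60s default
        conf.setLong("mapred.disk.healthChecker.interval", 2 * 60 * 1000L);
        long interval = conf.getLong("mapred.disk.healthChecker.interval",
                                     60 * 1000L /* DEFAULT_DISK_HEALTH_CHECK_INTERVAL */);
        System.out.println("disk health check every " + interval + " ms");
      }
    }
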
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Apr 22 22:56:58 2011
@@ -17,14 +17,11 @@
 */
 package org.apache.hadoop.mapred;
 
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,10 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
 import org.apache.hadoop.util.ProcessTree.Signal;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 /**
  * Controls initialization, finalization and clean up of tasks, and
@@ -64,6 +59,11 @@ public abstract class TaskController imp
   
   protected LocalDirAllocator allocator;
 
+  /**
+   * LocalStorage of TaskTracker
+   */
+  protected LocalStorage localStorage;
+
   final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
   FsPermission.createImmutable((short) 0700); // rwx--------
   
@@ -78,8 +78,10 @@ public abstract class TaskController imp
   /**
    * Does initialization and setup.
    * @param allocator the local dir allocator to use
+   * @param l TaskTracker's LocalStorage object
    */
-  public abstract void setup(LocalDirAllocator allocator) throws IOException;
+  public abstract void setup(LocalDirAllocator allocator,
+      LocalStorage l) throws IOException;
   
   /**
    * Create all of the directories necessary for the job to start and download

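The hunk above widens the abstract setup() contract so that every TaskController implementation receives the TaskTracker's LocalStorage along with the LocalDirAllocator and keeps it in the new protected localStorage field. A small self-contained sketch of that contract, using simplified stand-in types rather than the real Hadoop classes:

    import java.io.IOException;

    // Sketch only: SimplifiedController stands in for TaskController and a
    // comma separated String stands in for TaskTracker.LocalStorage.
    abstract class SimplifiedController {
      protected String goodLocalDirs;

      // before the patch setup() took only the allocator; it now also takes
      // the tracker's view of the healthy local dirs
      abstract void setup(String allocatorProperty, String goodLocalDirs) throws IOException;
    }

    class SimplifiedDefaultController extends SimplifiedController {
      @Override
      void setup(String allocatorProperty, String goodLocalDirs) {
        // mirrors DefaultTaskController.setup(): remember the reference so later
        // commands can be built against healthy directories only
        this.goodLocalDirs = goodLocalDirs;
      }
    }

    public class SetupContractSketch {
      public static void main(String[] args) throws IOException {
        SimplifiedController c = new SimplifiedDefaultController();
        c.setup("mapred.local.dir", "/data/1/mapred/local,/data/2/mapred/local");
        System.out.println("controller will use: " + c.goodLocalDirs);
      }
    }
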
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Apr 22 22:56:58 2011
@@ -166,8 +166,96 @@ public class TaskTracker implements MRCo
 
   volatile boolean running = true;
 
+  /**
+   *  TaskTracker internal class only.
+   *  Manages the lists of good mapred local dirs and bad mapred local dirs.
+   */
+  public static class LocalStorage {
+    private List<String> goodLocalDirs = new ArrayList<String>();
+    private List<String> badLocalDirs = new ArrayList<String>();
+    private boolean diskFailed = false;
+
+    /**
+     * TaskTracker internal only
+     */
+    public LocalStorage(String[] localDirs) {
+      for (String s : localDirs) {
+        goodLocalDirs.add(s);
+      }
+    }
+
+    /**
+     * @return good mapred local dirs list
+     */
+    synchronized String[] getGoodLocalDirs() {
+      String[] rv = new String[goodLocalDirs.size()];
+      return goodLocalDirs.toArray(rv);
+    }
+
+    /**
+     * @return good mapred local dirs list as a comma separated string
+     */
+    synchronized String getGoodLocalDirsString() {
+      StringBuffer sb = new StringBuffer();
+      for (String s : goodLocalDirs) {
+        if (sb.length() > 0) {
+          sb.append(",");
+        }
+        sb.append(s);
+      }
+      return sb.toString();
+    }
+
+    /**
+     * @return bad mapred local dirs list
+     */
+    synchronized String[] getBadLocalDirs() {
+      String[] rv = new String[badLocalDirs.size()];
+      return badLocalDirs.toArray(rv);
+    }
+
+    /**
+     * @return true if a disk has failed since the last
+     * time this method was called
+     */
+    synchronized boolean isDiskFailed() {
+      boolean rv = diskFailed;
+      diskFailed = false;
+      return rv;
+    }
+
+    /**
+     * Check if the given local directories
+     * (and parent directories, if necessary) can be created.
+     * Updates the list of good mapred local dirs and the list of bad local
+     * dirs.
+     * @throws DiskErrorException if all local directories are not writable
+     */
+    synchronized void checkLocalDirs()
+        throws DiskErrorException {
+      for (String s : getGoodLocalDirs()) {
+        try {
+          DiskChecker.checkDir(new File(s));
+        } catch(DiskErrorException e) {
+          LOG.warn("Task Tracker localdir error " + e.getMessage()
+            + ", removing from good localdirs");
+          goodLocalDirs.remove(s);
+          badLocalDirs.add(s);
+          diskFailed = true;
+        }
+      }
+
+      // no good local dirs ?
+      if(goodLocalDirs.size() < 1) {
+        throw new DiskErrorException(
+            "All mapred local directories are not writable.");
+      }
+    }
+  }
+
+  private LocalStorage localStorage;
+  private long lastCheckDirsTime;
   private LocalDirAllocator localDirAllocator;
-  private String[] localdirs;
   String taskTrackerName;
   String localHostname;
   InetSocketAddress jobTrackAddr;
@@ -315,6 +403,19 @@ public class TaskTracker implements MRCo
    */
   private NodeHealthCheckerService healthChecker;
   
+  /**
+   * Configuration property for disk health check interval in milliseconds.
+   * Currently, configuring this to a value smaller than the heartbeat interval
+   * is equivalent to setting this to the heartbeat interval value.
+   */
+  static final String DISK_HEALTH_CHECK_INTERVAL_PROPERTY =
+      "mapred.disk.healthChecker.interval";
+  /**
+   * How often TaskTracker needs to check the health of its disks.
+   * Default value is {@link MRConstants#DEFAULT_DISK_HEALTH_CHECK_INTERVAL}
+   */
+  private long diskHealthCheckInterval;
+
   /*
    * A list of commitTaskActions for whom commit response has been received 
    */
@@ -579,7 +680,7 @@ public class TaskTracker implements MRCo
    * @throws IOException
    */
   private void deleteUserDirectories(Configuration conf) throws IOException {
-    for(String root: localdirs) {
+    for(String root: localStorage.getGoodLocalDirs()) {
       for(FileStatus status: localFs.listStatus(new Path(root, SUBDIR))) {
         String owner = status.getOwner();
         String path = status.getPath().getName();
@@ -614,23 +715,34 @@ public class TaskTracker implements MRCo
       (fConf.get("mapred.tasktracker.dns.interface","default"),
        fConf.get("mapred.tasktracker.dns.nameserver","default"));
     }
- 
-    //check local disk
-    checkLocalDirs((localdirs = this.fConf.getLocalDirs()));
+
+    localStorage.checkLocalDirs();
+    if (localStorage.isDiskFailed()) {
+      // Ignore current disk failures. They are being handled now.
+    }
+    String dirs = localStorage.getGoodLocalDirsString();
+    fConf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, dirs);
+    LOG.info("Good mapred local directories are: " + dirs);
+    taskController.setConf(fConf);
+    // Setup task controller so that deletion of user dirs happens properly
+    taskController.setup(localDirAllocator, localStorage);
+    server.setAttribute("conf", fConf);
+
     deleteUserDirectories(fConf);
+
     fConf.deleteLocalFiles(SUBDIR);
     final FsPermission ttdir = FsPermission.createImmutable((short) 0755);
-    for (String s : localdirs) {
+    for (String s : localStorage.getGoodLocalDirs()) {
       localFs.mkdirs(new Path(s, SUBDIR), ttdir);
     }
     fConf.deleteLocalFiles(TT_PRIVATE_DIR);
     final FsPermission priv = FsPermission.createImmutable((short) 0700);
-    for (String s : localdirs) {
+    for (String s : localStorage.getGoodLocalDirs()) {
       localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
     }
     fConf.deleteLocalFiles(TT_LOG_TMP_DIR);
     final FsPermission pub = FsPermission.createImmutable((short) 0755);
-    for (String s : localdirs) {
+    for (String s : localStorage.getGoodLocalDirs()) {
       localFs.mkdirs(new Path(s, TT_LOG_TMP_DIR), pub);
     }
 
@@ -726,7 +838,7 @@ public class TaskTracker implements MRCo
     reduceLauncher.start();
 
     // create a localizer instance
-    setLocalizer(new Localizer(localFs, fConf.getLocalDirs()));
+    setLocalizer(new Localizer(localFs, localStorage.getGoodLocalDirs()));
 
     //Start up node health checker service.
     if (shouldStartHealthMonitor(this.fConf)) {
@@ -737,6 +849,13 @@ public class TaskTracker implements MRCo
       fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
   }
 
+  /**
+   * @return TaskTracker's LocalStorage object
+   */
+  LocalStorage getLocalStorage() {
+    return localStorage;
+  }
+
   private void createInstrumentation() {
     Class<? extends TaskTrackerInstrumentation> metricsInst =
         getInstrumentationClass(fConf);
@@ -1187,6 +1306,8 @@ public class TaskTracker implements MRCo
   private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
                                 RunningJob rjob) throws IOException {
     synchronized (tip) {
+      jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+          localStorage.getGoodLocalDirsString());
       tip.setJobConf(jobConf);
       tip.setUGI(rjob.ugi);
       tip.launchTask(rjob);
@@ -1285,6 +1406,8 @@ public class TaskTracker implements MRCo
                   "mapred.tasktracker.map.tasks.maximum", 2);
     maxReduceSlots = conf.getInt(
                   "mapred.tasktracker.reduce.tasks.maximum", 2);
+    diskHealthCheckInterval = conf.getLong(DISK_HEALTH_CHECK_INTERVAL_PROPERTY,
+                                           DEFAULT_DISK_HEALTH_CHECK_INTERVAL);
     UserGroupInformation.setConfiguration(originalConf);
     aclsManager = new ACLsManager(conf, new JobACLsManager(conf), null);
     this.jobTrackAddr = JobTracker.getAddress(conf);
@@ -1307,9 +1430,13 @@ public class TaskTracker implements MRCo
     Class<? extends TaskController> taskControllerClass = 
       conf.getClass("mapred.task.tracker.task-controller", 
                      DefaultTaskController.class, TaskController.class);
-   taskController = 
-     (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
-   taskController.setup(localDirAllocator);
+
+    fConf = new JobConf(conf);
+    localStorage = new LocalStorage(fConf.getLocalDirs());
+    localStorage.checkLocalDirs();
+    taskController = 
+      (TaskController) ReflectionUtils.newInstance(taskControllerClass, fConf);
+    taskController.setup(localDirAllocator, localStorage);
 
     // create user log manager
     setUserLogManager(new UserLogManager(conf, taskController));
@@ -1319,7 +1446,7 @@ public class TaskTracker implements MRCo
     this.shuffleServerMetrics = ShuffleServerInstrumentation.create(this);
     server.setAttribute("task.tracker", this);
     server.setAttribute("local.file.system", local);
-    server.setAttribute("conf", conf);
+
     server.setAttribute("log", LOG);
     server.setAttribute("localDirAllocator", localDirAllocator);
     server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
@@ -1448,7 +1575,17 @@ public class TaskTracker implements MRCo
           systemDirectory = new Path(dir);
           systemFS = systemDirectory.getFileSystem(fConf);
         }
-        
+
+        now = System.currentTimeMillis();
+        if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {
+          localStorage.checkLocalDirs();
+          lastCheckDirsTime = now;
+          // If any of the good disks failed, re-init the task tracker
+          if (localStorage.isDiskFailed()) {
+            return State.STALE;
+          }
+        }
+
         // Send the heartbeat and process the jobtracker's directives
         HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
 
@@ -1456,7 +1593,6 @@ public class TaskTracker implements MRCo
         // next heartbeat   
         lastHeartbeat = System.currentTimeMillis();
         
-        
         // Check if the map-event list needs purging
         Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
         if (jobs.size() > 0) {
@@ -1615,7 +1751,6 @@ public class TaskTracker implements MRCo
       localMinSpaceStart = minSpaceStart;
     }
     if (askForNewTask) {
-      checkLocalDirs(fConf.getLocalDirs());
       askForNewTask = enoughFreeSpace(localMinSpaceStart);
       long freeDiskSpace = getFreeSpace();
       long totVmem = getTotalVirtualMemoryOnTT();
@@ -1832,7 +1967,7 @@ public class TaskTracker implements MRCo
                                          jobDir.substring(userDir.length()));
     directoryCleanupThread.addToQueue(jobCleanup);
     
-    for (String str : localdirs) {
+    for (String str : localStorage.getGoodLocalDirs()) {
       Path ttPrivateJobDir = FileSystem.getLocal(fConf).makeQualified(
         new Path(str, TaskTracker.getPrivateDirForJob(user, jobId.toString())));
       PathDeletionContext ttPrivateJobCleanup =
@@ -1954,7 +2089,7 @@ public class TaskTracker implements MRCo
   
   private long getFreeSpace() throws IOException {
     long biggestSeenSoFar = 0;
-    String[] localDirs = fConf.getLocalDirs();
+    String[] localDirs = localStorage.getGoodLocalDirs();
     for (int i = 0; i < localDirs.length; i++) {
       DF df = null;
       if (localDirsDf.containsKey(localDirs[i])) {
@@ -3413,32 +3548,6 @@ public class TaskTracker implements MRCo
   }
     
   /**
-   * Check if the given local directories
-   * (and parent directories, if necessary) can be created.
-   * @param localDirs where the new TaskTracker should keep its local files.
-   * @throws DiskErrorException if all local directories are not writable
-   */
-  private static void checkLocalDirs(String[] localDirs) 
-    throws DiskErrorException {
-    boolean writable = false;
-        
-    if (localDirs != null) {
-      for (int i = 0; i < localDirs.length; i++) {
-        try {
-          DiskChecker.checkDir(new File(localDirs[i]));
-          writable = true;
-        } catch(DiskErrorException e) {
-          LOG.warn("Task Tracker local " + e.getMessage());
-        }
-      }
-    }
-
-    if (!writable)
-      throw new DiskErrorException(
-                                   "all local directories are not writable");
-  }
-    
-  /**
    * Is this task tracker idle?
    * @return has this task tracker finished and cleaned up all of its tasks?
    */

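Taken together, the TaskTracker changes above replace the old static checkLocalDirs() with the LocalStorage bookkeeping: a directory that fails DiskChecker is moved from the good list to the bad list, a diskFailed flag is latched, and the heartbeat loop re-checks the disks every diskHealthCheckInterval milliseconds, returning State.STALE (and thus reinitializing the tracker) when a new failure is detected. Below is a small standalone sketch of that bookkeeping, not the Hadoop class itself; it uses a plain writability test in place of DiskChecker and hypothetical paths.

    import java.io.File;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Standalone sketch of the LocalStorage bookkeeping introduced above;
    // it is not the Hadoop class, just an illustration of the same idea.
    public class LocalStorageSketch {
      private final List<String> good = new ArrayList<String>();
      private final List<String> bad = new ArrayList<String>();
      private boolean diskFailed = false;

      public LocalStorageSketch(String[] dirs) {
        good.addAll(Arrays.asList(dirs));
      }

      // crude stand-in for DiskChecker.checkDir(): existing, writable directory
      private static boolean usable(String dir) {
        File f = new File(dir);
        return (f.isDirectory() || f.mkdirs()) && f.canWrite();
      }

      public synchronized void checkLocalDirs() {
        for (String dir : good.toArray(new String[good.size()])) {
          if (!usable(dir)) {
            good.remove(dir);
            bad.add(dir);
            diskFailed = true;          // latched until isDiskFailed() is called
          }
        }
        // the real code throws DiskErrorException here
        if (good.isEmpty()) {
          throw new IllegalStateException("no writable mapred local directory left");
        }
      }

      // returns true at most once per failure episode, like the patch
      public synchronized boolean isDiskFailed() {
        boolean rv = diskFailed;
        diskFailed = false;
        return rv;
      }

      public synchronized String getGoodLocalDirsString() {
        StringBuilder sb = new StringBuilder();
        for (String dir : good) {
          if (sb.length() > 0) sb.append(",");
          sb.append(dir);
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        LocalStorageSketch ls = new LocalStorageSketch(
            new String[] {"/tmp/mapred/local1", "/tmp/mapred/local2"});
        ls.checkLocalDirs();
        System.out.println("good dirs: " + ls.getGoodLocalDirsString());
        System.out.println("failure since last check? " + ls.isDiskFailed());
      }
    }

The real class additionally feeds getGoodLocalDirsString() to the task-controller command lines and rewrites mapred.local.dir in the job and tracker configuration, so tasks and the launcher only ever see the healthy directories.
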
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Apr 22 22:56:58 2011
@@ -35,10 +35,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.TaskDistributedCacheManager.CacheFile;
 import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.JobLocalizer;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
@@ -111,7 +113,8 @@ public class TestTrackerDistributedCache
         taskControllerClass, conf);
 
     // setup permissions for mapred local dir
-    taskController.setup(localDirAllocator);
+    taskController.setup(localDirAllocator,
+        new LocalStorage(conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
 
     // Create the temporary cache files to be used in the tests.
     firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
@@ -126,7 +129,8 @@ public class TestTrackerDistributedCache
   
   protected void refreshConf(Configuration conf) throws IOException {
     taskController.setConf(conf);
-    taskController.setup(localDirAllocator);
+    taskController.setup(localDirAllocator,
+        new LocalStorage(conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
   }
 
   /**

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Fri Apr 22 22:56:58 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import junit.framework.TestCase;
@@ -75,14 +76,15 @@ public class ClusterWithLinuxTaskControl
         + "/task-controller";
 
     @Override
-    public void setup(LocalDirAllocator allocator) throws IOException {
+    public void setup(LocalDirAllocator allocator, LocalStorage l)
+        throws IOException {
       // get the current ugi and set the task controller group owner
       getConf().set(TT_GROUP, taskTrackerSpecialGroup);
 
       // write configuration file
       configurationFile = createTaskControllerConf(System
           .getProperty(TASKCONTROLLER_PATH), getConf());
-      super.setup(allocator);
+      super.setup(allocator, l);
     }
 
     protected String getTaskControllerExecutablePath() {
@@ -209,8 +211,8 @@ public class ClusterWithLinuxTaskControl
     PrintWriter writer =
         new PrintWriter(new FileOutputStream(configurationFile));
 
-    writer.println(String.format("mapred.local.dir=%s", conf.
-        get(JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
+    //writer.println(String.format("mapred.local.dir=%s", conf.
+    //    get(JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
 
     writer
         .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJvmManager.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJvmManager.java Fri Apr 22 22:56:58 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.LocalDirAllo
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JvmManager.JvmManagerForType;
 import org.apache.hadoop.mapred.JvmManager.JvmManagerForType.JvmRunner;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
 import org.apache.hadoop.mapred.TaskTracker.RunningJob;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
@@ -75,8 +76,9 @@ public class TestJvmManager {
     tt.setTaskController((dtc = new DefaultTaskController()));
     Configuration conf = new Configuration();
     dtc.setConf(conf);
-    LocalDirAllocator ldirAlloc = new LocalDirAllocator("mapred.local.dir");
-    tt.getTaskController().setup(ldirAlloc);
+    LocalDirAllocator ldirAlloc =
+        new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
+    tt.getTaskController().setup(ldirAlloc, new LocalStorage(ttConf.getLocalDirs()));
     JobID jobId = new JobID("test", 0);
     jvmManager = new JvmManager(tt);
     tt.setJvmManagerInstance(jvmManager);

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java Fri Apr 22 22:56:58 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
 
 import junit.framework.TestCase;
 
@@ -51,7 +52,10 @@ public class TestLinuxTaskController ext
       // task controller setup should fail validating permissions.
       Throwable th = null;
       try {
-        controller.setup(new LocalDirAllocator("mapred.local.dir"));
+        controller.setup(
+            new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY),
+            new LocalStorage(controller.getConf().getStrings(
+                JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
       } catch (IOException ie) {
         th = ie;
       }
@@ -60,7 +64,9 @@ public class TestLinuxTaskController ext
           + INVALID_TASKCONTROLLER_PERMISSIONS, th.getMessage().contains(
           "with exit code " + INVALID_TASKCONTROLLER_PERMISSIONS));
     } else {
-      controller.setup(new LocalDirAllocator("mapred.local.dir"));
+      controller.setup(new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY),
+          new LocalStorage(controller.getConf().getStrings(
+              JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
     }
 
   }

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Apr 22 22:56:58 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.LocalDirAllo
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.mapred.QueueManager.QueueACL;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
 import org.apache.hadoop.mapred.TaskTracker.RunningJob;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
@@ -187,7 +188,8 @@ public class TestTaskTrackerLocalization
     // setup task controller
     taskController = getTaskController();
     taskController.setConf(trackerFConf);
-    taskController.setup(lDirAlloc);
+    taskController.setup(lDirAlloc,
+                         new LocalStorage(trackerFConf.getLocalDirs()));
     tracker.setTaskController(taskController);
     tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(),localDirs));
   }

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1096075&r1=1096074&r2=1096075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Fri Apr 22 22:56:58 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.filecache.TestTrackerDistributedCacheManager;
 
@@ -65,7 +66,9 @@ public class TestTrackerDistributedCache
     String execPath = path + "/task-controller";
     ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
     taskController.setConf(conf);
-    taskController.setup(new LocalDirAllocator("mapred.local.dir"));
+    taskController.setup(
+        new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY),
+        new LocalStorage(conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)));
   }
 
   @Override