You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/04/23 01:13:23 UTC

svn commit: r1096081 - in /hadoop/common/branches/branch-0.20-security: ./ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Apr 22 23:13:23 2011
New Revision: 1096081

URL: http://svn.apache.org/viewvc?rev=1096081&view=rev
Log:
MAPREDUCE-2415. Distribute the user task logs on to multiple disks.
(Bharath Mundlapudi via omalley)

Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c
    hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c
    hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h
    hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Apr 22 23:13:23 2011
@@ -61,15 +61,18 @@ Release 0.20.204.0 - unreleased
 
   IMPROVEMENTS
 
+    MAPREDUCE-2415. Distribute the user task logs on to multiple disks.
+    (Bharath Mundlapudi via omalley)
+
+    MAPREDUCE-2413. TaskTracker should handle disk failures by reinitializing 
+    itself. (Ravi Gummadi and Jagane Sundar via omalley)
+
     HDFS-1541. Not marking datanodes dead when namenode in safemode.
     (hairong)
 
     HDFS-1767. Namenode ignores non-initial block report from datanodes
     when in safemode during startup. (Matt Foley via suresh)
 
-    MAPREDUCE-2413. TaskTracker should handle disk failures by reinitializing 
-    itself. (Ravi Gummadi and Jagane Sundar via omalley)
-
 Release 0.20.203.0 - unreleased
 
     HADOOP-7190. Add metrics v1 back for backwards compatibility. (omalley)

Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/main.c?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/main.c Fri Apr 22 23:13:23 2011
@@ -192,7 +192,7 @@ int main(int argc, char **argv) {
     break;
   case DELETE_LOG_AS_USER:
     dir_to_be_deleted = argv[optind++];
-    exit_code= delete_log_directory(dir_to_be_deleted);
+    exit_code= delete_log_directory(dir_to_be_deleted, good_local_dirs);
     break;
   case RUN_COMMAND_AS_USER:
     exit_code = run_command_as_user(user_detail->pw_name, argv + optind);

Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/task-controller.c?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.c Fri Apr 22 23:13:23 2011
@@ -28,9 +28,11 @@
 #include <signal.h>
 #include <stdarg.h>
 #include <stdio.h>
+#include <stdbool.h>
 #include <stdlib.h>
 #include <string.h>
 #include <sys/stat.h>
+#include <sys/time.h>
 
 #define USER_DIR_PATTERN "%s/taskTracker/%s"
 
@@ -54,6 +56,8 @@ static const int DEFAULT_MIN_USERID = 10
 
 #define BANNED_USERS_KEY "banned.users"
 
+#define USERLOGS "userlogs"
+
 static const char* DEFAULT_BANNED_USERS[] = {"mapred", "hdfs", "bin", 0};
 
 //struct to store the user details
@@ -355,11 +359,51 @@ int mkdirs(const char* path, mode_t perm
   return 0;
 }
 
+static short get_current_local_dir_count(char **local_dir)
+{
+  char **local_dir_ptr;
+  short count=0;
+
+  for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
+    ++count;
+  }
+  return count;
+}
+
+static char* get_nth_local_dir(char **local_dir, int nth)
+{
+  char **local_dir_ptr;
+  short count=0;
+
+  for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
+    if(count == nth) {
+      return strdup(*local_dir_ptr);
+    }
+    ++count;
+  }
+  fprintf(LOGFILE, "Invalid index %d for %d local directories\n", nth, count);
+  return NULL;
+}
+
+/* Returns a malloc'd copy of a random local_dir entry, or NULL if none. */
+static char*  get_random_local_dir(char **local_dir) {
+  struct timeval tv;
+  short cnt = (local_dir == NULL) ? 0 : get_current_local_dir_count(local_dir);
+  if(cnt == 0) {
+    fprintf(LOGFILE, "No valid local directories\n");
+    return NULL;
+  }
+  gettimeofday(&tv, NULL);
+  /* unsigned math: the old (long) sec*1000000 overflows a 32-bit long */
+  srand((unsigned int) tv.tv_sec * 1000000u + (unsigned int) tv.tv_usec);
+  return get_nth_local_dir(local_dir, rand() % cnt);
+}
+
 /**
  * Function to prepare the attempt directories for the task JVM.
  * It creates the task work and log directories.
  */
-static int create_attempt_directories(const char* user,
+int create_attempt_directories(const char* user,
     const char * good_local_dirs, const char *job_id, const char *task_id) {
   // create dirs as 0750
   const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
@@ -392,24 +436,67 @@ static int create_attempt_directories(co
       free(task_dir);
     }
   }
-  free_values(local_dir);
 
   // also make the directory for the task logs
   char *job_task_name = malloc(strlen(job_id) + strlen(task_id) + 2);
+  char *real_task_dir = NULL; // target of symlink
+  char *real_job_dir = NULL;  // parent dir of target of symlink
+  char *random_local_dir = NULL;
+  char *link_task_log_dir = NULL; // symlink
   if (job_task_name == NULL) {
     fprintf(LOGFILE, "Malloc of job task name failed\n");
     result = -1;
   } else {
     sprintf(job_task_name, "%s/%s", job_id, task_id);
-    char *log_dir = get_job_log_directory(job_task_name);
-    free(job_task_name);
-    if (log_dir == NULL) {
+    link_task_log_dir = get_job_log_directory(job_task_name);
+    random_local_dir = get_random_local_dir(local_dir);
+    if(random_local_dir == NULL) {
+      result = -1;
+      goto cleanup;
+    }
+    real_job_dir = malloc(strlen(random_local_dir) + strlen(USERLOGS) + 
+                          strlen(job_id) + 3);
+    if (real_job_dir == NULL) {
+      fprintf(LOGFILE, "Malloc of real job directory failed\n");
       result = -1;
-    } else if (mkdirs(log_dir, perms) != 0) {
+      goto cleanup;
+    } 
+    real_task_dir = malloc(strlen(random_local_dir) + strlen(USERLOGS) + 
+                           strlen(job_id) + strlen(task_id) + 4);
+    if (real_task_dir == NULL) {
+      fprintf(LOGFILE, "Malloc of real task directory failed\n");
+      result = -1;
+      goto cleanup;
+    }
+    sprintf(real_job_dir, "%s/userlogs/%s", random_local_dir, job_id);
+    result = create_directory_for_user(real_job_dir);
+    if( result != 0) {
+      result = -1;
+      goto cleanup;
+    }
+    sprintf(real_task_dir, "%s/userlogs/%s/%s",
+            random_local_dir, job_id, task_id);
+    result = mkdirs(real_task_dir, perms); 
+    if( result != 0) {
+      result = -1; 
+      goto cleanup;
+    }
+    result = symlink(real_task_dir, link_task_log_dir);
+    if( result != 0) {
+      fprintf(LOGFILE, "Failed to create symlink %s to %s - %s\n",
+              link_task_log_dir, real_task_dir, strerror(errno));
       result = -1;
     }
-    free(log_dir);
   }
+
+ cleanup:
+  free(random_local_dir);
+  free(job_task_name);
+  free(link_task_log_dir);
+  free(real_job_dir);
+  free(real_task_dir);
+  free_values(local_dir);
+
   return result;
 }
 
@@ -523,7 +610,7 @@ static int change_owner(const char* path
 /**
  * Create a top level directory for the user.
  * It assumes that the parent directory is *not* writable by the user.
- * It creates directories with 02700 permissions owned by the user
+ * It creates directories with 02750 permissions owned by the user
  * and with the group set to the task tracker group.
  * return non-0 on failure
  */
@@ -1036,17 +1123,38 @@ int delete_as_user(const char *user, con
   return ret;
 }
 
-/**
- * delete a given log directory
+/*
+ * delete a given job log directory
+ * This function takes jobid and deletes the related logs.
  */
-int delete_log_directory(const char *subdir) {
-  char* log_subdir = get_job_log_directory(subdir);
+int delete_log_directory(const char *subdir, const char * good_local_dirs) {
+  char* job_log_dir = get_job_log_directory(subdir);
+  
   int ret = -1;
-  if (log_subdir != NULL) {
-    ret = delete_path(log_subdir, strchr(subdir, '/') == NULL);
+  if (job_log_dir == NULL) return ret;
+
+  //delete the job log directory in <hadoop.log.dir>/userlogs/jobid
+  delete_path(job_log_dir, true);
+
+  char **local_dir = get_mapred_local_dirs(good_local_dirs);
+
+  char **local_dir_ptr;
+  for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
+     char *mapred_local_log_dir = concatenate("%s/userlogs/%s", 
+				      "mapred local job log dir", 
+			      	      2, *local_dir_ptr, subdir);
+     if (mapred_local_log_dir != NULL) {
+        //delete the job log directory in <mapred.local.dir>/userlogs/jobid
+        delete_path(mapred_local_log_dir, true);
+	free(mapred_local_log_dir);
+     }
+     else
+        fprintf(LOGFILE, "Failed to delete mapred local log dir for jobid %s\n",
+            subdir);
   }
-  free(log_subdir);
-  return ret;
+  free(job_log_dir);
+  free_values(local_dir);
+  return 0;
 }
 
 /**

Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/impl/task-controller.h?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/impl/task-controller.h Fri Apr 22 23:13:23 2011
@@ -71,7 +71,7 @@ int check_taskcontroller_permissions(cha
 /**
  * delete a given log directory as a user
  */
-int delete_log_directory(const char *log_dir);
+int delete_log_directory(const char *log_dir, const char * good_local_dirs);
 
 // initialize the job directory
 int initialize_job(const char *user, const char * good_local_dirs, const char *jobid,
@@ -153,3 +153,9 @@ int initialize_user(const char *user, co
 int create_directory_for_user(const char* path);
 
 int change_user(uid_t user, gid_t group);
+
+/**
+ * Create task attempt related directories as user.
+ */
+int create_attempt_directories(const char* user,
+	const char * good_local_dirs, const char *job_id, const char *task_id);

Modified: hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/c%2B%2B/task-controller/test/test-task-controller.c?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security/src/c++/task-controller/test/test-task-controller.c Fri Apr 22 23:13:23 2011
@@ -24,9 +24,11 @@
 #include <signal.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <stdbool.h>
 #include <string.h>
 #include <sys/stat.h>
 #include <sys/wait.h>
+#include <limits.h>
 
 #define TEST_ROOT "/tmp/test-task-controller"
 #define DONT_TOUCH_FILE "dont-touch-me"
@@ -185,6 +187,78 @@ void test_get_task_log_dir() {
   free(logdir);
 }
 
+void create_userlogs_dir() {
+  char** tt_roots = get_values("mapred.local.dir");
+  char** tt_root;
+  for(tt_root=tt_roots; *tt_root != NULL; ++tt_root) {
+    char buffer[100000];
+    sprintf(buffer, "%s/userlogs", *tt_root);
+    if (mkdir(buffer, 0755) != 0) {
+      printf("FAIL: Can't create directory %s - %s\n", buffer,
+             strerror(errno));
+      exit(1);
+    }
+  }
+  free_values(tt_roots);
+}
+
+/* Verifies that create_attempt_directories() leaves a symlink at
+ * TEST_ROOT/logs/userlogs/job_7/task_1 that resolves into a local-N userlogs dir. */
+void test_create_log_directory() {
+  printf("\nTesting test_create_log_directory\n");
+  create_userlogs_dir();
+  char *job_log_dir = get_job_log_directory("job_7");
+  if (job_log_dir == NULL) {
+    exit(1);
+  }
+  if (create_directory_for_user(job_log_dir) != 0) {
+    exit(1);
+  }
+  free(job_log_dir);
+  char* good_local_dirs = get_value("mapred.local.dir");
+  if (good_local_dirs == NULL) {
+    fprintf(LOGFILE, "Mapred local directories could not be obtained.\n");
+    exit(1);
+  }
+  create_attempt_directories(username, good_local_dirs, "job_7", "task_1");
+
+  // symlink check: test lstat's result first, a failed call leaves 'file' unset
+  struct stat file;
+  char actualpath [PATH_MAX+1];
+  char *res;
+  char *filepath = TEST_ROOT "/logs/userlogs/job_7/task_1";
+
+  if (lstat(filepath, &file) != 0 || !S_ISLNK(file.st_mode)) {
+    fprintf(LOGFILE, "Symlink creation failed\n");
+    exit(1);
+  }
+
+  // resolve the symlink; realpath fails if the target does not exist
+  res = realpath(filepath, actualpath);
+  if(!res) {
+    fprintf(LOGFILE, "Failed to get target for the symlink\n");
+    exit(1);
+  }
+
+  char local_job_dir[PATH_MAX+1];
+  int i;
+  bool found = false;
+  for(i=1; i<5; i++) {
+     sprintf(local_job_dir, TEST_ROOT "/local-%d/userlogs/job_7/task_1", i);
+     if (strcmp(local_job_dir, actualpath) == 0) {
+       found = true;
+       break;
+     }
+  }
+
+  if(!found) {
+    printf("FAIL: symlink path and target path mismatch\n");
+    exit(1);
+  }
+
+  free(good_local_dirs);
+}
+
 void test_check_user() {
   printf("\nTesting test_check_user\n");
   struct passwd *user = check_user(username);
@@ -220,8 +294,9 @@ void test_check_configuration_permission
 }
 
 void test_delete_task() {
-  if (initialize_user(username)) {
-    printf("FAIL: failed to initialized user %s\n", username);
+  char* local_dirs = get_value("mapred.local.dir");
+  if (initialize_user(username, local_dirs)) {
+    printf("FAIL: failed to initialize user %s\n", username);
     exit(1);
   }
   char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_1");
@@ -254,7 +329,7 @@ void test_delete_task() {
   run(buffer);
 
   // delete task directory
-  int ret = delete_as_user(username, "jobcache/job_1/task_1");
+  int ret = delete_as_user(username, local_dirs, "jobcache/job_1/task_1");
   if (ret != 0) {
     printf("FAIL: return code from delete_as_user is %d\n", ret);
     exit(1);
@@ -282,9 +357,11 @@ void test_delete_task() {
   free(job_dir);
   free(task_dir);
   free(dont_touch);
+  free(local_dirs);
 }
 
 void test_delete_job() {
+  char* local_dirs = get_value("mapred.local.dir");
   char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_2");
   char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username, 
                                        DONT_TOUCH_FILE);
@@ -315,7 +392,7 @@ void test_delete_job() {
   run(buffer);
 
   // delete task directory
-  int ret = delete_as_user(username, "jobcache/job_2");
+  int ret = delete_as_user(username, local_dirs, "jobcache/job_2");
   if (ret != 0) {
     printf("FAIL: return code from delete_as_user is %d\n", ret);
     exit(1);
@@ -339,11 +416,13 @@ void test_delete_job() {
   free(job_dir);
   free(task_dir);
   free(dont_touch);
+  free(local_dirs);
 }
 
 
 void test_delete_user() {
   printf("\nTesting delete_user\n");
+  char* local_dirs = get_value("mapred.local.dir");
   char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_3");
   if (mkdirs(job_dir, 0700) != 0) {
     exit(1);
@@ -354,7 +433,7 @@ void test_delete_user() {
     printf("FAIL: directory missing before test\n");
     exit(1);
   }
-  if (delete_as_user(username, "") != 0) {
+  if (delete_as_user(username, local_dirs, "") != 0) {
     exit(1);
   }
   if (access(buffer, R_OK) == 0) {
@@ -366,10 +445,12 @@ void test_delete_user() {
     exit(1);
   }
   free(job_dir);
+  free(local_dirs);
 }
 
 void test_delete_log_directory() {
   printf("\nTesting delete_log_directory\n");
+  char* local_dirs = get_value("mapred.local.dir");
   char *job_log_dir = get_job_log_directory("job_1");
   if (job_log_dir == NULL) {
     exit(1);
@@ -389,7 +470,7 @@ void test_delete_log_directory() {
     printf("FAIL: can't access task directory - %s\n", strerror(errno));
     exit(1);
   }
-  if (delete_log_directory("job_1/task_2") != 0) {
+  if (delete_log_directory("job_1/task_2", local_dirs) != 0) {
     printf("FAIL: can't delete task directory\n");
     exit(1);
   }
@@ -401,7 +482,7 @@ void test_delete_log_directory() {
     printf("FAIL: job directory not deleted - %s\n", strerror(errno));
     exit(1);
   }
-  if (delete_log_directory("job_1") != 0) {
+  if (delete_log_directory("job_1", local_dirs) != 0) {
     printf("FAIL: can't delete task directory\n");
     exit(1);
   }
@@ -409,7 +490,25 @@ void test_delete_log_directory() {
     printf("FAIL: job directory not deleted\n");
     exit(1);
   }
+  if (delete_log_directory("job_7", local_dirs) != 0) {
+    printf("FAIL: can't delete job directory\n");
+    exit(1);
+  }
+  if (access(TEST_ROOT "/logs/userlogs/job_7", R_OK) == 0) {
+    printf("FAIL: job log directory not deleted\n");
+    exit(1);
+  }
+  char local_job_dir[PATH_MAX+1];
+  int i;
+  for(i=1; i<5; i++) {
+     sprintf(local_job_dir, TEST_ROOT "/local-%d/userlogs/job_7", i);
+     if (access(local_job_dir, R_OK) == 0) {
+       printf("FAIL: job log directory in mapred local not deleted\n");
+       exit(1);
+     }
+  }
   free(task_log_dir);
+  free(local_dirs);
 }
 
 void run_test_in_child(const char* test_name, void (*func)()) {
@@ -558,7 +657,8 @@ void test_init_job() {
     exit(1);
   } else if (child == 0) {
     char *final_pgm[] = {"touch", "my-touch-file", 0};
-    if (initialize_job(username, "job_4", TEST_ROOT "/creds.txt", 
+    char* local_dirs = get_value("mapred.local.dir");
+    if (initialize_job(username, local_dirs, "job_4", TEST_ROOT "/creds.txt", 
                        TEST_ROOT "/job.xml", final_pgm) != 0) {
       printf("FAIL: failed in child\n");
       exit(42);
@@ -631,13 +731,14 @@ void test_run_task() {
   fflush(stderr);
   char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-1", 
 					      username, "job_4", "task_1");
+  char* local_dirs = get_value("mapred.local.dir");
   pid_t child = fork();
   if (child == -1) {
     printf("FAIL: failed to fork process for init_job - %s\n", 
 	   strerror(errno));
     exit(1);
   } else if (child == 0) {
-    if (run_task_as_user(username, "", "job_4", "task_1", 
+    if (run_task_as_user(username, local_dirs, "job_4", "task_1", 
                          task_dir, script_name) != 0) {
       printf("FAIL: failed in child\n");
       exit(42);
@@ -736,6 +837,8 @@ int main(int argc, char **argv) {
 
   test_check_user();
 
+  test_create_log_directory();
+
   test_delete_log_directory();
 
   // the tests that change user need to be run in a subshell, so that

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Apr 22 23:13:23 2011
@@ -65,6 +65,17 @@ public class DefaultTaskController exten
     }
   }
 
+  @Override
+  public void createLogDir(TaskAttemptID taskID, 
+		  			boolean isCleanup) throws IOException {
+	boolean b = TaskLog.createTaskAttemptLogDir(taskID, isCleanup, 
+	    		 		localStorage.getGoodLocalDirs());
+	if (!b) {
+	    LOG.warn("Creation of attempt log dir for " + taskID
+	                 + " failed. Ignoring");
+	}
+  }
+  
   /**
    * Create all of the directories for the task and launches the child jvm.
    * @param user the user name
@@ -80,9 +91,8 @@ public class DefaultTaskController exten
                                   File currentWorkDirectory,
                                   String stdout,
                                   String stderr) throws IOException {
-    
     ShellCommandExecutor shExec = null;
-    try {
+    try {    	            
       FileSystem localFs = FileSystem.getLocal(getConf());
       
       //create the attempt dirs
@@ -232,7 +242,24 @@ public class DefaultTaskController exten
   public void deleteLogAsUser(String user, 
                               String subDir) throws IOException {
     Path dir = new Path(TaskLog.getUserLogDir().getAbsolutePath(), subDir);
-    fs.delete(dir, true);
+    //Delete the subDir in <hadoop.log.dir>/userlogs
+    File subDirPath = new File(dir.toString());
+    FileUtil.fullyDelete( subDirPath );
+    
+    //Delete the subDir in all good <mapred.local.dirs>/userlogs 
+    String [] localDirs = localStorage.getGoodLocalDirs();
+    for(String localdir : localDirs) {
+    	String dirPath = localdir + File.separatorChar + 
+    					TaskLog.USERLOGS_DIR_NAME + File.separatorChar +
+    					subDir;
+    	try {
+    		FileUtil.fullyDelete( new File(dirPath) );
+        } catch(Exception e){
+        	//Skip bad dir for later deletion
+            LOG.warn("Could not delete dir: " + dirPath + 
+                " , Reason : " + e.getMessage());
+        }
+    }
   }
   
   @Override

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Apr 22 23:13:23 2011
@@ -71,6 +71,10 @@ public abstract class TaskController imp
     return conf;
   }
 
+  public LocalStorage getLocalStorage() {
+	  return localStorage;
+  }
+  
   public void setConf(Configuration conf) {
     this.conf = conf;
   }
@@ -143,6 +147,17 @@ public abstract class TaskController imp
    */
   public abstract void deleteAsUser(String user, 
                                     String subDir) throws IOException;
+
+  /**
+   * Creates task log dir
+   * @param taskID ID of the task
+   * @param isCleanup If the task is cleanup task or not
+   * @throws IOException
+   */
+  public void createLogDir(TaskAttemptID taskID, 
+			boolean isCleanup) throws IOException {
+	  
+  }
   
   /**
    * Delete the user's files under the userlogs directory.

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Apr 22 23:13:23 2011
@@ -33,6 +33,7 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,8 +44,10 @@ import org.apache.hadoop.fs.LocalFileSys
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
@@ -75,7 +78,103 @@ public class TaskLog {
       LOG_DIR.mkdirs();
     }
   }
+  
+  static AtomicInteger rotor = new AtomicInteger(0);
+
+  /**
+   * Create log directory for the given attempt. This involves creating the
+   * following and setting proper permissions for the new directories
+   * <br>{hadoop.log.dir}/userlogs/<jobid>
+   * <br>{hadoop.log.dir}/userlogs/<jobid>/<attempt-id-as-symlink>
+   * <br>{one of the mapred-local-dirs}/userlogs/<jobid>
+   * <br>{one of the mapred-local-dirs}/userlogs/<jobid>/<attempt-id>
+   *
+   * @param taskID attempt-id for which log dir is to be created
+   * @param isCleanup Is this attempt a cleanup attempt ?
+   * @param localDirs mapred local directories
+   * @return true if attempt log directory creation is succeeded
+   * @throws IOException
+   */
+  public static boolean createTaskAttemptLogDir(TaskAttemptID taskID,
+      boolean isCleanup, String[] localDirs) throws IOException{
+    String cleanupSuffix = isCleanup ? ".cleanup" : "";
+    String strAttemptLogDir = getTaskAttemptLogDir(taskID, 
+        cleanupSuffix, localDirs);
+    File attemptLogDir = new File(strAttemptLogDir);
+    boolean isSucceeded = attemptLogDir.mkdirs();
+    if(isSucceeded) {
+      String strLinkAttemptLogDir = getJobDir(
+          taskID.getJobID()).getAbsolutePath() + File.separatorChar + 
+          taskID.toString() + cleanupSuffix;
+      if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
+        LOG.warn("Creation of symlink to attempt log dir failed.");
+        isSucceeded = false;
+      }
+
+      File linkAttemptLogDir = new File(strLinkAttemptLogDir);
+      // Set permissions for job dir in userlogs
+      if (!Localizer.PermissionsHandler.setPermissions(
+          linkAttemptLogDir.getParentFile(),
+          Localizer.PermissionsHandler.sevenZeroZero)) {
+        LOG.warn("Setting permissions to "
+                 + linkAttemptLogDir.getParentFile() + " failed.");
+        isSucceeded = false;
+      }
+      //Set permissions for target attempt log dir 
+      if (!Localizer.PermissionsHandler.setPermissions(attemptLogDir,
+          Localizer.PermissionsHandler.sevenZeroZero)) {
+        LOG.warn("Setting permissions to the real attempt log dir "
+                 + attemptLogDir + " failed.");
+        isSucceeded = false;
+      }
+      //Set permissions for target job log dir
+      if (!Localizer.PermissionsHandler.setPermissions(
+          attemptLogDir.getParentFile(),
+          Localizer.PermissionsHandler.sevenZeroZero)) {
+        LOG.warn("Setting permissions to the real job log dir "
+                 + attemptLogDir.getParentFile() + " failed.");
+        isSucceeded = false;
+      }
+    }
+    return isSucceeded;
+  }
+
+  /**
+   * Picks the next mapred local directory in round-robin order.
+   * @param localDirs mapred local directories
+   * @return the next chosen mapred local directory
+   * @throws IOException if localDirs is empty
+   */
+  private static String getNextLocalDir(String[] localDirs) throws IOException{
+    if(localDirs.length == 0) {
+      throw new IOException("Not enough mapred.local.dirs (" + localDirs.length + ")");
+    }
+    // Clear the sign bit; Math.abs(Integer.MIN_VALUE) is still negative.
+    return localDirs[(rotor.getAndIncrement() & Integer.MAX_VALUE) % localDirs.length];
+  }
 
+  /**
+   * Builds the attempt log directory path under the next mapred local directory
+   * from getNextLocalDir (round-robin, so repeated calls can differ).
+   * @param taskID attempt-id for which log dir path is needed
+   * @param cleanupSuffix ".cleanup" if this attempt is a cleanup attempt, else ""
+   * @param localDirs mapred local directories
+   * @return {localDir}/USERLOGS_DIR_NAME/{jobid}/{attempt-id}{cleanupSuffix}
+   * @throws IOException if localDirs is empty
+   */
+  public static String getTaskAttemptLogDir(TaskAttemptID taskID, 
+      String cleanupSuffix, String[] localDirs) throws IOException {
+    StringBuilder taskLogDirLocation = new StringBuilder();
+    taskLogDirLocation.append(getNextLocalDir(localDirs));
+    taskLogDirLocation.append(File.separatorChar);
+    taskLogDirLocation.append(USERLOGS_DIR_NAME);
+    taskLogDirLocation.append(File.separatorChar);
+    taskLogDirLocation.append(taskID.getJobID().toString());
+    taskLogDirLocation.append(File.separatorChar);
+    taskLogDirLocation.append(taskID.toString()+cleanupSuffix);
+    return taskLogDirLocation.toString();
+  }
+  
   public static File getTaskLogFile(TaskAttemptID taskid, boolean isCleanup,
       LogName filter) {
     return new File(getAttemptDir(taskid, isCleanup), filter.toString());

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Apr 22 23:13:23 2011
@@ -313,15 +313,7 @@ abstract class TaskRunner extends Thread
         TaskLog.LogName.STDOUT);
     logFiles[1] = TaskLog.getTaskLogFile(taskid, isCleanup,
         TaskLog.LogName.STDERR);
-    File logDir = logFiles[0].getParentFile();
-    boolean b = logDir.mkdirs();
-    if (!b) {
-      LOG.warn("mkdirs failed. Ignoring");
-    } else {
-      Localizer.PermissionsHandler.setPermissions(logDir,
-          Localizer.PermissionsHandler.sevenZeroZero);
-    }
-
+    getTracker().getTaskController().createLogDir(taskid, isCleanup);
     return logFiles;
   }
 

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Apr 22 23:13:23 2011
@@ -745,7 +745,13 @@ public class TaskTracker implements MRCo
     for (String s : localStorage.getGoodLocalDirs()) {
       localFs.mkdirs(new Path(s, TT_LOG_TMP_DIR), pub);
     }
-
+    // Create userlogs directory under all good mapred-local-dirs
+    for (String s : localStorage.getGoodLocalDirs()) {
+      Path userLogsDir = new Path(s, TaskLog.USERLOGS_DIR_NAME);
+      if (!localFs.exists(userLogsDir)) {
+        localFs.mkdirs(userLogsDir, pub);
+      }
+    }
     // Clear out state tables
     this.tasks.clear();
     this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
@@ -905,7 +911,9 @@ public class TaskTracker implements MRCo
    * startup, to remove any leftovers from previous run.
    */
   public void cleanupStorage() throws IOException {
-    this.fConf.deleteLocalFiles();
+    this.fConf.deleteLocalFiles(SUBDIR); // NOTE(review): presumably leaves userlogs dirs intact - confirm
+    this.fConf.deleteLocalFiles(TT_PRIVATE_DIR);
+    this.fConf.deleteLocalFiles(TT_LOG_TMP_DIR);
   }
 
   // Object on wait which MapEventsFetcherThread is going to wait.
@@ -1396,6 +1404,14 @@ public class TaskTracker implements MRCo
     fConf = conf;
   }
 
+  void setLocalStorage(LocalStorage in) {
+    this.localStorage = in; // swap in the caller-supplied storage
+  }
+	  
+  void setLocalDirAllocator(LocalDirAllocator in) {
+    this.localDirAllocator = in; // swap in the caller-supplied allocator
+  }
+  
   /**
    * Start with the local machine name, and the default JobTracker
    */

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java Fri Apr 22 23:13:23 2011
@@ -123,36 +123,52 @@ public class UserLogCleaner extends Thre
   }
 
   /**
-   * Clears all the logs in userlog directory.
+   * Adds the job log directories for deletion with default retain hours. 
+   * Deletes all other directories, if any. 
    * 
-   * Adds the job directories for deletion with default retain hours. Deletes
-   * all other directories, if any. This is usually called on reinit/restart of
-   * the TaskTracker
+   * @param loc location of log directory
+   * @param conf configuration used to look up the userlog retain hours
+   * @throws IOException
+   */
+  public void addOldUserLogsForDeletion(File loc, Configuration conf)
+      throws IOException {
+    // File.list() is null if loc is absent, not a directory, or unreadable.
+    String[] logDirs = loc.list();
+    if (logDirs == null) {
+      return;
+    }
+    long now = clock.getTime();
+    for (String logDir : logDirs) {
+      JobID jobid;
+      try {
+        jobid = JobID.forName(logDir);
+      } catch (IllegalArgumentException ie) { // not a job directory name
+        deleteLogPath(logDir);
+        continue;
+      }
+      // Queue the job's logs with default retain hours unless already known.
+      if (!completedJobs.containsKey(jobid)) {
+        userLogManager.addLogEvent(
+            new JobCompletedEvent(jobid, now, getUserlogRetainHours(conf)));
+      }
+    }
+  }
+  
+  /**
+   * Clears all the logs in userlogs directory. This is usually called on 
+   * reinit/restart of the TaskTracker.
    * 
    * @param conf
    * @throws IOException
    */
   public void clearOldUserLogs(Configuration conf) throws IOException {
     File userLogDir = TaskLog.getUserLogDir();
-    if (userLogDir.exists()) {
-      long now = clock.getTime();
-      for(String logDir: userLogDir.list()) {
-        // add all the log dirs to taskLogsMnonitor.
-        JobID jobid = null;
-        try {
-          jobid = JobID.forName(logDir);
-        } catch (IllegalArgumentException ie) {
-          deleteLogPath(logDir);
-          continue;
-        }
-        // add the job log directory for deletion with default retain hours,
-        // if it is not already added
-        if (!completedJobs.containsKey(jobid)) {
-          JobCompletedEvent jce = 
-            new JobCompletedEvent(jobid, now,getUserlogRetainHours(conf));
-          userLogManager.addLogEvent(jce);
-        }
-      }
+    addOldUserLogsForDeletion(userLogDir, conf);
+    // getStrings() yields null when the property is unset; treat as no dirs.
+    String[] localDirs = conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
+    for (String localDir : localDirs == null ? new String[0] : localDirs) {
+      addOldUserLogsForDeletion(new File(localDir + File.separatorChar
+          + TaskLog.USERLOGS_DIR_NAME), conf);
     }
   }
 
@@ -196,6 +212,48 @@ public class UserLogCleaner extends Thre
   }
 
   /**
+   * Gets the user that owns the given log path.
+   * 
+   * @param logPath log path, relative to a userlogs directory
+   * @throws IOException if no owner can be found for the path
+   */
+  private String getLogUser(String logPath) throws IOException {
+    // Preferred location: {hadoop.log.dir}/userlogs/{logPath}.
+    String owner = findOwner(TaskLog.getUserLogDir().toString(), logPath);
+    if (owner != null) {
+      return owner;
+    }
+
+    // Fall back to {mapred.local.dir}/userlogs/{logPath}: probe each good
+    // local dir in order and stop at the first one that yields an owner.
+    String[] goodDirs =
+        userLogManager.getTaskController().getLocalStorage().getGoodLocalDirs();
+    for (String dir : goodDirs) {
+      String root = dir + File.separator + TaskLog.USERLOGS_DIR_NAME;
+      owner = findOwner(root, logPath);
+      if (owner != null) {
+        return owner;
+      }
+    }
+
+    throw new IOException("Userlog path not found for " + logPath);
+  }
+
+  /**
+   * Looks up the owner of {@code child} under {@code root}.
+   *
+   * @return the owner, or null when the status lookup fails for any reason
+   *         (the path may already have been deleted)
+   */
+  private String findOwner(String root, String child) {
+    try {
+      return localFs.getFileStatus(new Path(root, child)).getOwner();
+    } catch (Exception e) {
+      return null; // best effort: a missing path is expected here
+    }
+  }
+  
+  /**
    * Deletes the log path.
    * 
    * This path will be removed through {@link CleanupQueue}
@@ -205,8 +263,7 @@ public class UserLogCleaner extends Thre
    */
   private void deleteLogPath(String logPath) throws IOException {
     LOG.info("Deleting user log path " + logPath);
-    String logRoot = TaskLog.getUserLogDir().toString();
-    String user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+    String user = getLogUser(logPath);
     TaskController controller = userLogManager.getTaskController();
     PathDeletionContext item = 
       new TaskController.DeletionContext(controller, true, user, logPath);

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java?rev=1096081&r1=1096080&r2=1096081&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestUserLogCleanup.java Fri Apr 22 23:13:23 2011
@@ -23,11 +23,14 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.mapred.TaskTracker.LocalStorage;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import static org.junit.Assert.*;
 
@@ -48,15 +51,18 @@ public class TestUserLogCleanup {
   private JobID jobid4 = new JobID(jtid, 4);
   private File foo = new File(TaskLog.getUserLogDir(), "foo");
   private File bar = new File(TaskLog.getUserLogDir(), "bar");
+  private static String TEST_ROOT_DIR = 
+	  				System.getProperty("test.build.data", "/tmp");
 
-  public TestUserLogCleanup() throws IOException {
-    Configuration conf = new Configuration();
+  public TestUserLogCleanup() throws IOException, InterruptedException {
+    JobConf conf= new JobConf(); // startTT takes a JobConf
     startTT(conf);
   }
 
   @After
   public void tearDown() throws IOException {
     FileUtil.fullyDelete(TaskLog.getUserLogDir());
+    FileUtil.fullyDelete(new File(TEST_ROOT_DIR, "userlogs")); // not /tmp itself
   }
 
   private File localizeJob(JobID jobid) throws IOException {
@@ -77,14 +83,26 @@ public class TestUserLogCleanup {
     userLogManager.addLogEvent(jce);
   }
 
-  private void startTT(Configuration conf) throws IOException {
+  private void startTT(JobConf conf) throws IOException, InterruptedException {
     myClock = new FakeClock(); // clock is reset.
+    String localdirs = TEST_ROOT_DIR + "/userlogs/local/0," + 
+    					TEST_ROOT_DIR + "/userlogs/local/1";
+    conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localdirs);
     tt = new TaskTracker();
     tt.setConf(new JobConf(conf));
+    LocalDirAllocator localDirAllocator = 
+    					new LocalDirAllocator("mapred.local.dir");
+    tt.setLocalDirAllocator(localDirAllocator);
+    LocalStorage localStorage = new LocalStorage(conf.getLocalDirs());
+    localStorage.checkLocalDirs();
+    tt.setLocalStorage(localStorage);
     localizer = new Localizer(FileSystem.get(conf), conf
         .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
     tt.setLocalizer(localizer);
     userLogManager = new UtilsForTests.InLineUserLogManager(conf);
+    TaskController taskController = userLogManager.getTaskController();
+    taskController.setup(localDirAllocator, localStorage);
+    tt.setTaskController(taskController);
     userLogCleaner = userLogManager.getUserLogCleaner();
     userLogCleaner.setClock(myClock);
     tt.setUserLogManager(userLogManager);
@@ -92,13 +110,13 @@ public class TestUserLogCleanup {
   }
 
   private void ttReinited() throws IOException {
-    Configuration conf = new Configuration();
+    JobConf conf=new JobConf(); // NOTE(review): presumably so the local-dir property resolves in clearOldUserLogs - confirm
     conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
     userLogManager.clearOldUserLogs(conf);
   }
 
-  private void ttRestarted() throws IOException {
-    Configuration conf = new Configuration();
+  private void ttRestarted() throws IOException, InterruptedException {
+    JobConf conf=new JobConf(); // startTT takes a JobConf
     conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
     startTT(conf);
   }
@@ -228,9 +246,11 @@ public class TestUserLogCleanup {
    * restart.
    * 
    * @throws IOException
+   * @throws InterruptedException
    */
   @Test
-  public void testUserLogCleanupAfterRestart() throws IOException {
+  public void testUserLogCleanupAfterRestart() 
+  					throws IOException, InterruptedException {
     File jobUserlog1 = localizeJob(jobid1);
     File jobUserlog2 = localizeJob(jobid2);
     File jobUserlog3 = localizeJob(jobid3);