Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/03/09 11:56:45 UTC

svn commit: r920793 - in /hadoop/mapreduce/trunk: ./ src/c++/task-controller/ src/c++/task-controller/tests/ src/java/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/server/tasktracker/ src...

Author: vinodkv
Date: Tue Mar  9 10:56:44 2010
New Revision: 920793

URL: http://svn.apache.org/viewvc?rev=920793&view=rev
Log:
MAPREDUCE-927. Cleanup of task-logs should happen in TaskTracker instead of the Child. Contributed by Amareshwari Sriramadasu.
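
In short: the Child JVM no longer purges old task logs at startup; the TaskTracker
now remembers when each job finished and removes that job's userlog directory once
mapreduce.job.userlog.retain.hours have elapsed. A minimal sketch of that retention
bookkeeping, with illustrative names only (the real implementation is the new
UserLogCleaner class further down in this commit):

  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.Map;

  // Sketch, not the committed code: shows the mark-then-sweep retention idea.
  class RetentionSketch {
    // job id -> absolute time (ms) after which the job's userlog dir may be deleted
    private final Map<String, Long> completedJobs = new HashMap<String, Long>();

    // called when the TaskTracker learns that the job has finished (KillJobAction)
    void markJobLogsForDeletion(String jobId, long completionTimeMs, int retainHours) {
      completedJobs.put(jobId, completionTimeMs + retainHours * 60L * 60 * 1000);
    }

    // called periodically by the cleanup thread
    void processCompletedJobs(long nowMs) {
      Iterator<Map.Entry<String, Long>> it = completedJobs.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<String, Long> entry = it.next();
        if (entry.getValue() <= nowMs) {
          // the committed code hands the directory to MRAsyncDiskService here
          it.remove();
        }
      }
    }
  }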

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/UserLogCleaner.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
    hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Mar  9 10:56:44 2010
@@ -430,6 +430,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1573. TestStreamingAsDifferentUser fails if run as tt_user.
     (Ravi Gummadi via vinodkv)
 
+    MAPREDUCE-927. Cleanup of task-logs should happen in TaskTracker instead
+    of the Child. (Amareshwari Sriramadasu via vinodkv)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Tue Mar  9 10:56:44 2010
@@ -213,9 +213,17 @@ char *get_task_dir_path(const char *tt_r
 /**
  * Get the log directory for the given attempt.
  */
-char *get_task_log_dir(const char *log_dir, const char *attempt_id) {
-  return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 2, log_dir,
-      attempt_id);
+char *get_task_log_dir(const char *log_dir, const char *job_id, 
+    const char *attempt_id) {
+  return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 3, log_dir,
+      job_id, attempt_id);
+}
+
+/**
+ * Get the log directory for the given job.
+ */
+char *get_job_log_dir(const char *log_dir, const char *job_id) {
+  return concatenate(JOB_LOG_DIR_PATTERN, "job_log_dir", 2, log_dir, job_id);
 }
 
 /**
@@ -499,15 +507,61 @@ int prepare_attempt_directories(const ch
 }
 
 /**
+ * Function to prepare the job log dir for the child. It gives ownership of
+ * the job's log-dir to the user and group ownership to the user running the
+ * tasktracker.
+ *     *  sudo chown user:mapred log-dir/userlogs/$jobid
+ *     *  sudo chmod -R 2770 log-dir/userlogs/$jobid // user is same as tt_user
+ *     *  sudo chmod -R 2570 log-dir/userlogs/$jobid // user is not tt_user
+ */
+int prepare_job_logs(const char *log_dir, const char *job_id,
+    mode_t permissions) {
+
+  char *job_log_dir = get_job_log_dir(log_dir, job_id);
+  if (job_log_dir == NULL) {
+    fprintf(LOGFILE, "Couldn't get job log directory %s.\n", job_log_dir);
+    return -1;
+  }
+
+  struct stat filestat;
+  if (stat(job_log_dir, &filestat) != 0) {
+    if (errno == ENOENT) {
+#ifdef DEBUG
+      fprintf(LOGFILE, "job_log_dir %s doesn't exist. Not doing anything.\n",
+          job_log_dir);
+#endif
+      free(job_log_dir);
+      return 0;
+    } else {
+      // stat failed because of something else!
+      fprintf(LOGFILE, "Failed to stat the job log dir %s\n", job_log_dir);
+      free(job_log_dir);
+      return -1;
+    }
+  }
+
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+  if (secure_path(job_log_dir, user_detail->pw_uid, tasktracker_gid,
+      permissions, S_ISGID | permissions, 1) != 0) {
+    fprintf(LOGFILE, "Failed to secure the log_dir %s\n", job_log_dir);
+    free(job_log_dir);
+    return -1;
+  }
+  free(job_log_dir);
+  return 0;
+}
+
+/**
  * Function to prepare the task logs for the child. It gives the user
  * ownership of the attempt's log-dir to the user and group ownership to the
  * user running tasktracker.
- *     *  sudo chown user:mapred log-dir/userlogs/$attemptid
- *     *  sudo chmod -R 2770 log-dir/userlogs/$attemptid
+ *     *  sudo chown user:mapred log-dir/userlogs/$jobid/$attemptid
+ *     *  sudo chmod -R 2770 log-dir/userlogs/$jobid/$attemptid
  */
-int prepare_task_logs(const char *log_dir, const char *task_id) {
+int prepare_task_logs(const char *log_dir, const char *job_id, 
+    const char *task_id) {
 
-  char *task_log_dir = get_task_log_dir(log_dir, task_id);
+  char *task_log_dir = get_task_log_dir(log_dir, job_id, task_id);
   if (task_log_dir == NULL) {
     fprintf(LOGFILE, "Couldn't get task_log directory %s.\n", task_log_dir);
     return -1;
@@ -525,10 +579,12 @@ int prepare_task_logs(const char *log_di
       fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
           task_log_dir);
 #endif
+      free(task_log_dir);
       return 0;
     } else {
       // stat failed because of something else!
       fprintf(LOGFILE, "Failed to stat the task_log_dir %s\n", task_log_dir);
+      free(task_log_dir);
       return -1;
     }
   }
@@ -538,8 +594,10 @@ int prepare_task_logs(const char *log_di
       S_IRWXU | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG, 1) != 0) {
     // setgid on dirs but not files, 770. As of now, there are no files though
     fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir);
+    free(task_log_dir);
     return -1;
   }
+  free(task_log_dir);
   return 0;
 }
 
@@ -668,10 +726,13 @@ int initialize_user(const char *user) {
  * Function to prepare the job directories for the task JVM.
  * We do the following:
  *     *  sudo chown user:mapred -R taskTracker/$user/jobcache/$jobid
+ *     *  sudo chown user:mapred -R logs/userlogs/$jobid
  *     *  if user is not $tt_user,
  *     *    sudo chmod 2570 -R taskTracker/$user/jobcache/$jobid
+ *     *    sudo chmod 2570 -R logs/userlogs/$jobid
  *     *  else // user is tt_user
  *     *    sudo chmod 2770 -R taskTracker/$user/jobcache/$jobid
+ *     *    sudo chmod 2770 -R logs/userlogs/$jobid
  *     *
  *     *  For any user, sudo chmod 2770 taskTracker/$user/jobcache/$jobid/work
  */
@@ -783,11 +844,32 @@ int initialize_job(const char *jobid, co
   }
   free(local_dir);
   free(full_local_dir_str);
-  cleanup();
+  int exit_code = 0; char *log_dir = NULL;
   if (failed) {
-    return INITIALIZE_JOB_FAILED;
+    exit_code = INITIALIZE_JOB_FAILED;
+    goto cleanup;
   }
-  return 0;
+
+  log_dir = (char *) get_value(TT_LOG_DIR_KEY);
+  if (log_dir == NULL) {
+    fprintf(LOGFILE, "Log directory is not configured.\n");
+    exit_code = INVALID_TT_LOG_DIR;
+    goto cleanup;
+  }
+
+  if (prepare_job_logs(log_dir, jobid, permissions) != 0) {
+    fprintf(LOGFILE, "Couldn't prepare job logs directory %s for %s.\n",
+        log_dir, jobid);
+    exit_code = PREPARE_JOB_LOGS_FAILED;
+  }
+
+  cleanup:
+  // free configurations
+  cleanup();
+  if (log_dir != NULL) {
+    free(log_dir);
+  }
+  return exit_code;
 }
 
 /**
@@ -891,7 +973,7 @@ int initialize_task(const char *jobid, c
     goto cleanup;
   }
 
-  if (prepare_task_logs(log_dir, taskid) != 0) {
+  if (prepare_task_logs(log_dir, jobid, taskid) != 0) {
     fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n",
         log_dir, taskid);
     exit_code = PREPARE_TASK_LOGS_FAILED;

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.h?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h Tue Mar  9 10:56:44 2010
@@ -73,7 +73,8 @@ enum errorcodes {
   UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21
   INVALID_CONF_DIR, //22
   UNABLE_TO_BUILD_PATH, //23
-  INVALID_TASKCONTROLLER_PERMISSIONS //24
+  INVALID_TASKCONTROLLER_PERMISSIONS, //24
+  PREPARE_JOB_LOGS_FAILED, //25
 };
 
 #define USER_DIR_PATTERN "%s/taskTracker/%s"
@@ -86,7 +87,9 @@ enum errorcodes {
 
 #define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s"
 
-#define ATTEMPT_LOG_DIR_PATTERN "%s/userlogs/%s"
+#define JOB_LOG_DIR_PATTERN "%s/userlogs/%s"
+
+#define ATTEMPT_LOG_DIR_PATTERN JOB_LOG_DIR_PATTERN"/%s"
 
 #define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
 

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/tests/test-task-controller.c?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c Tue Mar  9 10:56:44 2010
@@ -171,13 +171,26 @@ void test_get_task_launcher_file() {
   assert(ret == 0);
 }
 
+void test_get_job_log_dir() {
+  char *logdir = (char *) get_job_log_dir("/tmp/testing",
+    "job_200906101234_0001");
+  printf("logdir obtained is %s\n", logdir);
+  int ret = 0;
+  if (strcmp(logdir, "/tmp/testing/userlogs/job_200906101234_0001") != 0) {
+    ret = -1;
+  }
+  free(logdir);
+  assert(ret == 0);
+}
+
 void test_get_task_log_dir() {
   char *logdir = (char *) get_task_log_dir("/tmp/testing",
-      "attempt_200906112028_0001_m_000000_0");
+    "job_200906101234_0001", "attempt_200906112028_0001_m_000000_0");
   printf("logdir obtained is %s\n", logdir);
   int ret = 0;
   if (strcmp(logdir,
-      "/tmp/testing/userlogs/attempt_200906112028_0001_m_000000_0") != 0) {
+      "/tmp/testing/userlogs/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
+      != 0) {
     ret = -1;
   }
   free(logdir);
@@ -203,6 +216,9 @@ int main(int argc, char **argv) {
   printf("\nTesting get_task_launcher_file()\n");
   test_get_task_launcher_file();
 
+  printf("\nTesting get_job_log_dir()\n");
+  test_get_job_log_dir();
+
   printf("\nTesting get_task_log_dir()\n");
   test_get_task_log_dir();
 

Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Tue Mar  9 10:56:44 2010
@@ -672,10 +672,10 @@
 </property>
 
 <property>
-  <name>mapreduce.task.userlog.retain.hours</name>
+  <name>mapreduce.job.userlog.retain.hours</name>
   <value>24</value>
   <description>The maximum time, in hours, for which the user-logs are to be 
-          retained.
+               retained after job completion.
   </description>
 </property>
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Tue Mar  9 10:56:44 2010
@@ -193,7 +193,6 @@ class Child {
 
         numTasksToExecute = job.getNumTasksToExecutePerJvm();
         assert(numTasksToExecute != 0);
-        TaskLog.cleanup(job.getInt(JobContext.TASK_LOG_RETAIN_HOURS, 24));
 
         task.setConf(job);
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Mar  9 10:56:44 2010
@@ -22,7 +22,6 @@ import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileFilter;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -34,11 +33,13 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.log4j.Appender;
@@ -50,6 +51,7 @@ import org.apache.log4j.Logger;
  * This class uses the system property <code>hadoop.log.dir</code>.
  * 
  */
+@InterfaceAudience.Private
 public class TaskLog {
   private static final Log LOG =
     LogFactory.getLog(TaskLog.class);
@@ -71,7 +73,7 @@ public class TaskLog {
   }
 
   public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
-    return new File(getBaseDir(taskid.toString()), filter.toString());
+    return new File(getAttemptDir(taskid.toString()), filter.toString());
   }
   public static File getRealTaskLogFileLocation(TaskAttemptID taskid, 
       LogName filter) {
@@ -82,7 +84,7 @@ public class TaskLog {
       LOG.error("getTaskLogFileDetail threw an exception " + ie);
       return null;
     }
-    return new File(getBaseDir(l.location), filter.toString());
+    return new File(getAttemptDir(l.location), filter.toString());
   }
   private static class LogFileDetail {
     final static String LOCATION = "LOG_DIR:";
@@ -118,7 +120,7 @@ public class TaskLog {
     //to be associated with each task attempt since jvm reuse is disabled
     //when profiling/debugging is enabled
     if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
-      l.length = new File(getBaseDir(l.location), filter.toString()).length();
+      l.length = new File(getAttemptDir(l.location), filter.toString()).length();
       l.start = 0;
       fis.close();
       return l;
@@ -140,7 +142,7 @@ public class TaskLog {
   }
   
   private static File getTmpIndexFile(String taskid) {
-    return new File(getBaseDir(taskid), "log.tmp");
+    return new File(getAttemptDir(taskid), "log.tmp");
   }
   public static File getIndexFile(String taskid) {
     return getIndexFile(taskid, false);
@@ -148,9 +150,9 @@ public class TaskLog {
   
   public static File getIndexFile(String taskid, boolean isCleanup) {
     if (isCleanup) {
-      return new File(getBaseDir(taskid), "log.index.cleanup");
+      return new File(getAttemptDir(taskid), "log.index.cleanup");
     } else {
-      return new File(getBaseDir(taskid), "log.index");
+      return new File(getAttemptDir(taskid), "log.index");
     }
   }
 
@@ -158,8 +160,9 @@ public class TaskLog {
     return System.getProperty("hadoop.log.dir");
   }
 
-  static File getBaseDir(String taskid) {
-    return new File(LOG_DIR, taskid);
+  static File getAttemptDir(String taskid) {
+    return new File(getJobDir(TaskAttemptID.forName(taskid).getJobID()),
+        taskid);
   }
   private static long prevOutLength;
   private static long prevErrLength;
@@ -271,38 +274,6 @@ public class TaskLog {
     }
   }
 
-  private static class TaskLogsPurgeFilter implements FileFilter {
-    long purgeTimeStamp;
-  
-    TaskLogsPurgeFilter(long purgeTimeStamp) {
-      this.purgeTimeStamp = purgeTimeStamp;
-    }
-
-    public boolean accept(File file) {
-      LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
-      return file.lastModified() < purgeTimeStamp;
-    }
-  }
-  /**
-   * Purge old user logs.
-   * 
-   * @throws IOException
-   */
-  public static synchronized void cleanup(int logsRetainHours
-                                          ) throws IOException {
-    // Purge logs of tasks on this tasktracker if their  
-    // mtime has exceeded "mapred.task.log.retain" hours
-    long purgeTimeStamp = System.currentTimeMillis() - 
-                            (logsRetainHours*60L*60*1000);
-    File[] oldTaskLogs = LOG_DIR.listFiles
-                           (new TaskLogsPurgeFilter(purgeTimeStamp));
-    if (oldTaskLogs != null) {
-      for (int i=0; i < oldTaskLogs.length; ++i) {
-        FileUtil.fullyDelete(oldTaskLogs[i]);
-      }
-    }
-  }
-
   static class Reader extends InputStream {
     private long bytesRemaining;
     private FileInputStream file;
@@ -341,7 +312,7 @@ public class TaskLog {
       start += fileDetail.start;
       end += fileDetail.start;
       bytesRemaining = end - start;
-      file = new FileInputStream(new File(getBaseDir(fileDetail.location), 
+      file = new FileInputStream(new File(getAttemptDir(fileDetail.location), 
           kind.toString()));
       // skip upto start
       long pos = 0;
@@ -648,4 +619,14 @@ public class TaskLog {
     return LOG_DIR;
   }
   
+  /**
+   * Get the user log directory for the job jobid.
+   * 
+   * @param jobid
+   * @return user log directory for the job
+   */
+  public static File getJobDir(JobID jobid) {
+    return new File(getUserLogDir(), jobid.toString());
+  }
+
 } // TaskLog

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Tue Mar  9 10:56:44 2010
@@ -141,7 +141,7 @@ public class TaskLogServlet extends Http
    */
   static Configuration getConfFromJobACLsFile(String attemptIdStr) {
     Configuration conf = new Configuration(false);
-    conf.addResource(new Path(TaskLog.getBaseDir(attemptIdStr).toString(),
+    conf.addResource(new Path(TaskLog.getAttemptDir(attemptIdStr).toString(),
         TaskRunner.jobACLsFile));
     return conf;
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar  9 10:56:44 2010
@@ -257,7 +257,7 @@ public class TaskTracker 
   int workerThreads;
   CleanupQueue directoryCleanupThread;
   volatile JvmManager jvmManager;
-  
+  UserLogCleaner taskLogCleanupThread;
   private TaskMemoryManagerThread taskMemoryManager;
   private boolean taskMemoryManagerEnabled = true;
   private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
@@ -709,6 +709,9 @@ public class TaskTracker 
 
     setIndexCache(new IndexCache(this.fConf));
 
+    //clear old user logs
+    taskLogCleanupThread.clearOldUserLogs(this.fConf);
+
     mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
     reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
     mapLauncher.start();
@@ -943,8 +946,7 @@ public class TaskTracker 
                               new LocalDirAllocator(MRConfig.LOCAL_DIR);
 
   // intialize the job directory
-  @SuppressWarnings("unchecked")
-  private void localizeJob(TaskInProgress tip
+  RunningJob localizeJob(TaskInProgress tip
                            ) throws IOException, InterruptedException {
     Task t = tip.getTask();
     JobID jobId = t.getJobID();
@@ -957,6 +959,8 @@ public class TaskTracker 
       if (!rjob.localized) {
        
         JobConf localJobConf = localizeJobFiles(t, rjob);
+        // initialize job log directory
+        initializeJobLogDir(jobId);
 
         // Now initialize the job via task-controller so as to set
         // ownership/permissions of jars, job-work-dir. Note that initializeJob
@@ -974,7 +978,7 @@ public class TaskTracker 
         rjob.localized = true;
       }
     }
-    launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
+    return rjob;
   }
 
   private FileSystem getFS(final Path filePath, JobID jobId, 
@@ -1056,6 +1060,14 @@ public class TaskTracker 
     return localJobConf;
   }
 
+  // create job userlog dir
+  void initializeJobLogDir(JobID jobId) {
+    // remove it from the task log cleanup thread first; it might have been
+    // added there because of a tasktracker reinit or restart
+    taskLogCleanupThread.unmarkJobFromLogDeletion(jobId);
+    localizer.initializeJobLogDir(jobId);
+  }
+
   /**
    * Download the job configuration file from the FS.
    * 
@@ -1263,7 +1275,8 @@ public class TaskTracker 
     server.start();
     this.httpPort = server.getPort();
     checkJettyPort(httpPort);
-    
+    // create task log cleanup thread
+    setTaskLogCleanupThread(new UserLogCleaner(fConf));
     // Initialize the jobACLSManager
     jobACLsManager = new TaskTrackerJobACLsManager(this);
     initialize();
@@ -1282,6 +1295,9 @@ public class TaskTracker 
     taskCleanupThread.setDaemon(true);
     taskCleanupThread.start();
     directoryCleanupThread = new CleanupQueue();
+    // start tasklog cleanup thread
+    taskLogCleanupThread.setDaemon(true);
+    taskLogCleanupThread.start();
   }
 
   // only used by tests
@@ -1293,6 +1309,14 @@ public class TaskTracker 
     return directoryCleanupThread;
   }
 
+  UserLogCleaner getTaskLogCleanupThread() {
+    return this.taskLogCleanupThread;
+  }
+
+  void setTaskLogCleanupThread(UserLogCleaner t) {
+    this.taskLogCleanupThread = t;
+  }
+
   void setIndexCache(IndexCache cache) {
     this.indexCache = cache;
   }
@@ -1808,7 +1832,7 @@ public class TaskTracker 
    * @param action The action with the job
    * @throws IOException
    */
-  private synchronized void purgeJob(KillJobAction action) throws IOException {
+  synchronized void purgeJob(KillJobAction action) throws IOException {
     JobID jobId = action.getJobID();
     LOG.info("Received 'KillJobAction' for job: " + jobId);
     RunningJob rjob = null;
@@ -1833,6 +1857,11 @@ public class TaskTracker 
         if (!rjob.keepJobFiles) {
           removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID().toString());
         }
+        // add job to taskLogCleanupThread
+        long now = System.currentTimeMillis();
+        taskLogCleanupThread.markJobLogsForDeletion(now, rjob.jobConf,
+            rjob.jobid);
+
         // Remove this job 
         rjob.tasks.clear();
       }
@@ -2186,7 +2215,8 @@ public class TaskTracker 
    */
   void startNewTask(TaskInProgress tip) {
     try {
-      localizeJob(tip);
+      RunningJob rjob = localizeJob(tip);
+      launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
@@ -3262,6 +3292,10 @@ public class TaskTracker 
     FetchStatus getFetchStatus() {
       return f;
     }
+
+    JobConf getJobConf() {
+      return jobConf;
+    }
   }
 
   /**

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/UserLogCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/UserLogCleaner.java?rev=920793&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/UserLogCleaner.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/UserLogCleaner.java Tue Mar  9 10:56:44 2010
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+
+/**
+ * This is a thread in the TaskTracker to clean up user logs.
+ * 
+ * Responsibilities of this thread include:
+ * <ol>
+ * <li>Removing old user logs</li>
+ * </ol>
+ */
+@InterfaceAudience.Private
+class UserLogCleaner extends Thread {
+  private static final Log LOG = LogFactory.getLog(UserLogCleaner.class);
+  static final int DEFAULT_USER_LOG_RETAIN_HOURS = 24; // 1 day
+  static final long DEFAULT_THREAD_SLEEP_TIME = 1000 * 60 * 60; // 1 hour
+
+  private Map<JobID, Long> completedJobs = Collections
+      .synchronizedMap(new HashMap<JobID, Long>());
+  private final long threadSleepTime;
+  private MRAsyncDiskService logAsyncDisk;
+  private Clock clock;
+
+  UserLogCleaner(Configuration conf) throws IOException {
+    threadSleepTime = conf.getLong(TTConfig.TT_USERLOGCLEANUP_SLEEPTIME,
+        DEFAULT_THREAD_SLEEP_TIME);
+    logAsyncDisk = new MRAsyncDiskService(FileSystem.getLocal(conf), TaskLog
+        .getUserLogDir().toString());
+    setClock(new Clock());
+  }
+
+  void setClock(Clock clock) {
+    this.clock = clock;
+  }
+
+  Clock getClock() {
+    return this.clock;
+  }
+
+  @Override
+  public void run() {
+    // This thread wakes up after every threadSleepTime interval
+    // and deletes old user logs, if any.
+    while (true) {
+      try {
+        // sleep
+        Thread.sleep(threadSleepTime);
+        processCompletedJobs();
+      } catch (Throwable e) {
+        LOG.warn(getClass().getSimpleName()
+            + " encountered an exception while monitoring :", e);
+        LOG.info("Ingoring the exception and continuing monitoring.");
+      }
+    }
+  }
+
+  void processCompletedJobs() throws IOException {
+    long now = clock.getTime();
+    // iterate through completedJobs and remove old logs.
+    synchronized (completedJobs) {
+      Iterator<Entry<JobID, Long>> completedJobIter = completedJobs.entrySet()
+          .iterator();
+      while (completedJobIter.hasNext()) {
+        Entry<JobID, Long> entry = completedJobIter.next();
+        // see if the job is old enough
+        if (entry.getValue().longValue() <= now) {
+          // add the job log directory for deletion
+          deleteLogPath(TaskLog.getJobDir(entry.getKey()).getAbsolutePath());
+          completedJobIter.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * Clears all the logs in userlog directory.
+   * 
+   * Adds the job directories for deletion with default retain hours. Deletes
+   * all other directories, if any. This is usually called on reinit/restart of
+   * the TaskTracker
+   * 
+   * @param conf
+   * @throws IOException
+   */
+  void clearOldUserLogs(Configuration conf) throws IOException {
+    File userLogDir = TaskLog.getUserLogDir();
+    if (userLogDir.exists()) {
+      String[] logDirs = userLogDir.list();
+      if (logDirs.length > 0) {
+        // add all the job log dirs to the cleanup list.
+        long now = clock.getTime();
+        for (String logDir : logDirs) {
+          if (logDir.equals(logAsyncDisk.TOBEDELETED)) {
+            // skip this
+            continue;
+          }
+          JobID jobid = null;
+          try {
+            jobid = JobID.forName(logDir);
+          } catch (IllegalArgumentException ie) {
+            // if the directory is not a jobid, delete it immediately
+            deleteLogPath(new File(userLogDir, logDir).getAbsolutePath());
+            continue;
+          }
+          // add the job log directory with default retain hours, if it is not
+          // already added
+          if (!completedJobs.containsKey(jobid)) {
+            markJobLogsForDeletion(now, conf, jobid);
+          }
+        }
+      }
+    }
+  }
+
+  private int getUserlogRetainMillis(Configuration conf) {
+    return (conf == null ? UserLogCleaner.DEFAULT_USER_LOG_RETAIN_HOURS
+        : conf.getInt(JobContext.USER_LOG_RETAIN_HOURS,
+            UserLogCleaner.DEFAULT_USER_LOG_RETAIN_HOURS)) * 1000 * 60 * 60;
+  }
+
+  /**
+   * Adds job user-log directory to cleanup thread to delete logs after user-log
+   * retain hours.
+   * 
+   * If the configuration is null or user-log retain hours is not configured, it
+   * is deleted after
+   * {@value UserLogCleaner#DEFAULT_USER_LOG_RETAIN_HOURS} hours.
+   * 
+   * @param jobCompletionTime
+   *          job completion time in millis
+   * @param conf
+   *          The configuration from which user-log retain hours should be read
+   * @param jobid
+   *          JobID for which user logs should be deleted
+   */
+  public void markJobLogsForDeletion(long jobCompletionTime, Configuration conf,
+      JobID jobid) {
+    long retainTimeStamp = jobCompletionTime + (getUserlogRetainMillis(conf));
+    LOG.info("Adding " + jobid + " for user-log deletion with retainTimeStamp:"
+        + retainTimeStamp);
+    completedJobs.put(jobid, Long.valueOf(retainTimeStamp));
+  }
+
+  /**
+   * Remove job from user log deletion.
+   * 
+   * @param jobid
+   */
+  public void unmarkJobFromLogDeletion(JobID jobid) {
+    if (completedJobs.remove(jobid) != null) {
+      LOG.info("Removing " + jobid + " from user-log deletion");
+    }
+  }
+
+  /**
+   * Deletes the log path.
+   * 
+   * This path will be removed immediately through {@link MRAsyncDiskService}
+   * 
+   * @param logPath
+   * @throws IOException
+   */
+  private void deleteLogPath(String logPath) throws IOException {
+    LOG.info("Deleting user log path " + logPath);
+    logAsyncDisk.moveAndDeleteAbsolutePath(logPath);
+  }
+}
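
The TaskTracker hunks earlier in this commit are what drive this class. Pulled
together in one place, the wiring looks roughly like the sketch below; the wrapper
class and helper-method names here are illustrative, while the UserLogCleaner
calls are the ones added by this patch:

  package org.apache.hadoop.mapred; // UserLogCleaner is package-private

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;

  class UserLogCleanerWiringSketch {
    static UserLogCleaner startCleaner(Configuration conf) throws IOException {
      UserLogCleaner cleaner = new UserLogCleaner(conf);
      cleaner.clearOldUserLogs(conf); // dirs left over from before a reinit/restart
      cleaner.setDaemon(true);
      cleaner.start();                // wakes up every TT_USERLOGCLEANUP_SLEEPTIME
      return cleaner;
    }

    static void onJobLocalized(UserLogCleaner cleaner, JobID jobId) {
      // a job being localized (again) must not be purged by a stale entry
      cleaner.unmarkJobFromLogDeletion(jobId);
    }

    static void onKillJobAction(UserLogCleaner cleaner, Configuration jobConf, JobID jobId) {
      // start the retain-hours countdown for this job's userlog directory
      cleaner.markJobLogsForDeletion(System.currentTimeMillis(), jobConf, jobId);
    }
  }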

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Tue Mar  9 10:56:44 2010
@@ -113,6 +113,8 @@ public interface JobContext {
     "mapreduce.job.cache.archives.visibilities";
   public static final String CACHE_SYMLINK = 
     "mapreduce.job.cache.symlink.create";
+  public static final String USER_LOG_RETAIN_HOURS = 
+    "mapreduce.job.userlog.retain.hours";
   
   public static final String IO_SORT_FACTOR = 
     "mapreduce.task.io.sort.factor"; 
@@ -143,8 +145,6 @@ public interface JobContext {
   public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
   public static final String TASK_USERLOG_LIMIT = 
     "mapreduce.task.userlog.limit.kb";
-  public static final String TASK_LOG_RETAIN_HOURS = 
-    "mapred.task.userlog.retain.hours";
   
   public static final String MAP_SORT_SPILL_PERCENT =
     "mapreduce.map.sort.spill.percent";

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Tue Mar  9 10:56:44 2010
@@ -28,10 +28,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskController.InitializationContext;
+import org.apache.hadoop.mapreduce.JobID;
 
 /**
  * 
@@ -358,4 +359,22 @@ public class Localizer {
           + attemptId.toString());
     }
   }
+
+  /**
+   * Create job log directory and set appropriate permissions for the directory.
+   * 
+   * @param jobId
+   */
+  public void initializeJobLogDir(JobID jobId) {
+    File jobUserLogDir = TaskLog.getJobDir(jobId);
+    if (!jobUserLogDir.exists()) {
+      boolean ret = jobUserLogDir.mkdirs();
+      if (!ret) {
+        LOG.warn("Could not create job user log directory: " + jobUserLogDir);
+        return;
+      }
+    }
+    Localizer.PermissionsHandler.setPermissions(jobUserLogDir,
+        Localizer.PermissionsHandler.sevenZeroZero);
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Tue Mar  9 10:56:44 2010
@@ -88,4 +88,6 @@ public interface TTConfig extends MRConf
     "mapreduce.tasktracker.keytab.file";
   public static final String TT_GROUP = 
     "mapreduce.tasktracker.group";
+  public static final String TT_USERLOGCLEANUP_SLEEPTIME = 
+    "mapreduce.tasktracker.userlogcleanup.sleeptime";
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Tue Mar  9 10:56:44 2010
@@ -292,7 +292,7 @@ public class ConfigUtil {
     Configuration.addDeprecation("mapred.userlog.limit.kb", 
       new String[] {JobContext.TASK_USERLOG_LIMIT});
     Configuration.addDeprecation("mapred.userlog.retain.hours", 
-      new String[] {JobContext.TASK_LOG_RETAIN_HOURS});
+      new String[] {JobContext.USER_LOG_RETAIN_HOURS});
     Configuration.addDeprecation("mapred.task.profile.params", 
       new String[] {JobContext.TASK_PROFILE_PARAMS});
     Configuration.addDeprecation("io.sort.spill.percent", 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java Tue Mar  9 10:56:44 2010
@@ -67,7 +67,7 @@ public class MRAsyncDiskService {
    * @param localFileSystem The localFileSystem used for deletions.
    * @param volumes The roots of the file system volumes.
    */
-  public MRAsyncDiskService(FileSystem localFileSystem, String[] volumes)
+  public MRAsyncDiskService(FileSystem localFileSystem, String... volumes)
       throws IOException {
     
     this.volumes = new String[volumes.length];

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Tue Mar  9 10:56:44 2010
@@ -206,6 +206,11 @@ public class TestLocalizationWithLinuxTa
       checkFilePermissions(file.toUri().getPath(), expectedFilePerms, task
           .getUser(), ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
     }
+
+    // check job user-log directory permissions
+    File jobLogDir = TaskLog.getJobDir(jobId);
+    checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(),
+        ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
   }
 
   @Override

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=920793&r1=920792&r2=920793&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Tue Mar  9 10:56:44 2010
@@ -84,6 +84,7 @@ public class TestTaskTrackerLocalization
   protected Path attemptWorkDir;
   protected File[] attemptLogFiles;
   protected JobConf localizedTaskConf;
+  private TaskInProgress tip;
 
   /**
    * Dummy method in this base class. Only derived classes will define this
@@ -122,7 +123,17 @@ public class TestTaskTrackerLocalization
     trackerFConf.setStrings(MRConfig.LOCAL_DIR, localDirs);
 
     // Create the job configuration file. Same as trackerConf in this test.
-    Job job = new Job(trackerFConf);
+    Configuration jobConf = new Configuration(trackerFConf);
+    // Set job view ACLs in conf so that validation of contents of jobACLsFile
+    // can be done against this value. Have both users and groups.
+    String jobViewACLs = "user1,user2, group1,group2";
+    jobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
+
+    jobConf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 0);
+
+    Job job = new Job(jobConf);
+    String jtIdentifier = "200907202331";
+    jobId = new JobID(jtIdentifier, 1);
 
     // JobClient uploads the job jar to the file system and sets it in the
     // jobConf.
@@ -131,9 +142,15 @@ public class TestTaskTrackerLocalization
     // JobClient uploads the jobConf to the file system.
     File jobConfFile = uploadJobConf(job.getConfiguration());
     
+    // create jobTokens file
+    uploadJobTokensFile(); 
+    
         // Set up the TaskTracker
     tracker = new TaskTracker();
     tracker.setConf(trackerFConf);
+    tracker.setIndexCache(new IndexCache(trackerFConf));
+    tracker.setTaskLogCleanupThread(new UserLogCleaner(trackerFConf));
+    tracker.setTaskMemoryManagerEnabledFlag();
 
     // for test case system FS is the local FS
     tracker.systemFS = FileSystem.getLocal(trackerFConf);
@@ -143,13 +160,6 @@ public class TestTaskTrackerLocalization
     taskTrackerUGI = UserGroupInformation.getCurrentUser();
 
     // Set up the task to be localized
-    String jtIdentifier = "200907202331";
-    jobId = new JobID(jtIdentifier, 1);
-    
-    TaskTracker.RunningJob rjob = new TaskTracker.RunningJob(jobId);
-    rjob.ugi = UserGroupInformation.getCurrentUser();
-    tracker.runningJobs.put(jobId, rjob);
-    
     taskId =
         new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
     task =
@@ -157,10 +167,6 @@ public class TestTaskTrackerLocalization
     task.setConf(job.getConfiguration()); // Set conf. Set user name in particular.
     task.setUser(UserGroupInformation.getCurrentUser().getUserName());
 
-    // create jobTokens file
-    uploadJobTokensFile(); 
-    
-    
     taskController = new DefaultTaskController();
     taskController.setConf(trackerFConf);
     taskController.setup();
@@ -168,6 +174,10 @@ public class TestTaskTrackerLocalization
     tracker.setTaskController(taskController);
     tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
         taskController));
+
+    // mimic register task
+    // create the tip
+    tip = tracker.new TaskInProgress(task, trackerFConf);
   }
 
   /**
@@ -409,24 +419,8 @@ public class TestTaskTrackerLocalization
     if (!canRun()) {
       return;
     }
-    tracker.getLocalizer().initializeUserDirs(task.getUser());
-
-    // /////////// The main method being tested
-    localizedJobConf = tracker.localizeJobFiles(task, 
-        new TaskTracker.RunningJob(task.getJobID()));
-    // ///////////
-
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext context = new JobInitializationContext();
-    context.jobid = jobId;
-    context.user = task.getUser();
-    context.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-
-    // /////////// The method being tested
-    taskController.initializeJob(context);
-    // ///////////
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
 
     checkJobLocalization();
   }
@@ -494,6 +488,13 @@ public class TestTaskTrackerLocalization
     assertTrue(
         "mapreduce.job.jar is not set properly to the target users directory : "
             + localizedJobJar, mapredJarFlag);
+
+    // check job user-log directory permissions
+    File jobLogDir = TaskLog.getJobDir(jobId);
+    assertTrue("job log directory " + jobLogDir + " does not exist!", jobLogDir
+        .exists());
+    checkFilePermissions(jobLogDir.toString(), "drwx------", task.getUser(),
+        taskTrackerUGI.getGroupNames()[0]);
   }
 
   /**
@@ -506,25 +507,9 @@ public class TestTaskTrackerLocalization
     if (!canRun()) {
       return;
     }
-    tracker.getLocalizer().initializeUserDirs(task.getUser());
-    localizedJobConf = tracker.localizeJobFiles(task, 
-        new TaskTracker.RunningJob(task.getJobID()));
-
-    // Set job view ACLs in conf sothat validation of contents of jobACLsFile
-    // can be done against this value. Have both users and groups
-    String jobViewACLs = "user1,user2, group1,group2";
-    localizedJobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
-
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext jobContext = new JobInitializationContext();
-    jobContext.jobid = jobId;
-    jobContext.user = task.getUser();
-    jobContext.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-    taskController.initializeJob(jobContext);
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
 
-    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
     tip.setJobConf(localizedJobConf);
 
     // ////////// The central method being tested
@@ -605,9 +590,7 @@ public class TestTaskTrackerLocalization
         .getPath(), "tmp").exists());
 
     // Make sure that the logs are setup properly
-    File logDir =
-        new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
-            + task.getTaskID().toString());
+    File logDir = TaskLog.getAttemptDir(taskId.toString());
     assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
         logDir.exists());
     checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
@@ -761,21 +744,12 @@ public class TestTaskTrackerLocalization
   private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
       throws Exception {
     // Localize job and localize task.
-    tracker.getLocalizer().initializeUserDirs(task.getUser());
-    localizedJobConf = tracker.localizeJobFiles(task, 
-        new TaskTracker.RunningJob(task.getJobID()));
+    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+    TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
     if (jvmReuse) {
       localizedJobConf.setNumTasksToExecutePerJvm(2);
     }
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext jobContext = new JobInitializationContext();
-    jobContext.jobid = jobId;
-    jobContext.user = localizedJobConf.getUser();
-    jobContext.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-    taskController.initializeJob(jobContext);
-    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
     tip.setJobConf(localizedJobConf);
     tip.localizeTask(task);
     Path workDir =
@@ -829,21 +803,16 @@ public class TestTaskTrackerLocalization
    */
   private void verifyUserLogsCleanup()
       throws IOException {
-    Path logDir =
-        new Path(HADOOP_LOG_DIR.toURI().getPath(), TaskLog.USERLOGS_DIR_NAME
-            + Path.SEPARATOR + task.getTaskID().toString());
-
+    // verify user logs cleanup
+    File jobUserLogDir = TaskLog.getJobDir(jobId);
     // Logs should be there before cleanup.
-    assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
-        tracker.getLocalFileSystem().exists(logDir));
-
-    // ////////// Another being tested
-    TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file
-    // modification time behind retainTimeStatmp
-    // //////////
-
+    assertTrue("Userlogs dir " + jobUserLogDir + " is not present as expected!!",
+          jobUserLogDir.exists());
+    tracker.purgeJob(new KillJobAction(jobId));
+    tracker.getTaskLogCleanupThread().processCompletedJobs();
+    
     // Logs should be gone after cleanup.
-    assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
-        tracker.getLocalFileSystem().exists(logDir));
+    assertFalse("Userlogs dir " + jobUserLogDir + " is not deleted as expected!!",
+        jobUserLogDir.exists());
   }
 }

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java?rev=920793&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java Tue Mar  9 10:56:44 2010
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Test;
+
+public class TestUserLogCleanup {
+  private static String jtid = "test";
+  private static long ONE_HOUR = 1000 * 60 * 60;
+  private Localizer localizer;
+  private UserLogCleaner taskLogCleanupThread;
+  private TaskTracker tt;
+  private FakeClock myClock = new FakeClock();
+  private JobID jobid1 = new JobID(jtid, 1);
+  private JobID jobid2 = new JobID(jtid, 2);
+  private JobID jobid3 = new JobID(jtid, 3);
+  private JobID jobid4 = new JobID(jtid, 4);
+  private File foo = new File(TaskLog.getUserLogDir(), "foo");
+  private File bar = new File(TaskLog.getUserLogDir(), "bar");
+
+  public TestUserLogCleanup() throws IOException {
+    Configuration conf = new Configuration();
+    localizer = new Localizer(FileSystem.get(conf), conf
+        .getStrings(MRConfig.LOCAL_DIR), new DefaultTaskController());
+    taskLogCleanupThread = new UserLogCleaner(conf);
+    taskLogCleanupThread.setClock(myClock);
+    tt = new TaskTracker();
+    tt.setLocalizer(localizer);
+    tt.setTaskLogCleanupThread(taskLogCleanupThread);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtil.fullyDelete(TaskLog.getUserLogDir());
+  }
+
+  private File localizeJob(JobID jobid) throws IOException {
+    File jobUserlog = TaskLog.getJobDir(jobid);
+
+    // localize job log directory
+    tt.initializeJobLogDir(jobid);
+    assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists());
+    return jobUserlog;
+  }
+
+  private void jobFinished(JobID jobid, int logRetainHours) {
+    Configuration jobconf = new Configuration();
+    jobconf.setInt(JobContext.USER_LOG_RETAIN_HOURS, logRetainHours);
+    taskLogCleanupThread.markJobLogsForDeletion(myClock.getTime(), jobconf,
+        jobid);
+  }
+
+  /**
+   * Tests job user-log directory deletion.
+   * 
+   * Adds two jobs for log deletion: one with one hour as retain hours, the
+   * other with two hours. After an hour, a processCompletedJobs() call
+   * makes sure the job with 1hr retain hours is removed and the other is
+   * retained. After one more hour, the job with 2hr retain hours is also
+   * removed.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testJobLogCleanup() throws IOException {
+    File jobUserlog1 = localizeJob(jobid1);
+    File jobUserlog2 = localizeJob(jobid2);
+
+    // add job user log directory for deletion, with 2 hours for deletion
+    jobFinished(jobid1, 2);
+
+    // add the job for deletion with one hour as retain hours
+    jobFinished(jobid2, 1);
+
+    // remove old logs and see jobid1 is not removed and jobid2 is removed
+    myClock.advance(ONE_HOUR);
+    taskLogCleanupThread.processCompletedJobs();
+    assertTrue(jobUserlog1 + " got deleted", jobUserlog1.exists());
+    assertFalse(jobUserlog2 + " still exists.", jobUserlog2.exists());
+
+    myClock.advance(ONE_HOUR);
+    // remove old logs and see jobid1 is removed now
+    taskLogCleanupThread.processCompletedJobs();
+    assertFalse(jobUserlog1 + " still exists.", jobUserlog1.exists());
+  }
+
+  /**
+   * Tests user-log directory cleanup on a TT re-init with 3 hours as log
+   * retain hours for tracker. 
+   * 
+   * Adds job1 deletion before the re-init with 2 hour retain hours. 
+   * Adds job2 for which there are no tasks/killJobAction after the re-init.
+   * Adds job3 for which there is localizeJob followed by killJobAction 
+   * with 3 hours as retain hours.
+   * Adds job4 for which there are some tasks after the re-init.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testUserLogCleanup() throws IOException {
+    File jobUserlog1 = localizeJob(jobid1);
+    File jobUserlog2 = localizeJob(jobid2);
+    File jobUserlog3 = localizeJob(jobid3);
+    File jobUserlog4 = localizeJob(jobid4);
+    // create some files/dirs in userlog
+    foo.mkdirs();
+    bar.createNewFile();
+
+    // add the jobid1 for deletion with retainhours = 2
+    jobFinished(jobid1, 2);
+
+    // time is now 1.
+    myClock.advance(ONE_HOUR);
+    
+    // mimic TaskTracker reinit
+    // clear userlog directory
+    // job directories will be added with 3 hours as retain hours. They will be
+    // deleted at time 4.
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
+    taskLogCleanupThread.clearOldUserLogs(conf);
+    assertFalse(foo.exists());
+    assertFalse(bar.exists());
+    assertTrue(jobUserlog1.exists());
+    assertTrue(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+    assertTrue(new File(TaskLog.getUserLogDir(), MRAsyncDiskService.TOBEDELETED)
+        .exists());
+
+    myClock.advance(ONE_HOUR);
+    // time is now 2.
+    taskLogCleanupThread.processCompletedJobs();
+    assertFalse(jobUserlog1.exists());
+    assertTrue(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+    
+    // mimic localizeJob followed by KillJobAction for jobid3
+    // add the job for deletion with retainhours = 3. 
+    // jobid3 should be deleted at time 5.
+    jobUserlog3 = localizeJob(jobid3);
+    jobFinished(jobid3, 3);
+
+    // mimic localizeJob for jobid4
+    jobUserlog4 = localizeJob(jobid4);
+
+    // do cleanup
+    myClock.advance(2 * ONE_HOUR);
+    // time is now 4.
+    taskLogCleanupThread.processCompletedJobs();
+
+    // jobid2 will be deleted
+    assertFalse(jobUserlog1.exists());
+    assertFalse(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    myClock.advance(ONE_HOUR);
+    // time is now 5.
+    // do cleanup again
+    taskLogCleanupThread.processCompletedJobs();
+
+    // jobid3 will be deleted
+    assertFalse(jobUserlog1.exists());
+    assertFalse(jobUserlog2.exists());
+    assertFalse(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+  }
+
+  /**
+   * Tests user-log directory cleanup on a TT restart.
+   * 
+   * Adds job1 deletion before the restart with 2 hour retain hours.
+   * Adds job2 for which there are no tasks/killJobAction after the restart.
+   * Adds job3 for which there is localizeJob followed by killJobAction after
+   * the restart with 3 hours retain hours.
+   * Adds job4 for which there are some tasks after the restart.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testUserLogCleanupAfterRestart() throws IOException {
+    File jobUserlog1 = localizeJob(jobid1);
+    File jobUserlog2 = localizeJob(jobid2);
+    File jobUserlog3 = localizeJob(jobid3);
+    File jobUserlog4 = localizeJob(jobid4);
+    // create some files/dirs in userlog
+    foo.mkdirs();
+    bar.createNewFile();
+
+    // add the jobid1 for deletion with retainhours = 2
+    jobFinished(jobid1, 2);
+
+    // time is now 1.
+    myClock.advance(ONE_HOUR);
+    
+    // mimic TaskTracker restart
+    // clear userlog directory
+    // job directories will be added with 3 hours as retain hours. 
+    Configuration conf = new Configuration();
+    conf.setInt(JobContext.USER_LOG_RETAIN_HOURS, 3);
+    taskLogCleanupThread = new UserLogCleaner(conf);
+    myClock = new FakeClock(); // clock is reset.
+    taskLogCleanupThread.setClock(myClock);
+    taskLogCleanupThread.clearOldUserLogs(conf);
+    tt.setTaskLogCleanupThread(taskLogCleanupThread);
+    assertFalse(foo.exists());
+    assertFalse(bar.exists());
+    assertTrue(jobUserlog1.exists());
+    assertTrue(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+    assertTrue(new File(TaskLog.getUserLogDir(), MRAsyncDiskService.TOBEDELETED)
+        .exists());
+
+    myClock.advance(ONE_HOUR);
+    // time is now 1.
+    taskLogCleanupThread.processCompletedJobs();
+    assertTrue(jobUserlog1.exists());
+    assertTrue(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+    
+    // mimic localizeJob followed by KillJobAction for jobid3
+    // add the job for deletion with retainhours = 3. 
+    // jobid3 should be deleted at time 4.
+    jobUserlog3 = localizeJob(jobid3);
+    jobFinished(jobid3, 3);
+
+    // mimic localizeJob for jobid4
+    jobUserlog4 = localizeJob(jobid4);
+
+    // do cleanup
+    myClock.advance(2 * ONE_HOUR);
+    // time is now 3.
+    taskLogCleanupThread.processCompletedJobs();
+
+    // jobid1 and jobid2 will be deleted
+    assertFalse(jobUserlog1.exists());
+    assertFalse(jobUserlog2.exists());
+    assertTrue(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+
+    myClock.advance(ONE_HOUR);
+    // time is now 4.
+    // do cleanup again
+    taskLogCleanupThread.processCompletedJobs();
+
+    // jobid3 will be deleted
+    assertFalse(jobUserlog1.exists());
+    assertFalse(jobUserlog2.exists());
+    assertFalse(jobUserlog3.exists());
+    assertTrue(jobUserlog4.exists());
+  }
+}