You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:12:53 UTC

svn commit: r1077418 - in /hadoop/common/branches/branch-0.20-security-patches: conf/ src/c++/task-controller/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapred/pipes/ s...

Author: omalley
Date: Fri Mar  4 04:12:53 2011
New Revision: 1077418

URL: http://svn.apache.org/viewvc?rev=1077418&view=rev
Log:
commit cbe7cc71a1649ab20c3cbb964e0f59a5e3372a58
Author: Vinod Kumar <vi...@yahoo-inc.com>
Date:   Thu Apr 22 16:47:53 2010 +0530

    MAPREDUCE-1607. From https://issues.apache.org/jira/secure/attachment/12442538/patch-1607-ydist.txt
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1607. Task controller may not set permissions for a
    +    task cleanup attempt's log directory. (Amareshwari Sreeramadasu via vinodkv)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmReuse.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties
    hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskFail.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java

Modified: hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties (original)
+++ hadoop/common/branches/branch-0.20-security-patches/conf/log4j.properties Fri Mar  4 04:12:53 2011
@@ -55,6 +55,7 @@ log4j.appender.console.layout.Conversion
 
 #Default values
 hadoop.tasklog.taskid=null
+hadoop.tasklog.iscleanup=false
 hadoop.tasklog.noKeepSplits=4
 hadoop.tasklog.totalLogFileSize=100
 hadoop.tasklog.purgeLogSplits=true
@@ -62,6 +63,7 @@ hadoop.tasklog.logsRetainHours=12
 
 log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
 log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
 log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
 
 log4j.appender.TLA.layout=org.apache.log4j.PatternLayout

Modified: hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/c%2B%2B/task-controller/task-controller.c?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/c++/task-controller/task-controller.c Fri Mar  4 04:12:53 2011
@@ -582,10 +582,6 @@ int prepare_task_logs(const char *log_di
   if (stat(task_log_dir, &filestat) != 0) {
     if (errno == ENOENT) {
       // See TaskRunner.java to see that an absent log-dir doesn't fail the task.
-      // Task log dir for cleanup tasks will not have the name
-      // task-attempt-id.cleanup. Instead a log.index.cleanup is created in
-      // task-attempt log dir. We check if the directory exists and return if
-      // it doesn't. So the following will work for cleanup attempts too.
 #ifdef DEBUG
       fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
           task_log_dir);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java Fri Mar  4 04:12:53 2011
@@ -132,6 +132,7 @@ public class TestStreamingTaskLog extend
     long logSize = USERLOG_LIMIT_KB * 1024;
     assertTrue("environment set for child is wrong", env.contains("INFO,TLA")
                && env.contains("-Dhadoop.tasklog.taskid=attempt_")
-               && env.contains("-Dhadoop.tasklog.totalLogFileSize=" + logSize));
+               && env.contains("-Dhadoop.tasklog.totalLogFileSize=" + logSize)
+               && env.contains("-Dhadoop.tasklog.iscleanup=false"));
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar  4 04:12:53 2011
@@ -64,8 +64,9 @@ class Child {
     int port = Integer.parseInt(args[1]);
     final InetSocketAddress address = new InetSocketAddress(host, port);
     final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
+    final String logLocation = args[3];
     final int SLEEP_LONGER_COUNT = 5;
-    int jvmIdInt = Integer.parseInt(args[3]);
+    int jvmIdInt = Integer.parseInt(args[4]);
     JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
 
     // file name is passed thru env
@@ -106,7 +107,7 @@ class Child {
       public void run() {
         try {
           if (taskid != null) {
-            TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+            TaskLog.syncLogs(logLocation, taskid, isCleanup);
           }
         } catch (Throwable throwable) {
         }
@@ -120,7 +121,7 @@ class Child {
           try {
             Thread.sleep(5000);
             if (taskid != null) {
-              TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+              TaskLog.syncLogs(logLocation, taskid, isCleanup);
             }
           } catch (InterruptedException ie) {
           } catch (IOException iee) {
@@ -172,7 +173,7 @@ class Child {
 
         //create the index file so that the log files 
         //are viewable immediately
-        TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+        TaskLog.syncLogs(logLocation, taskid, isCleanup);
         
         // Create the job-conf and set credentials
         final JobConf job = new JobConf(task.getJobFile());
@@ -215,7 +216,7 @@ class Child {
               FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
               taskFinal.run(job, umbilical);             // run the task
             } finally {
-              TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+              TaskLog.syncLogs(logLocation, taskid, isCleanup);
             }
 
             return null;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar  4 04:12:53 2011
@@ -484,8 +484,9 @@ class JvmManager {
       // Post-JVM-exit logs processing. inform user log manager
       private void jvmFinished() {
         Task firstTask = initalContext.task;
-        JvmFinishedEvent jfe = new JvmFinishedEvent(new JVMInfo(firstTask
-            .getTaskID(), tasksGiven));
+        JvmFinishedEvent jfe = new JvmFinishedEvent(new JVMInfo(
+            TaskLog.getAttemptDir(firstTask.getTaskID(), firstTask.isTaskCleanupTask()),
+            tasksGiven));
         tracker.getUserLogManager().addLogEvent(jfe);
       }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Mar  4 04:12:53 2011
@@ -52,6 +52,8 @@ import org.apache.log4j.Logger;
  * A simple logger to handle the task-specific user logs.
  * This class uses the system property <code>hadoop.log.dir</code>.
  * 
+ * This class is for Map/Reduce internal use only.
+ * 
  */
 public class TaskLog {
   private static final Log LOG =
@@ -70,31 +72,9 @@ public class TaskLog {
     }
   }
 
-  public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
-    return new File(getAttemptDir(taskid.toString()), filter.toString());
-  }
-
-  /**
-   * @deprecated Instead use
-   *             {@link #getAllLogsFileDetails(TaskAttemptID, boolean)} to get
-   *             the details of all log-files and then use the particular
-   *             log-type's detail to call getRealTaskLogFileLocation(String,
-   *             LogName) real log-location
-   */
-  @Deprecated
-  public static File getRealTaskLogFileLocation(TaskAttemptID taskid, 
+  public static File getTaskLogFile(TaskAttemptID taskid, boolean isCleanup,
       LogName filter) {
-    LogFileDetail l;
-    try {
-      Map<LogName, LogFileDetail> allFilesDetails =
-          getAllLogsFileDetails(taskid, false);
-      l = allFilesDetails.get(filter);
-    } catch (IOException ie) {
-      LOG.error("getTaskLogFileDetailgetAllLogsFileDetails threw an exception "
-          + ie);
-      return null;
-    }
-    return new File(getAttemptDir(l.location), filter.toString());
+    return new File(getAttemptDir(taskid, isCleanup), filter.toString());
   }
 
   /**
@@ -108,8 +88,7 @@ public class TaskLog {
    */
   static String getRealTaskLogFilePath(String location, LogName filter)
       throws IOException {
-    return FileUtil.makeShellPath(new File(getAttemptDir(location),
-        filter.toString()));
+    return FileUtil.makeShellPath(new File(location, filter.toString()));
   }
 
   static class LogFileDetail {
@@ -125,7 +104,7 @@ public class TaskLog {
     Map<LogName, LogFileDetail> allLogsFileDetails =
         new HashMap<LogName, LogFileDetail>();
 
-    File indexFile = getIndexFile(taskid.toString(), isCleanup);
+    File indexFile = getIndexFile(taskid, isCleanup);
     BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
     //the format of the index file is
     //LOG_DIR: <the dir where the task logs are really stored>
@@ -144,7 +123,7 @@ public class TaskLog {
     for (LogName filter : new LogName[] { LogName.DEBUGOUT, LogName.PROFILE }) {
       LogFileDetail l = new LogFileDetail();
       l.location = loc;
-      l.length = new File(getAttemptDir(l.location), filter.toString()).length();
+      l.length = new File(l.location, filter.toString()).length();
       l.start = 0;
       allLogsFileDetails.put(filter, l);
     }
@@ -165,28 +144,21 @@ public class TaskLog {
     return allLogsFileDetails;
   }
   
-  private static File getTmpIndexFile(String taskid) {
-    return new File(getAttemptDir(taskid), "log.tmp");
+  private static File getTmpIndexFile(TaskAttemptID taskid, boolean isCleanup) {
+    return new File(getAttemptDir(taskid, isCleanup), "log.tmp");
   }
-  public static File getIndexFile(String taskid) {
-    return getIndexFile(taskid, false);
-  }
-  
-  public static File getIndexFile(String taskid, boolean isCleanup) {
-    if (isCleanup) {
-      return new File(getAttemptDir(taskid), "log.index.cleanup");
-    } else {
-      return new File(getAttemptDir(taskid), "log.index");
-    }
+
+  static File getIndexFile(TaskAttemptID taskid, boolean isCleanup) {
+    return new File(getAttemptDir(taskid, isCleanup), "log.index");
   }
 
   static String getBaseLogDir() {
     return System.getProperty("hadoop.log.dir");
   }
 
-  static File getAttemptDir(String taskid) {
-    return new File(getJobDir(TaskAttemptID.forName(taskid).getJobID()),
-        taskid);
+  static File getAttemptDir(TaskAttemptID taskid, boolean isCleanup) {
+    String cleanupSuffix = isCleanup ? ".cleanup" : "";
+    return new File(getJobDir(taskid.getJobID()), taskid + cleanupSuffix);
   }
 
   static final List<LogName> LOGS_TRACKED_BY_INDEX_FILES =
@@ -206,12 +178,12 @@ public class TaskLog {
     }
   }
   
-  static void writeToIndexFile(TaskAttemptID firstTaskid,
+  static void writeToIndexFile(String logLocation,
       TaskAttemptID currentTaskid, boolean isCleanup,
       Map<LogName, Long[]> lengths) throws IOException {
     // To ensure atomicity of updates to index file, write to temporary index
     // file first and then rename.
-    File tmpIndexFile = getTmpIndexFile(currentTaskid.toString());
+    File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup);
     
     BufferedOutputStream bos = 
       new BufferedOutputStream(new FileOutputStream(tmpIndexFile,false));
@@ -222,7 +194,7 @@ public class TaskLog {
     //STDERR: <start-offset in the stderr file> <length>
     //SYSLOG: <start-offset in the syslog file> <length>    
     dos.writeBytes(LogFileDetail.LOCATION
-        + firstTaskid.toString()
+        + logLocation
         + "\n");
     for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
       Long[] lens = lengths.get(logName);
@@ -232,7 +204,7 @@ public class TaskLog {
           + "\n");}
     dos.close();
 
-    File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
+    File indexFile = getIndexFile(currentTaskid, isCleanup);
     Path indexFilePath = new Path(indexFile.getAbsolutePath());
     Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
 
@@ -242,14 +214,8 @@ public class TaskLog {
     localFS.rename (tmpIndexFilePath, indexFilePath);
   }
 
-  public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
-                                           TaskAttemptID taskid) 
-  throws IOException {
-    syncLogs(firstTaskid, taskid, false);
-  }
-
   @SuppressWarnings("unchecked")
-  public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
+  public synchronized static void syncLogs(String logLocation, 
                                            TaskAttemptID taskid,
                                            boolean isCleanup) 
   throws IOException {
@@ -270,21 +236,21 @@ public class TaskLog {
     for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
       if (currentTaskid != taskid) {
         // Set start = current-end
-        logLengths.get(logName)[0] =
-            Long.valueOf(getTaskLogFile(firstTaskid, logName).length());
+        logLengths.get(logName)[0] = Long.valueOf(new File(
+            logLocation, logName.toString()).length());
       }
       // Set current end
-      logLengths.get(logName)[1] =
-          Long.valueOf(getTaskLogFile(firstTaskid, logName).length());
+      logLengths.get(logName)[1] = Long.valueOf(new File(
+          logLocation, logName.toString()).length());
     }
     if (currentTaskid != taskid) {
       if (currentTaskid != null) {
         LOG.info("Starting logging for a new task " + taskid
-            + " in the same JVM as that of the first task " + firstTaskid);
+            + " in the same JVM as that of the first task " + logLocation);
       }
       currentTaskid = taskid;
     }
-    writeToIndexFile(firstTaskid, taskid, isCleanup, logLengths);
+    writeToIndexFile(logLocation, taskid, isCleanup, logLengths);
   }
   
   /**
@@ -322,11 +288,6 @@ public class TaskLog {
     private long bytesRemaining;
     private FileInputStream file;
 
-    public Reader(TaskAttemptID taskid, LogName kind, 
-                  long start, long end) throws IOException {
-      this(taskid, kind, start, end, false);
-    }
-    
     /**
      * Read a log file from start to end positions. The offsets may be negative,
      * in which case they are relative to the end of the file. For example,
@@ -358,8 +319,7 @@ public class TaskLog {
       start += fileDetail.start;
       end += fileDetail.start;
       bytesRemaining = end - start;
-      file = new FileInputStream(new File(getAttemptDir(fileDetail.location), 
-          kind.toString()));
+      file = new FileInputStream(new File(fileDetail.location, kind.toString()));
       // skip upto start
       long pos = 0;
       while (pos < start) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java Fri Mar  4 04:12:53 2011
@@ -34,6 +34,7 @@ public class TaskLogAppender extends Fil
   //so that log4j can configure it from the configuration(log4j.properties). 
   private int maxEvents;
   private Queue<LoggingEvent> tail = null;
+  private boolean isCleanup;
 
   @Override
   public void activateOptions() {
@@ -41,8 +42,8 @@ public class TaskLogAppender extends Fil
       if (maxEvents > 0) {
         tail = new LinkedList<LoggingEvent>();
       }
-      setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId), 
-                                     TaskLog.LogName.SYSLOG).toString());
+      setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId),
+          isCleanup, TaskLog.LogName.SYSLOG).toString());
       setAppend(true);
       super.activateOptions();
     }
@@ -98,4 +99,22 @@ public class TaskLogAppender extends Fil
     maxEvents = (int) logSize / EVENT_SIZE;
   }
 
+  /**
+   * Set whether the task is a cleanup attempt or not.
+   * 
+   * @param isCleanup
+   *          true if the task is a cleanup attempt, false otherwise.
+   */
+  public void setIsCleanup(boolean isCleanup) {
+    this.isCleanup = isCleanup;
+  }
+
+  /**
+   * Get whether the task is a cleanup attempt or not.
+   * 
+   * @return true if the task is a cleanup attempt, false otherwise.
+   */
+  public boolean getIsCleanup() {
+    return isCleanup;
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java Fri Mar  4 04:12:53 2011
@@ -43,8 +43,9 @@ import org.apache.hadoop.util.StringUtil
 public class TaskLogServlet extends HttpServlet {
   private static final long serialVersionUID = -6615764817774487321L;
   
-  private boolean haveTaskLog(TaskAttemptID taskId, TaskLog.LogName type) {
-    File f = TaskLog.getTaskLogFile(taskId, type);
+  private boolean haveTaskLog(TaskAttemptID taskId, boolean isCleanup,
+      TaskLog.LogName type) {
+    File f = TaskLog.getTaskLogFile(taskId, isCleanup, type);
     return f.canRead();
   }
 
@@ -145,9 +146,10 @@ public class TaskLogServlet extends Http
    * viewing task logs of old jobs(i.e. jobs finished on earlier unsecure
    * cluster).
    */
-  static Configuration getConfFromJobACLsFile(String attemptIdStr) {
+  static Configuration getConfFromJobACLsFile(TaskAttemptID attemptId,
+      boolean isCleanup) {
     Path jobAclsFilePath = new Path(
-        TaskLog.getAttemptDir(attemptIdStr).toString(), TaskRunner.jobACLsFile);
+        TaskLog.getAttemptDir(attemptId, isCleanup).toString(), TaskRunner.jobACLsFile);
     Configuration conf = null;
     if (new File(jobAclsFilePath.toUri().getPath()).exists()) {
       conf = new Configuration(false);
@@ -176,38 +178,6 @@ public class TaskLogServlet extends Http
       return;
     }
 
-    TaskAttemptID attemptId = TaskAttemptID.forName(attemptIdStr);
-    if (!TaskLog.getAttemptDir(attemptIdStr).exists()) {
-      response.sendError(HttpServletResponse.SC_GONE,
-          "Task log directory for task " + attemptId +
-          " does not exist. May be cleaned up by Task Tracker, if older logs.");
-      return;
-    }
-
-    // get user name who is accessing
-    String user = request.getRemoteUser();
-    if (user != null) {
-      ServletContext context = getServletContext();
-      TaskTracker taskTracker = (TaskTracker) context.getAttribute(
-          "task.tracker");
-      // get jobACLConf from ACLs file
-      Configuration jobACLConf = getConfFromJobACLsFile(attemptIdStr);
-      // Ignore authorization if job-acls.xml is not found
-      if (jobACLConf != null) {
-        JobID jobId = attemptId.getJobID();
-
-        try {
-          checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
-              taskTracker);
-        } catch (AccessControlException e) {
-          String errMsg = "User " + user + " failed to view tasklogs of job " +
-              jobId + "!\n\n" + e.getMessage();
-          response.sendError(HttpServletResponse.SC_UNAUTHORIZED, errMsg);
-          return;
-        }
-      }
-    }
-
     String logFilter = request.getParameter("filter");
     if (logFilter != null) {
       try {
@@ -240,6 +210,38 @@ public class TaskLogServlet extends Http
       isCleanup = Boolean.valueOf(sCleanup);
     }
     
+    TaskAttemptID attemptId = TaskAttemptID.forName(attemptIdStr);
+    if (!TaskLog.getAttemptDir(attemptId, isCleanup).exists()) {
+      response.sendError(HttpServletResponse.SC_GONE,
+          "Task log directory for task " + attemptId +
+          " does not exist. May be cleaned up by Task Tracker, if older logs.");
+      return;
+    }
+
+    // get user name who is accessing
+    String user = request.getRemoteUser();
+    if (user != null) {
+      ServletContext context = getServletContext();
+      TaskTracker taskTracker = (TaskTracker) context.getAttribute(
+          "task.tracker");
+      // get jobACLConf from ACLs file
+      Configuration jobACLConf = getConfFromJobACLsFile(attemptId, isCleanup);
+      // Ignore authorization if job-acls.xml is not found
+      if (jobACLConf != null) {
+        JobID jobId = attemptId.getJobID();
+
+        try {
+          checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
+              taskTracker);
+        } catch (AccessControlException e) {
+          String errMsg = "User " + user + " failed to view tasklogs of job " +
+              jobId + "!\n\n" + e.getMessage();
+          response.sendError(HttpServletResponse.SC_UNAUTHORIZED, errMsg);
+          return;
+        }
+      }
+    }
+
     OutputStream out = response.getOutputStream();
     if( !plainText ) {
       out.write(("<html>\n" +
@@ -252,15 +254,15 @@ public class TaskLogServlet extends Http
                      TaskLog.LogName.STDOUT, isCleanup);
         printTaskLog(response, out, attemptId, start, end, plainText, 
                      TaskLog.LogName.STDERR, isCleanup);
-        if (haveTaskLog(attemptId, TaskLog.LogName.SYSLOG)) {
+        if (haveTaskLog(attemptId, isCleanup, TaskLog.LogName.SYSLOG)) {
           printTaskLog(response, out, attemptId, start, end, plainText,
               TaskLog.LogName.SYSLOG, isCleanup);
         }
-        if (haveTaskLog(attemptId, TaskLog.LogName.DEBUGOUT)) {
+        if (haveTaskLog(attemptId, isCleanup, TaskLog.LogName.DEBUGOUT)) {
           printTaskLog(response, out, attemptId, start, end, plainText, 
                        TaskLog.LogName.DEBUGOUT, isCleanup);
         }
-        if (haveTaskLog(attemptId, TaskLog.LogName.PROFILE)) {
+        if (haveTaskLog(attemptId, isCleanup, TaskLog.LogName.PROFILE)) {
           printTaskLog(response, out, attemptId, start, end, plainText, 
                        TaskLog.LogName.PROFILE, isCleanup);
         }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsTruncater.java Fri Mar  4 04:12:53 2011
@@ -72,8 +72,6 @@ public class TaskLogsTruncater {
    * retainSize.
    */
   public void truncateLogs(JVMInfo lInfo) {
-    TaskAttemptID firstAttempt = TaskAttemptID.downgrade(lInfo
-        .getFirstAttemptID());
 
     // Read the log-file details for all the attempts that ran in this JVM
     Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails;
@@ -97,14 +95,14 @@ public class TaskLogsTruncater {
           updatedTaskLogFileDetails, logName);
     }
 
-    File attemptLogDir = TaskLog.getAttemptDir(firstAttempt.toString());
+    File attemptLogDir = lInfo.getLogLocation();
 
     FileWriter tmpFileWriter;
     FileReader logFileReader;
     // Now truncate file by file
     logNameLoop: for (LogName logName : LogName.values()) {
 
-      File logFile = TaskLog.getTaskLogFile(firstAttempt, logName);
+      File logFile = new File(attemptLogDir, logName.toString());
 
       // //// Optimization: if no task is over limit, just skip truncation-code
       if (logFile.exists()
@@ -212,7 +210,8 @@ public class TaskLogsTruncater {
 
     if (indexModified) {
       // Update the index files
-      updateIndicesAfterLogTruncation(firstAttempt, updatedTaskLogFileDetails);
+      updateIndicesAfterLogTruncation(attemptLogDir.toString(),
+          updatedTaskLogFileDetails);
     }
   }
 
@@ -363,7 +362,7 @@ public class TaskLogsTruncater {
    * @param firstAttempt
    * @param updatedTaskLogFileDetails
    */
-  private void updateIndicesAfterLogTruncation(TaskAttemptID firstAttempt,
+  private void updateIndicesAfterLogTruncation(String location,
       Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails) {
     for (Entry<Task, Map<LogName, LogFileDetail>> entry : 
                                 updatedTaskLogFileDetails.entrySet()) {
@@ -383,7 +382,7 @@ public class TaskLogsTruncater {
         }
       }
       try {
-        TaskLog.writeToIndexFile(firstAttempt, task.getTaskID(),
+        TaskLog.writeToIndexFile(location, task.getTaskID(),
             task.isTaskCleanupTask(), logLengths);
       } catch (IOException ioe) {
         LOG.warn("Exception encountered while updating index file of task "

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 04:12:53 2011
@@ -202,7 +202,7 @@ abstract class TaskRunner extends Thread
       // set memory limit using ulimit if feasible and necessary ...
       List<String> setup = getVMSetupCmd();
       // Set up the redirection of the task's stdout and stderr streams
-      File[] logFiles = prepareLogFiles(taskid);
+      File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
       File stdout = logFiles[0];
       File stderr = logFiles[1];
       tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
@@ -267,13 +267,17 @@ abstract class TaskRunner extends Thread
    * Prepare the log files for the task
    * 
    * @param taskid
+   * @param isCleanup
    * @return an array of files. The first file is stdout, the second is stderr.
    * @throws IOException 
    */
-  File[] prepareLogFiles(TaskAttemptID taskid) throws IOException {
+  File[] prepareLogFiles(TaskAttemptID taskid, boolean isCleanup)
+      throws IOException {
     File[] logFiles = new File[2];
-    logFiles[0] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
-    logFiles[1] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+    logFiles[0] = TaskLog.getTaskLogFile(taskid, isCleanup,
+        TaskLog.LogName.STDOUT);
+    logFiles[1] = TaskLog.getTaskLogFile(taskid, isCleanup,
+        TaskLog.LogName.STDERR);
     File logDir = logFiles[0].getParentFile();
     boolean b = logDir.mkdirs();
     if (!b) {
@@ -437,17 +441,13 @@ abstract class TaskRunner extends Thread
     vargs.add(classPath);
 
     // Setup the log4j prop
-    vargs.add("-Dhadoop.log.dir=" + 
-        new File(System.getProperty("hadoop.log.dir")
-        ).getAbsolutePath());
-    vargs.add("-Dhadoop.root.logger=INFO,TLA");
-    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
-    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+    setupLog4jProperties(vargs, taskid, logSize);
 
     if (conf.getProfileEnabled()) {
       if (conf.getProfileTaskRange(t.isMapTask()
                                    ).isIncluded(t.getPartition())) {
-        File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
+        File prof = TaskLog.getTaskLogFile(taskid, t.isTaskCleanupTask(),
+            TaskLog.LogName.PROFILE);
         vargs.add(String.format(conf.getProfileParams(), prof.toString()));
       }
     }
@@ -459,9 +459,21 @@ abstract class TaskRunner extends Thread
     vargs.add(address.getAddress().getHostAddress()); 
     vargs.add(Integer.toString(address.getPort())); 
     vargs.add(taskid.toString());                      // pass task identifier
+    // pass task log location
+    vargs.add(TaskLog.getAttemptDir(taskid, t.isTaskCleanupTask()).toString());
     return vargs;
   }
 
+  private void setupLog4jProperties(Vector<String> vargs, TaskAttemptID taskid,
+      long logSize) { // appends the task-logging -D options to vargs, in order
+    File logDir = new File(System.getProperty("hadoop.log.dir"));
+    vargs.add("-Dhadoop.log.dir=" + logDir.getAbsolutePath());
+    vargs.add("-Dhadoop.root.logger=INFO,TLA");
+    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
+    vargs.add("-Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask());
+    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+  }
+
   /**
    * @param taskid
    * @param workDir
@@ -542,6 +554,7 @@ abstract class TaskRunner extends Thread
       hadoopClientOpts = hadoopClientOpts + " ";
     }
     hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+                       + " -Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask()
                        + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
     env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 04:12:53 2011
@@ -2580,8 +2580,9 @@ public class TaskTracker 
               String taskSyslog ="";
               String jobConf = task.getJobFile();
               try {
-                Map<LogName, LogFileDetail> allFilesDetails =
-                    TaskLog.getAllLogsFileDetails(task.getTaskID(), false);
+                Map<LogName, LogFileDetail> allFilesDetails = TaskLog
+                    .getAllLogsFileDetails(task.getTaskID(), task
+                        .isTaskCleanupTask());
                 // get task's stdout file
                 taskStdout =
                     TaskLog.getRealTaskLogFilePath(
@@ -2615,8 +2616,8 @@ public class TaskTracker 
                           StringUtils.stringifyException(e));
               }
               // Build the command  
-              File stdout = TaskLog.getRealTaskLogFileLocation(
-                                   task.getTaskID(), TaskLog.LogName.DEBUGOUT);
+              File stdout = TaskLog.getTaskLogFile(task.getTaskID(), task
+                  .isTaskCleanupTask(), TaskLog.LogName.DEBUGOUT);
               // add pipes program as argument if it exists.
               String program ="";
               String executable = Submitter.getExecutable(localJobConf);
@@ -2661,8 +2662,9 @@ public class TaskTracker 
 
               // Debug-command is run. Do the post-debug-script-exit debug-logs
               // processing. Truncate the logs.
-              JvmFinishedEvent jvmFinished = new JvmFinishedEvent(
-                  new JVMInfo(task.getTaskID(), Arrays.asList(task)));
+              JvmFinishedEvent jvmFinished = new JvmFinishedEvent(new JVMInfo(
+                  TaskLog.getAttemptDir(task.getTaskID(), task
+                      .isTaskCleanupTask()), Arrays.asList(task)));
               getUserLogManager().addLogEvent(jvmFinished);
             }
           }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java Fri Mar  4 04:12:53 2011
@@ -97,8 +97,10 @@ class Application<K1 extends WritableCom
     cmd.add(executable);
     // wrap the command in a stdout/stderr capture
     TaskAttemptID taskid = TaskAttemptID.forName(conf.get("mapred.task.id"));
-    File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
-    File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+    // we are starting map/reduce task of the pipes job. this is not a cleanup
+    // attempt. 
+    File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
+    File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
     long logLength = TaskLog.getTaskLogLength(conf);
     cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
         false);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/JVMInfo.java Fri Mar  4 04:12:53 2011
@@ -17,22 +17,22 @@
  */
 package org.apache.hadoop.mapreduce.server.tasktracker;
 
+import java.io.File;
 import java.util.List;
 
 import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 public class JVMInfo {
-  private TaskAttemptID firstAttemptID; 
+  private File logLocation; 
   private List<Task> allAttempts;
 
-  public JVMInfo(TaskAttemptID firstAttemptID, List<Task> allAttempts) {
-    this.firstAttemptID = firstAttemptID;
+  public JVMInfo(File logLocation, List<Task> allAttempts) {
+    this.logLocation = logLocation;
     this.allAttempts = allAttempts;
   }
   
-  public TaskAttemptID getFirstAttemptID() {
-    return firstAttemptID;
+  public File getLogLocation() {
+    return logLocation;
   }
 
   public List<Task> getAllAttempts() {

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmReuse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmReuse.java?rev=1077418&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmReuse.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmReuse.java Fri Mar  4 04:12:53 2011
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+public class TestJvmReuse extends TestCase {
+  private static Path rootDir = new Path(System.getProperty("test.build.data",
+      "/tmp"), TestJvmReuse.class.getName());
+  private int numMappers = 5;
+  private static int taskWithCleanup = 2; // third task
+
+  /**
+   * A mapper class in which all attempts log taskid. Zeroth attempt of task
+   * with id=taskWithCleanup, fails with System.exit to force a cleanup attempt
+   * for the task in a new jvm.
+   */
+  public static class MapperClass extends MapReduceBase implements
+      Mapper<LongWritable, Text, Text, IntWritable> {
+    String taskid;
+    static int instances = 0; // incremented once per close() call
+    Reporter reporter = null;
+
+    public void configure(JobConf job) {
+      taskid = job.get("mapred.task.id");
+    }
+
+    public void map(LongWritable key, Text value,
+        OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+      System.err.println(taskid);
+      this.reporter = reporter;
+      int id = TaskAttemptID.forName(taskid).getTaskID().getId();
+      if (id == taskWithCleanup && taskid.endsWith("_0")) {
+        System.exit(-1);
+      }
+    }
+
+    public void close() throws IOException {
+      reporter.incrCounter("jvm", "use", ++instances);
+    }
+  }
+
+  public RunningJob launchJob(JobConf conf, Path inDir, Path outDir)
+      throws IOException {
+    // set up the input file system and write input text.
+    FileSystem inFs = inDir.getFileSystem(conf);
+    FileSystem outFs = outDir.getFileSystem(conf);
+    outFs.delete(outDir, true);
+    if (!inFs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    for (int i = 0; i < numMappers; i++) {
+      // write input into input file; close it even if the write fails
+      DataOutputStream file = inFs.create(new Path(inDir, "part-" + i));
+      try {
+        file.writeBytes("input");
+      } finally {
+        file.close();
+      }
+    }
+
+    // configure the mapred Job
+    conf.setMapperClass(MapperClass.class);
+    conf.setNumReduceTasks(0);
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    // enable jvm reuse
+    conf.setNumTasksToExecutePerJvm(-1);
+    return new JobClient(conf).submitJob(conf);
+  }
+
+  private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId,
+      TaskStatus ts, boolean isCleanup) throws IOException {
+    assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
+    // validate tasklogs for task attempt
+    String log = TestMiniMRMapRedDebugScript.readTaskLog(
+        TaskLog.LogName.STDERR, attemptId, false);
+    assertEquals(attemptId.toString(), log);
+    assertNotNull(ts);
+    if (!isCleanup) {
+      assertEquals(TaskStatus.State.SUCCEEDED, ts.getRunState());
+    } else {
+      assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+      // validate tasklogs for cleanup attempt
+      log = TestMiniMRMapRedDebugScript.readTaskLog(TaskLog.LogName.STDERR,
+          attemptId, true);
+      assertEquals(TestTaskFail.cleanupLog, log);
+    }
+  }
+
+  // validates logs of all attempts of the job.
+  private void validateJob(RunningJob job, MiniMRCluster mr) throws IOException {
+    assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+    // each close() adds the incremented static instance count, so the total
+    // exceeds the number of maps only if some jvm ran more than one task.
+    long uses = job.getCounters().findCounter("jvm", "use").getValue();
+    System.out.println("maps:" + numMappers + " uses:" + uses);
+    assertTrue("maps = " + numMappers + ", jvms = " + uses, numMappers < uses);
+
+    JobID jobId = job.getID();
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    for (int i = 0; i < numMappers; i++) {
+      TaskAttemptID attemptId = new TaskAttemptID(new TaskID(jobId, true, i), 0);
+      TaskInProgress tip = jt.getTip(attemptId.getTaskID());
+      TaskStatus ts = jt.getTaskStatus(attemptId);
+      validateAttempt(tip, attemptId, ts, i == taskWithCleanup);
+      if (i == taskWithCleanup) {
+        // validate second attempt of the task
+        attemptId = new TaskAttemptID(new TaskID(jobId, true, i), 1);
+        ts = jt.getTaskStatus(attemptId);
+        validateAttempt(tip, attemptId, ts, false);
+      }
+    }
+  }
+
+  public void testTaskLogs() throws IOException {
+    MiniMRCluster mr = null;
+    try {
+      Configuration conf = new Configuration();
+      final int taskTrackers = 1; // taskTrackers should be 1 to test jvm reuse.
+      // NOTE(review): conf is never passed to the cluster - confirm this applies
+      conf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+      mr = new MiniMRCluster(taskTrackers, "file:///", 1);
+      final Path inDir = new Path(rootDir, "input");
+      final Path outDir = new Path(rootDir, "output");
+      JobConf jobConf = mr.createJobConf();
+      jobConf.setOutputCommitter(TestTaskFail.CommitterWithLogs.class);
+      RunningJob rJob = launchJob(jobConf, inDir, outDir);
+      rJob.waitForCompletion();
+      validateJob(rJob, mr);
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskFail.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskFail.java Fri Mar  4 04:12:53 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapred;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.HttpURLConnection;
 
 import junit.framework.TestCase;
 
@@ -33,7 +34,7 @@ import org.apache.hadoop.mapred.lib.Iden
 
 public class TestTaskFail extends TestCase {
   private static String taskLog = "Task attempt log";
-  private static String cleanupLog = "cleanup attempt log";
+  static String cleanupLog = "cleanup attempt log";
 
   public static class MapperClass extends MapReduceBase
   implements Mapper<LongWritable, Text, Text, IntWritable> {
@@ -45,6 +46,8 @@ public class TestTaskFail extends TestCa
                      OutputCollector<Text, IntWritable> output, 
                      Reporter reporter) throws IOException {
       System.err.println(taskLog);
+      assertFalse(Boolean.getBoolean(System
+          .getProperty("hadoop.tasklog.iscleanup")));
       if (taskid.endsWith("_0")) {
         throw new IOException();
       } else if (taskid.endsWith("_1")) {
@@ -58,6 +61,15 @@ public class TestTaskFail extends TestCa
   static class CommitterWithLogs extends FileOutputCommitter {
     public void abortTask(TaskAttemptContext context) throws IOException {
       System.err.println(cleanupLog);
+      String attemptId = System.getProperty("hadoop.tasklog.taskid");
+      assertNotNull(attemptId);
+      // Boolean.getBoolean(name) looks up the system property called name.
+      // An _0 attempt is aborted either in-task (this test) or by a cleanup
+      // attempt (TestJvmReuse), so the flag is only asserted for the later
+      // attempts, which these tests always abort from a cleanup attempt.
+      if (!attemptId.endsWith("_0")) {
+        assertTrue(Boolean.getBoolean("hadoop.tasklog.iscleanup"));
+      }
       super.abortTask(context);
     }
   }
@@ -110,7 +122,7 @@ public class TestTaskFail extends TestCa
   }
   
   private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId, 
-		  TaskStatus ts, boolean isCleanup) 
+		  TaskStatus ts, boolean isCleanup, JobTracker jt) 
   throws IOException {
     assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
     assertTrue(ts != null);
@@ -119,6 +131,13 @@ public class TestTaskFail extends TestCa
     String log = TestMiniMRMapRedDebugScript.readTaskLog(
     TaskLog.LogName.STDERR, attemptId, false);
     assertTrue(log.contains(taskLog));
+    // access the logs from web url
+    TaskTrackerStatus ttStatus = jt.getTaskTracker(
+        tip.machineWhereTaskRan(attemptId)).getStatus();
+    String tasklogUrl = TaskLogServlet.getTaskLogUrl("localhost",
+        String.valueOf(ttStatus.getHttpPort()), attemptId.toString());
+    assertEquals(HttpURLConnection.HTTP_OK, TestWebUIAuthorization
+        .getHttpStatusCode(tasklogUrl, tip.getUser(), "GET"));
     if (!isCleanup) {
       // validate task logs: tasklog should contain both task logs
       // and cleanup logs
@@ -128,10 +147,18 @@ public class TestTaskFail extends TestCa
       log = TestMiniMRMapRedDebugScript.readTaskLog(
       TaskLog.LogName.STDERR, attemptId, true);
       assertTrue(log.contains(cleanupLog));
+      // access the cleanup attempt's logs from web url
+      ttStatus = jt.getTaskTracker(tip.machineWhereCleanupRan(attemptId))
+          .getStatus();
+      String cleanupTasklogUrl = TaskLogServlet.getTaskLogUrl(
+          "localhost", String.valueOf(ttStatus.getHttpPort()), attemptId
+              .toString()) + "&cleanup=true";
+      assertEquals(HttpURLConnection.HTTP_OK, TestWebUIAuthorization
+          .getHttpStatusCode(cleanupTasklogUrl, tip.getUser(), "GET"));
     }
   }
 
-  private void validateJob(RunningJob job, MiniMRCluster mr) 
+  private void validateJob(RunningJob job, JobTracker jt) 
   throws IOException {
     assertEquals(JobStatus.SUCCEEDED, job.getJobState());
 	    
@@ -141,23 +168,21 @@ public class TestTaskFail extends TestCa
     // fails with an exception
     TaskAttemptID attemptId = 
       new TaskAttemptID(new TaskID(jobId, true, 0), 0);
-    TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
-                            getTip(attemptId.getTaskID());
-    TaskStatus ts = 
-      mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    validateAttempt(tip, attemptId, ts, false);
+    TaskInProgress tip = jt.getTip(attemptId.getTaskID());
+    TaskStatus ts = jt.getTaskStatus(attemptId);
+    validateAttempt(tip, attemptId, ts, false, jt);
     
     attemptId =  new TaskAttemptID(new TaskID(jobId, true, 0), 1);
     // this should be cleanup attempt since the second attempt fails
     // with System.exit
-    ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    validateAttempt(tip, attemptId, ts, true);
+    ts = jt.getTaskStatus(attemptId);
+    validateAttempt(tip, attemptId, ts, true, jt);
     
     attemptId =  new TaskAttemptID(new TaskID(jobId, true, 0), 2);
     // this should be cleanup attempt since the third attempt fails
     // with Error
-    ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    validateAttempt(tip, attemptId, ts, true);
+    ts = jt.getTaskStatus(attemptId);
+    validateAttempt(tip, attemptId, ts, true, jt);
   }
   
   public void testWithDFS() throws IOException {
@@ -171,6 +196,7 @@ public class TestTaskFail extends TestCa
       dfs = new MiniDFSCluster(conf, 4, true, null);
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
       final Path inDir = new Path("./input");
       final Path outDir = new Path("./output");
       String input = "The quick brown fox\nhas many silly\nred fox sox\n";
@@ -179,18 +205,18 @@ public class TestTaskFail extends TestCa
       jobConf.setOutputCommitter(CommitterWithLogs.class);
       RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
-      validateJob(rJob, mr);
+      validateJob(rJob, jt);
       // launch job with fail tasks and fail-cleanups
       fileSys.delete(outDir, true);
       jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
       rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
-      validateJob(rJob, mr);
+      validateJob(rJob, jt);
       fileSys.delete(outDir, true);
       jobConf.setOutputCommitter(CommitterWithFailTaskCleanup2.class);
       rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
-      validateJob(rJob, mr);
+      validateJob(rJob, jt);
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown(); }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsTruncater.java Fri Mar  4 04:12:53 2011
@@ -78,32 +78,33 @@ public class TestTaskLogsTruncater {
       TaskAttemptID attemptID, LogName logName, long numBytes, char data)
       throws IOException {
 
-    File logFile = TaskLog.getTaskLogFile(firstAttemptID, logName);
+    File logFile = TaskLog.getTaskLogFile(firstAttemptID, false, logName);
+    File logLocation = logFile.getParentFile();
 
     LOG.info("Going to write " + numBytes + " real bytes to the log file "
         + logFile);
 
-    if (!logFile.getParentFile().exists()
-        && !logFile.getParentFile().mkdirs()) {
+    if (!logLocation.exists()
+        && !logLocation.mkdirs()) {
       throw new IOException("Couldn't create all ancestor dirs for "
           + logFile);
     }
 
-    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+    File attemptDir = TaskLog.getAttemptDir(attemptID, false);
     if (!attemptDir.exists() && !attemptDir.mkdirs()) {
       throw new IOException("Couldn't create all ancestor dirs for "
           + logFile);
     }
 
     // Need to call up front to set currenttaskid.
-    TaskLog.syncLogs(firstAttemptID, attemptID);
+    TaskLog.syncLogs(logLocation.toString(), attemptID, false);
 
     FileWriter writer = new FileWriter(logFile, true);
     for (long i = 0; i < numBytes; i++) {
       writer.write(data);
     }
     writer.close();
-    TaskLog.syncLogs(firstAttemptID, attemptID);
+    TaskLog.syncLogs(logLocation.toString(), attemptID, false);
     LOG.info("Written " + numBytes + " real bytes to the log file "
         + logFile);
   }
@@ -114,7 +115,7 @@ public class TestTaskLogsTruncater {
 
     // If the index file doesn't exist, we cannot get log-file lengths. So set
     // them to zero.
-    if (!TaskLog.getIndexFile(tid.toString(), isCleanup).exists()) {
+    if (!TaskLog.getIndexFile(tid, isCleanup).exists()) {
       for (LogName log : LogName.values()) {
         allLogsFileLengths.put(log, Long.valueOf(0));
       }
@@ -159,16 +160,16 @@ public class TestTaskLogsTruncater {
     for (LogName log : LogName.values()) {
       writeRealBytes(attemptID, attemptID, log, 500, 'H');
     }
-    File logIndex = TaskLog.getIndexFile(attemptID.toString(), false);
+    File logIndex = TaskLog.getIndexFile(attemptID, false);
     long indexModificationTimeStamp = logIndex.lastModified();
 
-    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+    File attemptDir = TaskLog.getAttemptDir(attemptID, false);
     assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
     assertEquals("index file got modified", indexModificationTimeStamp,
         logIndex.lastModified());
 
     // Finish the task and the JVM too.
-    JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+    JVMInfo jvmInfo = new JVMInfo(attemptDir, Arrays.asList(task));
     logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
 
     // There should be no truncation of the log-file.
@@ -178,7 +179,7 @@ public class TestTaskLogsTruncater {
 
     Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
     for (LogName log : LogName.values()) {
-      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      File logFile = TaskLog.getTaskLogFile(attemptID, false, log);
       assertEquals(500, logFile.length());
       // The index file should also be proper.
       assertEquals(500, logLengths.get(log).longValue());
@@ -191,7 +192,7 @@ public class TestTaskLogsTruncater {
     
     logLengths = getAllLogsFileLengths(attemptID, false);
     for (LogName log : LogName.values()) {
-      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      File logFile = TaskLog.getTaskLogFile(attemptID, false, log);
       assertEquals(500, logFile.length());
       // The index file should also be proper.
       assertEquals(500, logLengths.get(log).longValue());
@@ -221,18 +222,18 @@ public class TestTaskLogsTruncater {
       writeRealBytes(attemptID, attemptID, log, 1500, 'H');
     }
 
-    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+    File attemptDir = TaskLog.getAttemptDir(attemptID, false);
     assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
 
     // Finish the task and the JVM too.
-    JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+    JVMInfo jvmInfo = new JVMInfo(attemptDir, Arrays.asList(task));
     logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
 
     // The log-file should not be truncated.
     assertTrue(attemptDir.exists());
     Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
     for (LogName log : LogName.values()) {
-      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      File logFile = TaskLog.getTaskLogFile(attemptID, false, log);
       assertEquals(1500, logFile.length());
       // The index file should also be proper.
       assertEquals(1500, logLengths.get(log).longValue());
@@ -261,11 +262,11 @@ public class TestTaskLogsTruncater {
       writeRealBytes(attemptID, attemptID, log, 1500, 'H');
     }
 
-    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+    File attemptDir = TaskLog.getAttemptDir(attemptID, false);
     assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
 
     // Finish the task and the JVM too.
-    JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+    JVMInfo jvmInfo = new JVMInfo(attemptDir, Arrays.asList(task));
     logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
 
     // The log-file should now be truncated.
@@ -273,7 +274,7 @@ public class TestTaskLogsTruncater {
 
     Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
     for (LogName log : LogName.values()) {
-      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      File logFile = TaskLog.getTaskLogFile(attemptID, false, log);
       assertEquals(1000, logFile.length());
       // The index file should also be proper.
       assertEquals(1000, logLengths.get(log).longValue());
@@ -282,7 +283,7 @@ public class TestTaskLogsTruncater {
     // truncate once again
     logLengths = getAllLogsFileLengths(attemptID, false);
     for (LogName log : LogName.values()) {
-      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      File logFile = TaskLog.getTaskLogFile(attemptID, false, log);
       assertEquals(1000, logFile.length());
       // The index file should also be proper.
       assertEquals(1000, logLengths.get(log).longValue());
@@ -312,22 +313,22 @@ public class TestTaskLogsTruncater {
     writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 1500, 'H');
     writeRealBytes(attemptID, attemptID, LogName.STDERR, 500, 'H');
 
-    File attemptDir = TaskLog.getAttemptDir(attemptID.toString());
+    File attemptDir = TaskLog.getAttemptDir(attemptID, false);
     assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
 
     // Finish the task and the JVM too.
-    JVMInfo jvmInfo = new JVMInfo(attemptID, Arrays.asList(task));
+    JVMInfo jvmInfo = new JVMInfo(attemptDir, Arrays.asList(task));
     logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
 
     // The log-file should now be truncated.
     assertTrue(attemptDir.exists());
 
     Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
-    File logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
+    File logFile = TaskLog.getTaskLogFile(attemptID, false, LogName.SYSLOG);
     assertEquals(1000, logFile.length());
     // The index file should also be proper.
     assertEquals(1000, logLengths.get(LogName.SYSLOG).longValue());
-    logFile = TaskLog.getTaskLogFile(attemptID, LogName.STDERR);
+    logFile = TaskLog.getTaskLogFile(attemptID, false, LogName.STDERR);
     assertEquals(500, logFile.length());
     // The index file should also be proper.
     assertEquals(500, logLengths.get(LogName.STDERR).longValue());
@@ -335,11 +336,11 @@ public class TestTaskLogsTruncater {
     // truncate once again
     logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
     logLengths = getAllLogsFileLengths(attemptID, false);
-    logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
+    logFile = TaskLog.getTaskLogFile(attemptID, false, LogName.SYSLOG);
     assertEquals(1000, logFile.length());
     // The index file should also be proper.
     assertEquals(1000, logLengths.get(LogName.SYSLOG).longValue());
-    logFile = TaskLog.getTaskLogFile(attemptID, LogName.STDERR);
+    logFile = TaskLog.getTaskLogFile(attemptID, false, LogName.STDERR);
     assertEquals(500, logFile.length());
     // The index file should also be proper.
     assertEquals(500, logLengths.get(LogName.STDERR).longValue());
@@ -366,7 +367,7 @@ public class TestTaskLogsTruncater {
     // Let the tasks write logs more than retain-size
     writeRealBytes(attempt1, attempt1, LogName.SYSLOG, 200, 'A');
 
-    File attemptDir = TaskLog.getAttemptDir(attempt1.toString());
+    File attemptDir = TaskLog.getAttemptDir(attempt1, false);
     assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
 
     // Start another attempt in the same JVM
@@ -382,13 +383,13 @@ public class TestTaskLogsTruncater {
     // Let attempt3 also write some logs
     writeRealBytes(attempt1, attempt3, LogName.SYSLOG, 225, 'C');
     // Finish the JVM.
-    JVMInfo jvmInfo = new JVMInfo(attempt1, Arrays.asList((new Task[] { task1,
-        task2, task3 })));
+    JVMInfo jvmInfo = new JVMInfo(attemptDir, 
+        Arrays.asList((new Task[] { task1, task2, task3 })));
     logManager.addLogEvent(new JvmFinishedEvent(jvmInfo));
 
     // The log-file should now be truncated.
     assertTrue(attemptDir.exists());
-    File logFile = TaskLog.getTaskLogFile(attempt1, LogName.SYSLOG);
+    File logFile = TaskLog.getTaskLogFile(attempt1, false, LogName.SYSLOG);
     assertEquals(400, logFile.length());
     // The index files should also be proper.
     assertEquals(150, getAllLogsFileLengths(attempt1, false).get(
@@ -400,7 +401,7 @@ public class TestTaskLogsTruncater {
 
     // assert the data.
     FileReader reader =
-        new FileReader(TaskLog.getTaskLogFile(attempt1, LogName.SYSLOG));
+        new FileReader(TaskLog.getTaskLogFile(attempt1, false, LogName.SYSLOG));
     int ch, bytesRead = 0;
     boolean dataValid = true;
     while ((ch = reader.read()) != -1) {
@@ -496,7 +497,7 @@ public class TestTaskLogsTruncater {
       assertTrue(job.getJobState() == JobStatus.SUCCEEDED);
       for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
         long length =
-            TaskLog.getTaskLogFile(tce.getTaskAttemptId(),
+            TaskLog.getTaskLogFile(tce.getTaskAttemptId(), false,
                 TaskLog.LogName.STDOUT).length();
         assertTrue("STDOUT log file length for " + tce.getTaskAttemptId()
             + " is " + length + " and not <=10000", length <= 10000);
@@ -586,7 +587,7 @@ public class TestTaskLogsTruncater {
       } finally{
         for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
           File debugOutFile =
-              TaskLog.getTaskLogFile(tce.getTaskAttemptId(),
+              TaskLog.getTaskLogFile(tce.getTaskAttemptId(), false,
                   TaskLog.LogName.DEBUGOUT);
           if (debugOutFile.exists()) {
             long length = debugOutFile.length();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar  4 04:12:53 2011
@@ -507,7 +507,7 @@ public class TestTaskTrackerLocalization
         .getPath(), "tmp").exists());
 
     // Make sure that the logs are setup properly
-    File logDir = TaskLog.getAttemptDir(taskId.toString());
+    File logDir = TaskLog.getAttemptDir(taskId, task.isTaskCleanupTask());
     assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
         logDir.exists());
     checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
@@ -536,8 +536,8 @@ public class TestTaskTrackerLocalization
         taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
 
     // Validate the contents of jobACLsFile(both user name and job-view-acls)
-    Configuration jobACLsConf =
-        TaskLogServlet.getConfFromJobACLsFile(task.getTaskID().toString());
+    Configuration jobACLsConf = TaskLogServlet.getConfFromJobACLsFile(
+        task.getTaskID(), task.isTaskCleanupTask());
     assertTrue(jobACLsConf.get("user.name").equals(
         localizedJobConf.getUser()));
     assertTrue(jobACLsConf.get(JobContext.JOB_ACL_VIEW_JOB).
@@ -576,7 +576,7 @@ public class TestTaskTrackerLocalization
    * $taskid/work
    * Also see createFileAndSetPermissions for details
    */
-  void validateRemoveFiles(boolean needCleanup, boolean jvmReuse,
+  void validateRemoveTaskFiles(boolean needCleanup, boolean jvmReuse,
                            TaskInProgress tip) throws IOException {
     // create files and set permissions 555. Verify if task controller sets
     // the permissions for TT to delete the taskDir or workDir
@@ -614,7 +614,6 @@ public class TestTaskTrackerLocalization
       // now try to delete the work dir and verify that there are no stale paths
       JvmManager.deleteWorkDir(tracker, task);
     }
-    tracker.removeJobFiles(task.getUser(), jobId);
 
     assertTrue("Some task files are not deleted!! Number of stale paths is "
         + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
@@ -624,36 +623,45 @@ public class TestTaskTrackerLocalization
    * Validates if task cleanup is done properly for a succeeded task
    * @throws IOException
    */
-  public void testTaskCleanup()
+  public void testTaskFilesRemoval()
       throws Exception {
     if (!canRun()) {
       return;
     }
-    testTaskCleanup(false, false);// no needCleanup; no jvmReuse
+    testTaskFilesRemoval(false, false);// no needCleanup; no jvmReuse
   }
 
   /**
    * Validates if task cleanup is done properly for a task that is not succeeded
    * @throws IOException
    */
-  public void testFailedTaskCleanup()
+  public void testFailedTaskFilesRemoval()
   throws Exception {
     if (!canRun()) {
       return;
     }
-    testTaskCleanup(true, false);// needCleanup; no jvmReuse
+    testTaskFilesRemoval(true, false);// needCleanup; no jvmReuse
+
+    // initialize a cleanupAttempt for the task.
+    task.setTaskCleanupTask();
+    // localize task cleanup attempt
+    initializeTask();
+    checkTaskLocalization();
+
+    // verify the cleanup of cleanup attempt.
+    testTaskFilesRemoval(true, false);// needCleanup; no jvmReuse
   }
 
   /**
    * Validates if task cleanup is done properly for a succeeded task
    * @throws IOException
    */
-  public void testTaskCleanupWithJvmUse()
+  public void testTaskFilesRemovalWithJvmUse()
       throws Exception {
     if (!canRun()) {
       return;
     }
-    testTaskCleanup(false, true);// no needCleanup; jvmReuse
+    testTaskFilesRemoval(false, true);// no needCleanup; jvmReuse
   }
 
   private void initializeTask() throws IOException {
@@ -687,7 +695,8 @@ public class TestTaskTrackerLocalization
     runner.setupChildTaskConfiguration(lDirAlloc);
     TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
         localizedJobConf);
-    attemptLogFiles = runner.prepareLogFiles(task.getTaskID());
+    attemptLogFiles = runner.prepareLogFiles(task.getTaskID(),
+        task.isTaskCleanupTask());
 
     // Make sure the task-conf file is created
     Path localTaskFile =
@@ -718,7 +727,7 @@ public class TestTaskTrackerLocalization
   /**
    * Validates if task cleanup is done properly
    */
-  private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
+  private void testTaskFilesRemoval(boolean needCleanup, boolean jvmReuse)
       throws Exception {
     // Localize job and localize task.
     TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
@@ -732,19 +741,7 @@ public class TestTaskTrackerLocalization
 
     // create files and set permissions 555. Verify if task controller sets
     // the permissions for TT to delete the task dir or work dir properly
-    validateRemoveFiles(needCleanup, jvmReuse, tip);
-
-    // Check that the empty $mapred.local.dir/taskTracker/$user dirs are still
-    // there.
-    for (String localDir : localDirs) {
-      Path userDir =
-          new Path(localDir, TaskTracker.getUserDir(task.getUser()));
-      assertTrue("User directory " + userDir + " is not present!!",
-          tracker.getLocalFileSystem().exists(userDir));
-    }
-
-    // Test userlogs cleanup.
-    verifyUserLogsCleanup();
+    validateRemoveTaskFiles(needCleanup, jvmReuse, tip);
   }
 
   /**
@@ -752,7 +749,7 @@ public class TestTaskTrackerLocalization
    * 
    * @throws IOException
    */
-  private void verifyUserLogsCleanup()
+  private void verifyUserLogsRemoval()
       throws IOException {
     // verify user logs cleanup
     File jobUserLogDir = TaskLog.getJobDir(jobId);
@@ -772,7 +769,7 @@ public class TestTaskTrackerLocalization
    *   - create files with no write permissions to TT under job-work-dir
    *   - create files with no write permissions to TT under task-work-dir
    */
-  public void testJobCleanup() throws IOException, InterruptedException {
+  public void testJobFilesRemoval() throws IOException, InterruptedException {
     if (!canRun()) {
       return;
     }
@@ -838,6 +835,17 @@ public class TestTaskTrackerLocalization
     }
     assertFalse("Job " + task.getJobID() + " work dir exists after cleanup", 
                 jWorkDirExists);
+    // Test userlogs cleanup.
+    verifyUserLogsRemoval();
+
+    // Check that the empty $mapred.local.dir/taskTracker/$user dirs are still
+    // there.
+    for (String localDir : localDirs) {
+      Path userDir =
+          new Path(localDir, TaskTracker.getUserDir(task.getUser()));
+      assertTrue("User directory " + userDir + " is not present!!",
+          tracker.getLocalFileSystem().exists(userDir));
+    }
   }
   
   /**
@@ -922,4 +930,30 @@ public class TestTaskTrackerLocalization
     checkTaskLocalization();
   }
 
+  /**
+   * Localizes a cleanup task and validates permissions.
+   * 
+   * @throws InterruptedException 
+   * @throws IOException 
+   */
+  public void testCleanupTaskLocalization() throws IOException,
+      InterruptedException {
+    if (!canRun()) {
+      return;
+    }
+
+    task.setTaskCleanupTask();
+    // register task
+    tip = tracker.new TaskInProgress(task, trackerFConf);
+
+    // localize the job again.
+    RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    checkJobLocalization();
+
+    // localize task cleanup attempt
+    initializeTask();
+    checkTaskLocalization();
+
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java?rev=1077418&r1=1077417&r2=1077418&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java Fri Mar  4 04:12:53 2011
@@ -370,8 +370,10 @@ public class TestWebUIAuthorization exte
 
         // delete job-acls.xml file from the task log dir of attempt and verify
         // if unauthorized users can view task logs of attempt.
-        Path jobACLsFilePath = new Path(TaskLog.getAttemptDir(attempt).
-            toString(), TaskRunner.jobACLsFile);
+        File attemptLogDir = TaskLog.getAttemptDir(TaskAttemptID
+            .forName(attempt), false);
+        Path jobACLsFilePath = new Path(attemptLogDir.toString(),
+            TaskRunner.jobACLsFile);
         new File(jobACLsFilePath.toUri().getPath()).delete();
         assertEquals("Incorrect return code for " + unauthorizedUser,
             HttpURLConnection.HTTP_OK, getHttpStatusCode(taskLogURL,
@@ -379,7 +381,7 @@ public class TestWebUIAuthorization exte
 
         // delete the whole task log dir of attempt and verify that we get
         // correct response code (i.e. HTTP_GONE) when task logs are accessed.
-        FileUtil.fullyDelete(TaskLog.getAttemptDir(attempt));
+        FileUtil.fullyDelete(attemptLogDir);
         assertEquals("Incorrect return code for " + jobSubmitter,
             HttpURLConnection.HTTP_GONE, getHttpStatusCode(taskLogURL,
                 jobSubmitter, "GET"));