You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/05/11 16:32:51 UTC

svn commit: r943124 - in /hadoop/mapreduce/branches/branch-0.21: ./ conf/ src/c++/task-controller/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/pipes/ src/test/mapred/o...

Author: vinodkv
Date: Tue May 11 14:32:50 2010
New Revision: 943124

URL: http://svn.apache.org/viewvc?rev=943124&view=rev
Log:
MAPREDUCE-1607. Merge revision 943039 from trunk.

Added:
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestJvmReuse.java
      - copied unchanged from r943039, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJvmReuse.java
Modified:
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/conf/log4j.properties
    hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/branches/branch-0.21/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/pipes/Application.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Tue May 11 14:32:50 2010
@@ -52,6 +52,9 @@ Release 0.21.0 - Unreleased
 
     MAPREDUCE-1644. Remove Sqoop contrib module. (Aaron Kimball via cdouglas)
 
+    MAPREDUCE-1607. Task controller may not set permissions for a
+    task cleanup attempt's log directory (Amareshwari Sriramadasu via vinodkv)
+
   NEW FEATURES
 
     MAPREDUCE-706. Support for FIFO pools in the fair scheduler.

Modified: hadoop/mapreduce/branches/branch-0.21/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/conf/log4j.properties?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/conf/log4j.properties (original)
+++ hadoop/mapreduce/branches/branch-0.21/conf/log4j.properties Tue May 11 14:32:50 2010
@@ -55,6 +55,7 @@ log4j.appender.console.layout.Conversion
 
 #Default values
 hadoop.tasklog.taskid=null
+hadoop.tasklog.iscleanup=false
 hadoop.tasklog.noKeepSplits=4
 hadoop.tasklog.totalLogFileSize=100
 hadoop.tasklog.purgeLogSplits=true
@@ -62,6 +63,7 @@ hadoop.tasklog.logsRetainHours=12
 
 log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
 log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
 log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
 
 log4j.appender.TLA.layout=org.apache.log4j.PatternLayout

Modified: hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/c%2B%2B/task-controller/task-controller.c?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c Tue May 11 14:32:50 2010
@@ -583,10 +583,6 @@ int prepare_task_logs(const char *log_di
   if (stat(task_log_dir, &filestat) != 0) {
     if (errno == ENOENT) {
       // See TaskRunner.java to see that an absent log-dir doesn't fail the task.
-      // Task log dir for cleanup tasks will not have the name
-      // task-attempt-id.cleanup. Instead a log.index.cleanup is created in
-      // task-attempt log dir. We check if the directory exists and return if
-      // it doesn't. So the following will work for cleanup attempts too.
 #ifdef DEBUG
       fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
           task_log_dir);

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java Tue May 11 14:32:50 2010
@@ -135,6 +135,7 @@ public class TestStreamingTaskLog {
     long logSize = USERLOG_LIMIT_KB * 1024;
     assertTrue("environment set for child is wrong", env.contains("INFO,TLA")
                && env.contains("-Dhadoop.tasklog.taskid=attempt_")
-               && env.contains("-Dhadoop.tasklog.totalLogFileSize=" + logSize));
+               && env.contains("-Dhadoop.tasklog.totalLogFileSize=" + logSize)
+               && env.contains("-Dhadoop.tasklog.iscleanup=false"));
   }
 }

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Child.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/Child.java Tue May 11 14:32:50 2010
@@ -69,8 +69,9 @@ class Child {
     int port = Integer.parseInt(args[1]);
     final InetSocketAddress address = new InetSocketAddress(host, port);
     final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
+    final String logLocation = args[3];
     final int SLEEP_LONGER_COUNT = 5;
-    int jvmIdInt = Integer.parseInt(args[3]);
+    int jvmIdInt = Integer.parseInt(args[4]);
     JVMId jvmId = new JVMId(firstTaskid.getJobID(),
         firstTaskid.getTaskType() == TaskType.MAP,jvmIdInt);
     
@@ -109,7 +110,7 @@ class Child {
       public void run() {
         try {
           if (taskid != null) {
-            TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+            TaskLog.syncLogs(logLocation, taskid, isCleanup);
           }
         } catch (Throwable throwable) {
         }
@@ -123,7 +124,7 @@ class Child {
           try {
             Thread.sleep(5000);
             if (taskid != null) {
-              TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+              TaskLog.syncLogs(logLocation, taskid, isCleanup);
             }
           } catch (InterruptedException ie) {
           } catch (IOException iee) {
@@ -175,7 +176,7 @@ class Child {
 
         //create the index file so that the log files 
         //are viewable immediately
-        TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+        TaskLog.syncLogs(logLocation, taskid, isCleanup);
         final JobConf job = new JobConf(task.getJobFile());
         
         // set the jobTokenFile into task
@@ -215,7 +216,7 @@ class Child {
               FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
               taskFinal.run(job, umbilical);             // run the task
             } finally {
-              TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+              TaskLog.syncLogs(logLocation, taskid, isCleanup);
             }
             
             return null;

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java Tue May 11 14:32:50 2010
@@ -72,19 +72,21 @@ public class TaskLog {
     }
   }
 
-  public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
-    return new File(getAttemptDir(taskid.toString()), filter.toString());
-  }
-  public static File getRealTaskLogFileLocation(TaskAttemptID taskid, 
+  public static File getTaskLogFile(TaskAttemptID taskid, boolean isCleanup,
       LogName filter) {
+    return new File(getAttemptDir(taskid, isCleanup), filter.toString());
+  }
+
+  static File getRealTaskLogFileLocation(TaskAttemptID taskid,
+      boolean isCleanup, LogName filter) {
     LogFileDetail l;
     try {
-      l = getTaskLogFileDetail(taskid, filter);
+      l = getLogFileDetail(taskid, filter, isCleanup);
     } catch (IOException ie) {
       LOG.error("getTaskLogFileDetail threw an exception " + ie);
       return null;
     }
-    return new File(getAttemptDir(l.location), filter.toString());
+    return new File(l.location, filter.toString());
   }
   private static class LogFileDetail {
     final static String LOCATION = "LOG_DIR:";
@@ -93,16 +95,11 @@ public class TaskLog {
     long length;
   }
   
-  private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
-      LogName filter) throws IOException {
-    return getLogFileDetail(taskid, filter, false);
-  }
-  
   private static LogFileDetail getLogFileDetail(TaskAttemptID taskid, 
                                                 LogName filter,
                                                 boolean isCleanup) 
   throws IOException {
-    File indexFile = getIndexFile(taskid.toString(), isCleanup);
+    File indexFile = getIndexFile(taskid, isCleanup);
     BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
     //the format of the index file is
     //LOG_DIR: <the dir where the task logs are really stored>
@@ -120,7 +117,7 @@ public class TaskLog {
     //to be associated with each task attempt since jvm reuse is disabled
     //when profiling/debugging is enabled
     if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
-      l.length = new File(getAttemptDir(l.location), filter.toString()).length();
+      l.length = new File(l.location, filter.toString()).length();
       l.start = 0;
       fis.close();
       return l;
@@ -141,39 +138,32 @@ public class TaskLog {
     return l;
   }
   
-  private static File getTmpIndexFile(String taskid) {
-    return new File(getAttemptDir(taskid), "log.tmp");
-  }
-  public static File getIndexFile(String taskid) {
-    return getIndexFile(taskid, false);
+  private static File getTmpIndexFile(TaskAttemptID taskid, boolean isCleanup) {
+    return new File(getAttemptDir(taskid, isCleanup), "log.tmp");
   }
-  
-  public static File getIndexFile(String taskid, boolean isCleanup) {
-    if (isCleanup) {
-      return new File(getAttemptDir(taskid), "log.index.cleanup");
-    } else {
-      return new File(getAttemptDir(taskid), "log.index");
-    }
+
+  static File getIndexFile(TaskAttemptID taskid, boolean isCleanup) {
+    return new File(getAttemptDir(taskid, isCleanup), "log.index");
   }
 
   static String getBaseLogDir() {
     return System.getProperty("hadoop.log.dir");
   }
 
-  static File getAttemptDir(String taskid) {
-    return new File(getJobDir(TaskAttemptID.forName(taskid).getJobID()),
-        taskid);
+  static File getAttemptDir(TaskAttemptID taskid, boolean isCleanup) {
+    String cleanupSuffix = isCleanup ? ".cleanup" : "";
+    return new File(getJobDir(taskid.getJobID()), taskid + cleanupSuffix);
   }
   private static long prevOutLength;
   private static long prevErrLength;
   private static long prevLogLength;
   
-  private static void writeToIndexFile(TaskAttemptID firstTaskid,
+  private static void writeToIndexFile(String logLocation,
                                        boolean isCleanup) 
   throws IOException {
     // To ensure atomicity of updates to index file, write to temporary index
     // file first and then rename.
-    File tmpIndexFile = getTmpIndexFile(currentTaskid.toString());
+    File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup);
     
     BufferedOutputStream bos = 
       new BufferedOutputStream(new FileOutputStream(tmpIndexFile,false));
@@ -183,20 +173,23 @@ public class TaskLog {
     //STDOUT: <start-offset in the stdout file> <length>
     //STDERR: <start-offset in the stderr file> <length>
     //SYSLOG: <start-offset in the syslog file> <length>    
-    dos.writeBytes(LogFileDetail.LOCATION + firstTaskid.toString()+"\n"+
-        LogName.STDOUT.toString()+":");
-    dos.writeBytes(Long.toString(prevOutLength)+" ");
-    dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDOUT)
-        .length() - prevOutLength)+"\n"+LogName.STDERR+":");
-    dos.writeBytes(Long.toString(prevErrLength)+" ");
-    dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDERR)
-        .length() - prevErrLength)+"\n"+LogName.SYSLOG.toString()+":");
-    dos.writeBytes(Long.toString(prevLogLength)+" ");
-    dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.SYSLOG)
-        .length() - prevLogLength)+"\n");
+    dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"
+        + LogName.STDOUT.toString() + ":");
+    dos.writeBytes(Long.toString(prevOutLength) + " ");
+    dos.writeBytes(Long.toString(new File(logLocation, LogName.STDOUT
+        .toString()).length() - prevOutLength)
+        + "\n" + LogName.STDERR + ":");
+    dos.writeBytes(Long.toString(prevErrLength) + " ");
+    dos.writeBytes(Long.toString(new File(logLocation, LogName.STDERR
+        .toString()).length() - prevErrLength)
+        + "\n" + LogName.SYSLOG.toString() + ":");
+    dos.writeBytes(Long.toString(prevLogLength) + " ");
+    dos.writeBytes(Long.toString(new File(logLocation, LogName.SYSLOG
+        .toString()).length() - prevLogLength)
+        + "\n");
     dos.close();
 
-    File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
+    File indexFile = getIndexFile(currentTaskid, isCleanup);
     Path indexFilePath = new Path(indexFile.getAbsolutePath());
     Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
 
@@ -205,21 +198,15 @@ public class TaskLog {
     }
     localFS.rename (tmpIndexFilePath, indexFilePath);
   }
-  private static void resetPrevLengths(TaskAttemptID firstTaskid) {
-    prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length();
-    prevErrLength = getTaskLogFile(firstTaskid, LogName.STDERR).length();
-    prevLogLength = getTaskLogFile(firstTaskid, LogName.SYSLOG).length();
+  private static void resetPrevLengths(String logLocation) {
+    prevOutLength = new File(logLocation, LogName.STDOUT.toString()).length();
+    prevErrLength = new File(logLocation, LogName.STDERR.toString()).length();
+    prevLogLength = new File(logLocation, LogName.SYSLOG.toString()).length();
   }
   private volatile static TaskAttemptID currentTaskid = null;
 
-  public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
-                                           TaskAttemptID taskid) 
-  throws IOException {
-    syncLogs(firstTaskid, taskid, false);
-  }
-  
   @SuppressWarnings("unchecked")
-  public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
+  public synchronized static void syncLogs(String logLocation, 
                                            TaskAttemptID taskid,
                                            boolean isCleanup) 
   throws IOException {
@@ -238,9 +225,9 @@ public class TaskLog {
     }
     if (currentTaskid != taskid) {
       currentTaskid = taskid;
-      resetPrevLengths(firstTaskid);
+      resetPrevLengths(logLocation);
     }
-    writeToIndexFile(firstTaskid, isCleanup);
+    writeToIndexFile(logLocation, isCleanup);
   }
   
   /**
@@ -274,15 +261,10 @@ public class TaskLog {
     }
   }
 
-  static class Reader extends InputStream {
+  public static class Reader extends InputStream {
     private long bytesRemaining;
     private FileInputStream file;
 
-    public Reader(TaskAttemptID taskid, LogName kind, 
-                  long start, long end) throws IOException {
-      this(taskid, kind, start, end, false);
-    }
-    
     /**
      * Read a log file from start to end positions. The offsets may be negative,
      * in which case they are relative to the end of the file. For example,
@@ -312,8 +294,7 @@ public class TaskLog {
       start += fileDetail.start;
       end += fileDetail.start;
       bytesRemaining = end - start;
-      file = new FileInputStream(new File(getAttemptDir(fileDetail.location), 
-          kind.toString()));
+      file = new FileInputStream(new File(fileDetail.location, kind.toString()));
       // skip upto start
       long pos = 0;
       while (pos < start) {

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogAppender.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogAppender.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogAppender.java Tue May 11 14:32:50 2010
@@ -34,6 +34,7 @@ public class TaskLogAppender extends Fil
   //so that log4j can configure it from the configuration(log4j.properties). 
   private int maxEvents;
   private Queue<LoggingEvent> tail = null;
+  private boolean isCleanup;
 
   @Override
   public void activateOptions() {
@@ -41,8 +42,8 @@ public class TaskLogAppender extends Fil
       if (maxEvents > 0) {
         tail = new LinkedList<LoggingEvent>();
       }
-      setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId), 
-                                     TaskLog.LogName.SYSLOG).toString());
+      setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId),
+          isCleanup, TaskLog.LogName.SYSLOG).toString());
       setAppend(true);
       super.activateOptions();
     }
@@ -98,4 +99,22 @@ public class TaskLogAppender extends Fil
     maxEvents = (int) logSize / EVENT_SIZE;
   }
 
+  /**
+   * Set whether the task is a cleanup attempt or not.
+   * 
+   * @param isCleanup
+   *          true if the task is cleanup attempt, false otherwise.
+   */
+  public void setIsCleanup(boolean isCleanup) {
+    this.isCleanup = isCleanup;
+  }
+
+  /**
+   * Get whether task is cleanup attempt or not.
+   * 
+   * @return true if the task is cleanup attempt, false otherwise.
+   */
+  public boolean getIsCleanup() {
+    return isCleanup;
+  }
 }

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Tue May 11 14:32:50 2010
@@ -43,8 +43,9 @@ import org.apache.hadoop.util.StringUtil
 public class TaskLogServlet extends HttpServlet {
   private static final long serialVersionUID = -6615764817774487321L;
   
-  private boolean haveTaskLog(TaskAttemptID taskId, TaskLog.LogName type) {
-    File f = TaskLog.getTaskLogFile(taskId, type);
+  private boolean haveTaskLog(TaskAttemptID taskId, boolean isCleanup,
+      TaskLog.LogName type) {
+    File f = TaskLog.getTaskLogFile(taskId, isCleanup, type);
     return f.canRead();
   }
 
@@ -145,9 +146,10 @@ public class TaskLogServlet extends Http
    * viewing task logs of old jobs(i.e. jobs finished on earlier unsecure
    * cluster).
    */
-  static Configuration getConfFromJobACLsFile(String attemptIdStr) {
+  static Configuration getConfFromJobACLsFile(TaskAttemptID attemptId,
+      boolean isCleanup) {
     Path jobAclsFilePath = new Path(
-        TaskLog.getAttemptDir(attemptIdStr).toString(), TaskRunner.jobACLsFile);
+        TaskLog.getAttemptDir(attemptId, isCleanup).toString(), TaskRunner.jobACLsFile);
     Configuration conf = null;
     if (new File(jobAclsFilePath.toUri().getPath()).exists()) {
       conf = new Configuration(false);
@@ -176,38 +178,6 @@ public class TaskLogServlet extends Http
       return;
     }
 
-    TaskAttemptID attemptId = TaskAttemptID.forName(attemptIdStr);
-    if (!TaskLog.getAttemptDir(attemptIdStr).exists()) {
-      response.sendError(HttpServletResponse.SC_GONE,
-          "Task log directory for task " + attemptId +
-          " does not exist. May be cleaned up by Task Tracker, if older logs.");
-      return;
-    }
-
-    // get user name who is accessing
-    String user = request.getRemoteUser();
-    if (user != null) {
-      ServletContext context = getServletContext();
-      TaskTracker taskTracker = (TaskTracker) context.getAttribute(
-          "task.tracker");
-      // get jobACLConf from ACLs file
-      Configuration jobACLConf = getConfFromJobACLsFile(attemptIdStr);
-      // Ignore authorization if job-acls.xml is not found
-      if (jobACLConf != null) {
-        JobID jobId = attemptId.getJobID();
-
-        try {
-          checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
-              taskTracker);
-        } catch (AccessControlException e) {
-          String errMsg = "User " + user + " failed to view tasklogs of job " +
-              jobId + "!\n\n" + e.getMessage();
-          response.sendError(HttpServletResponse.SC_UNAUTHORIZED, errMsg);
-          return;
-        }
-      }
-    }
-
     String logFilter = request.getParameter("filter");
     if (logFilter != null) {
       try {
@@ -240,6 +210,38 @@ public class TaskLogServlet extends Http
       isCleanup = Boolean.valueOf(sCleanup);
     }
     
+    TaskAttemptID attemptId = TaskAttemptID.forName(attemptIdStr);
+    if (!TaskLog.getAttemptDir(attemptId, isCleanup).exists()) {
+      response.sendError(HttpServletResponse.SC_GONE,
+          "Task log directory for task " + attemptId +
+          " does not exist. May be cleaned up by Task Tracker, if older logs.");
+      return;
+    }
+
+    // get user name who is accessing
+    String user = request.getRemoteUser();
+    if (user != null) {
+      ServletContext context = getServletContext();
+      TaskTracker taskTracker = (TaskTracker) context.getAttribute(
+          "task.tracker");
+      // get jobACLConf from ACLs file
+      Configuration jobACLConf = getConfFromJobACLsFile(attemptId, isCleanup);
+      // Ignore authorization if job-acls.xml is not found
+      if (jobACLConf != null) {
+        JobID jobId = attemptId.getJobID();
+
+        try {
+          checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
+              taskTracker);
+        } catch (AccessControlException e) {
+          String errMsg = "User " + user + " failed to view tasklogs of job " +
+              jobId + "!\n\n" + e.getMessage();
+          response.sendError(HttpServletResponse.SC_UNAUTHORIZED, errMsg);
+          return;
+        }
+      }
+    }
+
     OutputStream out = response.getOutputStream();
     if( !plainText ) {
       out.write(("<html>\n" +
@@ -254,11 +256,11 @@ public class TaskLogServlet extends Http
                      TaskLog.LogName.STDERR, isCleanup);
         printTaskLog(response, out, attemptId, start, end, plainText,
                      TaskLog.LogName.SYSLOG, isCleanup);
-        if (haveTaskLog(attemptId, TaskLog.LogName.DEBUGOUT)) {
+        if (haveTaskLog(attemptId, isCleanup, TaskLog.LogName.DEBUGOUT)) {
           printTaskLog(response, out, attemptId, start, end, plainText, 
                        TaskLog.LogName.DEBUGOUT, isCleanup);
         }
-        if (haveTaskLog(attemptId, TaskLog.LogName.PROFILE)) {
+        if (haveTaskLog(attemptId, isCleanup, TaskLog.LogName.PROFILE)) {
           printTaskLog(response, out, attemptId, start, end, plainText, 
                        TaskLog.LogName.PROFILE, isCleanup);
         }

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue May 11 14:32:50 2010
@@ -216,7 +216,7 @@ abstract class TaskRunner extends Thread
       List<String> setup = getVMSetupCmd();
 
       // Set up the redirection of the task's stdout and stderr streams
-      File[] logFiles = prepareLogFiles(taskid);
+      File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
       File stdout = logFiles[0];
       File stderr = logFiles[1];
       tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
@@ -286,13 +286,17 @@ abstract class TaskRunner extends Thread
    * Prepare the log files for the task
    * 
    * @param taskid
+   * @param isCleanup
    * @return an array of files. The first file is stdout, the second is stderr.
    * @throws IOException 
    */
-  File[] prepareLogFiles(TaskAttemptID taskid) throws IOException {
+  File[] prepareLogFiles(TaskAttemptID taskid, boolean isCleanup)
+      throws IOException {
     File[] logFiles = new File[2];
-    logFiles[0] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
-    logFiles[1] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+    logFiles[0] = TaskLog.getTaskLogFile(taskid, isCleanup,
+        TaskLog.LogName.STDOUT);
+    logFiles[1] = TaskLog.getTaskLogFile(taskid, isCleanup,
+        TaskLog.LogName.STDERR);
     File logDir = logFiles[0].getParentFile();
     boolean b = logDir.mkdirs();
     if (!b) {
@@ -456,17 +460,13 @@ abstract class TaskRunner extends Thread
     vargs.add(classPath);
 
     // Setup the log4j prop
-    vargs.add("-Dhadoop.log.dir=" + 
-        new File(System.getProperty("hadoop.log.dir")
-        ).getAbsolutePath());
-    vargs.add("-Dhadoop.root.logger=" + getLogLevel(conf).toString() + ",TLA");
-    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
-    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+    setupLog4jProperties(vargs, taskid, logSize);
 
     if (conf.getProfileEnabled()) {
       if (conf.getProfileTaskRange(t.isMapTask()
                                    ).isIncluded(t.getPartition())) {
-        File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
+        File prof = TaskLog.getTaskLogFile(taskid, t.isTaskCleanupTask(),
+            TaskLog.LogName.PROFILE);
         vargs.add(String.format(conf.getProfileParams(), prof.toString()));
       }
     }
@@ -478,9 +478,21 @@ abstract class TaskRunner extends Thread
     vargs.add(address.getAddress().getHostAddress()); 
     vargs.add(Integer.toString(address.getPort())); 
     vargs.add(taskid.toString());                      // pass task identifier
+    // pass task log location
+    vargs.add(TaskLog.getAttemptDir(taskid, t.isTaskCleanupTask()).toString());
     return vargs;
   }
 
+  private void setupLog4jProperties(Vector<String> vargs, TaskAttemptID taskid,
+      long logSize) {
+    vargs.add("-Dhadoop.log.dir=" + 
+        new File(System.getProperty("hadoop.log.dir")).getAbsolutePath());
+    vargs.add("-Dhadoop.root.logger=" + getLogLevel(conf).toString() + ",TLA");
+    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
+    vargs.add("-Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask());
+    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+  }
+
   /**
    * @param taskid
    * @param workDir
@@ -563,6 +575,7 @@ abstract class TaskRunner extends Thread
       hadoopClientOpts = hadoopClientOpts + " ";
     }
     hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+                       + " -Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask()
                        + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
     env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
 

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue May 11 14:32:50 2010
@@ -2744,17 +2744,17 @@ public class TaskTracker 
       String jobConf = task.getJobFile();
       try {
         // get task's stdout file 
-        taskStdout = FileUtil.makeShellPath(
-            TaskLog.getRealTaskLogFileLocation
-                          (task.getTaskID(), TaskLog.LogName.STDOUT));
-        // get task's stderr file 
-        taskStderr = FileUtil.makeShellPath(
-            TaskLog.getRealTaskLogFileLocation
-                          (task.getTaskID(), TaskLog.LogName.STDERR));
-        // get task's syslog file 
-        taskSyslog = FileUtil.makeShellPath(
-            TaskLog.getRealTaskLogFileLocation
-                          (task.getTaskID(), TaskLog.LogName.SYSLOG));
+        taskStdout = FileUtil
+            .makeShellPath(TaskLog.getRealTaskLogFileLocation(task.getTaskID(),
+                task.isTaskCleanupTask(), TaskLog.LogName.STDOUT));
+        // get task's stderr file
+        taskStderr = FileUtil
+            .makeShellPath(TaskLog.getRealTaskLogFileLocation(task.getTaskID(),
+                task.isTaskCleanupTask(), TaskLog.LogName.STDERR));
+        // get task's syslog file
+        taskSyslog = FileUtil
+            .makeShellPath(TaskLog.getRealTaskLogFileLocation(task.getTaskID(),
+                task.isTaskCleanupTask(), TaskLog.LogName.SYSLOG));
       } catch(IOException e){
         LOG.warn("Exception finding task's stdout/err/syslog files");
       }
@@ -2773,8 +2773,8 @@ public class TaskTracker 
                   StringUtils.stringifyException(e));
       }
       // Build the command  
-      File stdout = TaskLog.getRealTaskLogFileLocation(
-                           task.getTaskID(), TaskLog.LogName.DEBUGOUT);
+      File stdout = TaskLog.getTaskLogFile(task.getTaskID(), task
+          .isTaskCleanupTask(), TaskLog.LogName.DEBUGOUT);
       // add pipes program as argument if it exists.
       String program ="";
       String executable = Submitter.getExecutable(localJobConf);

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/pipes/Application.java Tue May 11 14:32:50 2010
@@ -97,10 +97,12 @@ class Application<K1 extends WritableCom
     }
     cmd.add(executable);
     // wrap the command in a stdout/stderr capture
+    // we are starting map/reduce task of the pipes job. this is not a cleanup
+    // attempt. 
     TaskAttemptID taskid = 
       TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID));
-    File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
-    File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+    File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
+    File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
     long logLength = TaskLog.getTaskLogLength(conf);
     cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
                                      false);

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java Tue May 11 14:32:50 2010
@@ -146,7 +146,7 @@ public class TestDebugScript {
    */
   static void verifyDebugScriptOutput(TaskAttemptID taskId, String expectedUser, 
       String expectedGroup, String expectedPerms) throws Exception {
-    File output = TaskLog.getRealTaskLogFileLocation(taskId, 
+    File output = TaskLog.getRealTaskLogFileLocation(taskId, false,
         TaskLog.LogName.DEBUGOUT);
     // Check the presence of the output file if the script is to be run.
     assertTrue("Output file does not exists. DebugScript has not been run",

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java Tue May 11 14:32:50 2010
@@ -19,7 +19,7 @@ package org.apache.hadoop.mapred;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
+import java.net.HttpURLConnection;
 
 import junit.framework.TestCase;
 
@@ -32,11 +32,12 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.TaskType;
 
 public class TestTaskFail extends TestCase {
   private static String taskLog = "Task attempt log";
-  private static String cleanupLog = "cleanup attempt log";
+  static String cleanupLog = "cleanup attempt log";
 
   public static class MapperClass extends MapReduceBase
   implements Mapper<LongWritable, Text, Text, IntWritable> {
@@ -48,6 +49,8 @@ public class TestTaskFail extends TestCa
                      OutputCollector<Text, IntWritable> output, 
                      Reporter reporter) throws IOException {
       System.err.println(taskLog);
+      assertFalse(Boolean.getBoolean(System
+          .getProperty("hadoop.tasklog.iscleanup")));
       if (taskid.endsWith("_0")) {
         throw new IOException();
       } else if (taskid.endsWith("_1")) {
@@ -61,6 +64,15 @@ public class TestTaskFail extends TestCa
   static class CommitterWithLogs extends FileOutputCommitter {
     public void abortTask(TaskAttemptContext context) throws IOException {
       System.err.println(cleanupLog);
+      String attemptId = System.getProperty("hadoop.tasklog.taskid");
+      assertNotNull(attemptId);
+      if (attemptId.endsWith("_0")) {
+        assertFalse(Boolean.getBoolean(System
+            .getProperty("hadoop.tasklog.iscleanup")));
+      } else {
+        assertTrue(Boolean.getBoolean(System
+            .getProperty("hadoop.tasklog.iscleanup")));
+      }
       super.abortTask(context);
     }
   }
@@ -113,65 +125,44 @@ public class TestTaskFail extends TestCa
   }
   
   private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId, 
-                               TaskStatus ts, boolean isCleanup) 
+                               TaskStatus ts, boolean isCleanup, JobTracker jt) 
   throws IOException {
     assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
     assertTrue(ts != null);
     assertEquals(TaskStatus.State.FAILED, ts.getRunState());
     // validate tasklogs for task attempt
-    String log = readTaskLog(
+    String log = MapReduceTestUtil.readTaskLog(
                       TaskLog.LogName.STDERR, attemptId, false);
     assertTrue(log.contains(taskLog));
+    // access the logs from web url
+    TaskTrackerStatus ttStatus = jt.getTaskTracker(
+        tip.machineWhereTaskRan(attemptId)).getStatus();
+    String tasklogUrl = TaskLogServlet.getTaskLogUrl("localhost",
+        String.valueOf(ttStatus.getHttpPort()), attemptId.toString()) +
+        "&filter=STDERR";
+    assertEquals(HttpURLConnection.HTTP_OK, TestWebUIAuthorization
+        .getHttpStatusCode(tasklogUrl, tip.getUser(), "GET"));
     if (!isCleanup) {
       // validate task logs: tasklog should contain both task logs
       // and cleanup logs
       assertTrue(log.contains(cleanupLog));
     } else {
       // validate tasklogs for cleanup attempt
-      log = readTaskLog(
+      log = MapReduceTestUtil.readTaskLog(
                  TaskLog.LogName.STDERR, attemptId, true);
       assertTrue(log.contains(cleanupLog));
+      // access the cleanup attempt's logs from web url
+      ttStatus = jt.getTaskTracker(tip.machineWhereCleanupRan(attemptId))
+          .getStatus();
+      String cleanupTasklogUrl = TaskLogServlet.getTaskLogUrl("localhost",
+          String.valueOf(ttStatus.getHttpPort()), attemptId.toString())
+          + "&filter=STDERR&cleanup=true";
+      assertEquals(HttpURLConnection.HTTP_OK, TestWebUIAuthorization
+          .getHttpStatusCode(cleanupTasklogUrl, tip.getUser(), "GET"));
     }
   }
 
-  /**
-   * Reads tasklog and returns it as string after trimming it.
-   * @param filter Task log filter; can be STDOUT, STDERR,
-   *                SYSLOG, DEBUGOUT, DEBUGERR
-   * @param taskId The task id for which the log has to collected
-   * @param isCleanup whether the task is a cleanup attempt or not.
-   * @return task log as string
-   * @throws IOException
-   */
-  private String readTaskLog(TaskLog.LogName  filter, 
-                                   TaskAttemptID taskId, 
-                                   boolean isCleanup)
-  throws IOException {
-    // string buffer to store task log
-    StringBuffer result = new StringBuffer();
-    int res;
-
-    // reads the whole tasklog into inputstream
-    InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
-    // construct string log from inputstream.
-    byte[] b = new byte[65536];
-    while (true) {
-      res = taskLogReader.read(b);
-      if (res > 0) {
-        result.append(new String(b));
-      } else {
-        break;
-      }
-    }
-    taskLogReader.close();
-    
-    // trim the string and return it
-    String str = result.toString();
-    str = str.trim();
-    return str;
-  }
-  
-  private void validateJob(RunningJob job, MiniMRCluster mr) 
+  private void validateJob(RunningJob job, JobTracker jt) 
   throws IOException {
     assertEquals(JobStatus.SUCCEEDED, job.getJobState());
 	    
@@ -181,23 +172,21 @@ public class TestTaskFail extends TestCa
     // fails with an exception
     TaskAttemptID attemptId = 
       new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 0);
-    TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
-                            getTip(attemptId.getTaskID());
-    TaskStatus ts = 
-      mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    validateAttempt(tip, attemptId, ts, false);
+    TaskInProgress tip = jt.getTip(attemptId.getTaskID());
+    TaskStatus ts = jt.getTaskStatus(attemptId);
+    validateAttempt(tip, attemptId, ts, false, jt);
     
     attemptId =  new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 1);
     // this should be cleanup attempt since the second attempt fails
     // with System.exit
-    ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    validateAttempt(tip, attemptId, ts, true);
+    ts = jt.getTaskStatus(attemptId);
+    validateAttempt(tip, attemptId, ts, true, jt);
     
     attemptId =  new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 2);
     // this should be cleanup attempt since the third attempt fails
     // with Error
-    ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
-    validateAttempt(tip, attemptId, ts, true);
+    ts = jt.getTaskStatus(attemptId);
+    validateAttempt(tip, attemptId, ts, true, jt);
   }
   
   public void testWithDFS() throws IOException {
@@ -211,6 +200,7 @@ public class TestTaskFail extends TestCa
       dfs = new MiniDFSCluster(conf, 4, true, null);
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
       final Path inDir = new Path("./input");
       final Path outDir = new Path("./output");
       String input = "The quick brown fox\nhas many silly\nred fox sox\n";
@@ -222,18 +212,18 @@ public class TestTaskFail extends TestCa
       jobConf.setOutputCommitter(CommitterWithLogs.class);
       RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
-      validateJob(rJob, mr);
+      validateJob(rJob, jt);
       // launch job with fail tasks and fail-cleanups
       fileSys.delete(outDir, true);
       jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
       rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
-      validateJob(rJob, mr);
+      validateJob(rJob, jt);
       fileSys.delete(outDir, true);
       jobConf.setOutputCommitter(CommitterWithFailTaskCleanup2.class);
       rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
-      validateJob(rJob, mr);
+      validateJob(rJob, jt);
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown(); }

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Tue May 11 14:32:50 2010
@@ -576,7 +576,8 @@ public class TestTaskTrackerLocalization
     runner.setupChildTaskConfiguration(lDirAlloc);
     TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
         localizedJobConf);
-    attemptLogFiles = runner.prepareLogFiles(task.getTaskID());
+    attemptLogFiles = runner.prepareLogFiles(task.getTaskID(),
+        task.isTaskCleanupTask());
 
     // Make sure the task-conf file is created
     Path localTaskFile =
@@ -626,7 +627,7 @@ public class TestTaskTrackerLocalization
         .getPath(), "tmp").exists());
 
     // Make sure that the logs are setup properly
-    File logDir = TaskLog.getAttemptDir(taskId.toString());
+    File logDir = TaskLog.getAttemptDir(taskId, task.isTaskCleanupTask());
     assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
         logDir.exists());
     checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
@@ -655,8 +656,8 @@ public class TestTaskTrackerLocalization
         taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
 
     // Validate the contents of jobACLsFile(both user name and job-view-acls)
-    Configuration jobACLsConf =
-        TaskLogServlet.getConfFromJobACLsFile(task.getTaskID().toString());
+    Configuration jobACLsConf = TaskLogServlet.getConfFromJobACLsFile(task
+        .getTaskID(), task.isTaskCleanupTask());
     assertTrue(jobACLsConf.get(MRJobConfig.USER_NAME).equals(
         localizedJobConf.getUser()));
     assertTrue(jobACLsConf.get(MRJobConfig.JOB_ACL_VIEW_JOB).
@@ -695,7 +696,7 @@ public class TestTaskTrackerLocalization
    * $taskid/work
    * Also see createFileAndSetPermissions for details
    */
-  void validateRemoveFiles(boolean needCleanup, boolean jvmReuse,
+  void validateRemoveTaskFiles(boolean needCleanup, boolean jvmReuse,
                            TaskInProgress tip) throws IOException {
     // create files and set permissions 555. Verify if task controller sets
     // the permissions for TT to delete the taskDir or workDir
@@ -733,7 +734,6 @@ public class TestTaskTrackerLocalization
       // now try to delete the work dir and verify that there are no stale paths
       JvmManager.deleteWorkDir(tracker, task);
     }
-    tracker.removeJobFiles(task.getUser(), jobId);
 
     assertTrue("Some task files are not deleted!! Number of stale paths is "
         + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
@@ -743,42 +743,51 @@ public class TestTaskTrackerLocalization
    * Validates if task cleanup is done properly for a succeeded task
    * @throws IOException
    */
-  public void testTaskCleanup()
+  public void testTaskFilesRemoval()
       throws Exception {
     if (!canRun()) {
       return;
     }
-    testTaskCleanup(false, false);// no needCleanup; no jvmReuse
+    testTaskFilesRemoval(false, false);// no needCleanup; no jvmReuse
   }
 
   /**
    * Validates if task cleanup is done properly for a task that is not succeeded
    * @throws IOException
    */
-  public void testFailedTaskCleanup()
+  public void testFailedTaskFilesRemoval()
   throws Exception {
     if (!canRun()) {
       return;
     }
-    testTaskCleanup(true, false);// needCleanup; no jvmReuse
+    testTaskFilesRemoval(true, false);// needCleanup; no jvmReuse
+
+    // initialize a cleanupAttempt for the task.
+    task.setTaskCleanupTask();
+    // localize task cleanup attempt
+    initializeTask();
+    checkTaskLocalization();
+
+    // verify the cleanup of cleanup attempt.
+    testTaskFilesRemoval(true, false);// needCleanup; no jvmReuse
   }
 
   /**
    * Validates if task cleanup is done properly for a succeeded task
    * @throws IOException
    */
-  public void testTaskCleanupWithJvmUse()
+  public void testTaskFilesRemovalWithJvmUse()
       throws Exception {
     if (!canRun()) {
       return;
     }
-    testTaskCleanup(false, true);// no needCleanup; jvmReuse
+    testTaskFilesRemoval(false, true);// no needCleanup; jvmReuse
   }
 
   /**
    * Validates if task cleanup is done properly
    */
-  private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
+  private void testTaskFilesRemoval(boolean needCleanup, boolean jvmReuse)
       throws Exception {
     // Localize job and localize task.
     TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
@@ -792,19 +801,7 @@ public class TestTaskTrackerLocalization
 
     // create files and set permissions 555. Verify if task controller sets
     // the permissions for TT to delete the task dir or work dir properly
-    validateRemoveFiles(needCleanup, jvmReuse, tip);
-
-    // Check that the empty $mapreduce.cluster.local.dir/taskTracker/$user dirs are still
-    // there.
-    for (String localDir : localDirs) {
-      Path userDir =
-          new Path(localDir, TaskTracker.getUserDir(task.getUser()));
-      assertTrue("User directory " + userDir + " is not present!!",
-          tracker.getLocalFileSystem().exists(userDir));
-    }
-
-    // Test userlogs cleanup.
-    verifyUserLogsCleanup();
+    validateRemoveTaskFiles(needCleanup, jvmReuse, tip);
   }
 
   /**
@@ -812,7 +809,7 @@ public class TestTaskTrackerLocalization
    * 
    * @throws IOException
    */
-  private void verifyUserLogsCleanup()
+  private void verifyUserLogsRemoval()
       throws IOException {
     // verify user logs cleanup
     File jobUserLogDir = TaskLog.getJobDir(jobId);
@@ -832,7 +829,7 @@ public class TestTaskTrackerLocalization
    *   - create files with no write permissions to TT under job-work-dir
    *   - create files with no write permissions to TT under task-work-dir
    */
-  public void testJobCleanup() throws IOException, InterruptedException {
+  public void testJobFilesRemoval() throws IOException, InterruptedException {
     if (!canRun()) {
       return;
     }
@@ -899,6 +896,17 @@ public class TestTaskTrackerLocalization
     }
     assertFalse("Job " + task.getJobID() + " work dir exists after cleanup", 
                 jWorkDirExists);
+    // Test userlogs cleanup.
+    verifyUserLogsRemoval();
+
+    // Check that the empty $mapred.local.dir/taskTracker/$user dirs are still
+    // there.
+    for (String localDir : localDirs) {
+      Path userDir =
+          new Path(localDir, TaskTracker.getUserDir(task.getUser()));
+      assertTrue("User directory " + userDir + " is not present!!",
+          tracker.getLocalFileSystem().exists(userDir));
+    }
   }
   
   /**
@@ -983,4 +991,30 @@ public class TestTaskTrackerLocalization
     checkTaskLocalization();
   }
 
+  /**
+   * Localizes a cleanup task and validates permissions.
+   * 
+   * @throws InterruptedException 
+   * @throws IOException 
+   */
+  public void testCleanupTaskLocalization() throws IOException,
+      InterruptedException {
+    if (!canRun()) {
+      return;
+    }
+
+    task.setTaskCleanupTask();
+    // register task
+    tip = tracker.new TaskInProgress(task, trackerFConf);
+
+    // localize the job.
+    RunningJob rjob = tracker.localizeJob(tip);
+    localizedJobConf = rjob.getJobConf();
+    checkJobLocalization();
+
+    // localize task cleanup attempt
+    initializeTask();
+    checkTaskLocalization();
+
+  }
 }

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java Tue May 11 14:32:50 2010
@@ -341,8 +341,9 @@ public class TestWebUIAuthorization exte
 
         // delete job-acls.xml file from the task log dir of attempt and verify
         // if unauthorized users can view task logs of attempt.
-        Path jobACLsFilePath = new Path(
-            TaskLog.getAttemptDir(attempt.toString()).toString(),
+        File attemptLogDir = TaskLog.getAttemptDir(
+            org.apache.hadoop.mapred.TaskAttemptID.downgrade(attempt), false);
+        Path jobACLsFilePath = new Path(attemptLogDir.toString(),
             TaskRunner.jobACLsFile);
         new File(jobACLsFilePath.toUri().getPath()).delete();
         assertEquals("Incorrect return code for " + unauthorizedUser,
@@ -354,7 +355,7 @@ public class TestWebUIAuthorization exte
 
         // delete the whole task log dir of attempt and verify that we get
         // correct response code (i.e. HTTP_GONE) when task logs are accessed.
-        FileUtil.fullyDelete(TaskLog.getAttemptDir(attempt.toString()));
+        FileUtil.fullyDelete(attemptLogDir);
         assertEquals("Incorrect return code for " + jobSubmitter,
             HttpURLConnection.HTTP_GONE, getHttpStatusCode(stdoutURL,
             jobSubmitter, "GET"));

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=943124&r1=943123&r2=943124&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Tue May 11 14:32:50 2010
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.text.NumberFormat;
 import java.util.ArrayList;
@@ -43,7 +44,10 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.TaskLog.Reader;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -415,4 +419,44 @@ public class MapReduceTestUtil {
     return result.toString();
   }
 
+  /**
+   * Reads tasklog and returns it as string after trimming it.
+   * 
+   * @param filter
+   *          Task log filter; can be STDOUT, STDERR, SYSLOG, DEBUGOUT, PROFILE
+   * @param taskId
+   *          The task id for which the log has to collected
+   * @param isCleanup
+   *          whether the task is a cleanup attempt or not.
+   * @return task log as string
+   * @throws IOException
+   */
+  public static String readTaskLog(TaskLog.LogName filter,
+      org.apache.hadoop.mapred.TaskAttemptID taskId, boolean isCleanup)
+      throws IOException {
+    // string buffer to store task log
+    StringBuffer result = new StringBuffer();
+    int res;
+
+    // reads the whole tasklog into inputstream
+    InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1,
+        isCleanup);
+    // construct string log from inputstream.
+    byte[] b = new byte[65536];
+    while (true) {
+      res = taskLogReader.read(b);
+      if (res > 0) {
+        result.append(new String(b));
+      } else {
+        break;
+      }
+    }
+    taskLogReader.close();
+
+    // trim the string and return it
+    String str = result.toString();
+    str = str.trim();
+    return str;
+  }
+
 }