Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:38:37 UTC

svn commit: r1077082 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Mar  4 03:38:37 2011
New Revision: 1077082

URL: http://svn.apache.org/viewvc?rev=1077082&view=rev
Log:
commit adfa7a6d969469517d5f3a91127e6ee9d65c921d
Author: Arun C Murthy <ac...@apache.org>
Date:   Wed Dec 16 23:46:27 2009 +0530

    MAPREDUCE-1100. Truncate user logs to prevent TaskTrackers' disks from filling up. Contributed by Vinod Kumar Vavilapalli.
    
    From: https://issues.apache.org/jira/secure/attachment/12428200/MAPREDUCE-1100-20091216.2.txt
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1100. Truncate user logs to prevent TaskTrackers' disks from
    +    filling up. (Vinod Kumar Vavilapalli via acmurthy)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077082&r1=1077081&r2=1077082&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar  4 03:38:37 2011
@@ -195,6 +195,8 @@ class JvmManager {
             equals(task.getTaskID().toString())) {
           tracker.getTaskController().initializeTask(context);
         }
+
+        jvmRunner.taskGiven(task);
         return taskRunner.getTaskInProgress();
 
       }
@@ -376,6 +378,13 @@ class JvmManager {
       private ShellCommandExecutor shexec; // shell terminal for running the task
       //context used for starting JVM
       private TaskControllerContext initalContext;
+
+      private List<Task> tasksGiven = new ArrayList<Task>();
+
+      void taskGiven(Task task) {
+        tasksGiven.add(task);
+      }
+
       public JvmRunner(JvmEnv env, JobID jobId) {
         this.env = env;
         this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
@@ -384,6 +393,9 @@ class JvmManager {
       }
       public void run() {
         runChild(env);
+
+        // Post-JVM-exit logs processing. Truncate the logs.
+        truncateJVMLogs();
       }
 
       public void runChild(JvmEnv env) {
@@ -446,7 +458,14 @@ class JvmManager {
           removeJvm(jvmId);
         }
       }
-      
+
+      // Post-JVM-exit logs processing. Truncate the logs.
+      private void truncateJVMLogs() {
+        Task firstTask = initalContext.task;
+        tracker.getTaskLogsMonitor().addProcessForLogTruncation(
+            firstTask.getTaskID(), tasksGiven);
+      }
+
       public void taskRan() {
         busy = false;
         numTasksRan++;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1077082&r1=1077081&r2=1077082&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Mar  4 03:38:37 2011
@@ -28,8 +28,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -69,33 +72,58 @@ public class TaskLog {
   public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
     return new File(getBaseDir(taskid.toString()), filter.toString());
   }
+
+  /**
+   * @deprecated Instead use
+   *             {@link #getAllLogsFileDetails(TaskAttemptID, boolean)} to get
+   *             the details of all log-files and then use the particular
+   *             log-type's detail to call getRealTaskLogFilePath(String,
+   *             LogName) to get the real log-location
+   */
+  @Deprecated
   public static File getRealTaskLogFileLocation(TaskAttemptID taskid, 
       LogName filter) {
     LogFileDetail l;
     try {
-      l = getTaskLogFileDetail(taskid, filter);
+      Map<LogName, LogFileDetail> allFilesDetails =
+          getAllLogsFileDetails(taskid, false);
+      l = allFilesDetails.get(filter);
     } catch (IOException ie) {
-      LOG.error("getTaskLogFileDetail threw an exception " + ie);
+      LOG.error("getTaskLogFileDetailgetAllLogsFileDetails threw an exception "
+          + ie);
       return null;
     }
     return new File(getBaseDir(l.location), filter.toString());
   }
-  private static class LogFileDetail {
+
+  /**
+   * Get the real task-log file-path
+   * 
+   * @param location Location of the log-file. This should point to an
+   *          attempt-directory.
+   * @param filter the {@link LogName} of the log-file
+   * @return the path of the log-file as a shell-usable string
+   * @throws IOException
+   */
+  static String getRealTaskLogFilePath(String location, LogName filter)
+      throws IOException {
+    return FileUtil.makeShellPath(new File(getBaseDir(location),
+        filter.toString()));
+  }
+
+  static class LogFileDetail {
     final static String LOCATION = "LOG_DIR:";
     String location;
     long start;
     long length;
   }
-  
-  private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
-      LogName filter) throws IOException {
-    return getLogFileDetail(taskid, filter, false);
-  }
-  
-  private static LogFileDetail getLogFileDetail(TaskAttemptID taskid, 
-                                                LogName filter,
-                                                boolean isCleanup) 
-  throws IOException {
+
+  static Map<LogName, LogFileDetail> getAllLogsFileDetails(
+      TaskAttemptID taskid, boolean isCleanup) throws IOException {
+
+    Map<LogName, LogFileDetail> allLogsFileDetails =
+        new HashMap<LogName, LogFileDetail>();
+
     File indexFile = getIndexFile(taskid.toString(), isCleanup);
     BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
     //the format of the index file is
@@ -103,36 +131,37 @@ public class TaskLog {
     //stdout:<start-offset in the stdout file> <length>
     //stderr:<start-offset in the stderr file> <length>
     //syslog:<start-offset in the syslog file> <length>
-    LogFileDetail l = new LogFileDetail();
     String str = fis.readLine();
     if (str == null) { //the file doesn't have anything
       throw new IOException ("Index file for the log of " + taskid+" doesn't exist.");
     }
-    l.location = str.substring(str.indexOf(LogFileDetail.LOCATION)+
+    String loc = str.substring(str.indexOf(LogFileDetail.LOCATION)+
         LogFileDetail.LOCATION.length());
     //special cases are the debugout and profile.out files. They are guaranteed
     //to be associated with each task attempt since jvm reuse is disabled
     //when profiling/debugging is enabled
-    if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
+    for (LogName filter : new LogName[] { LogName.DEBUGOUT, LogName.PROFILE }) {
+      LogFileDetail l = new LogFileDetail();
+      l.location = loc;
       l.length = new File(getBaseDir(l.location), filter.toString()).length();
       l.start = 0;
-      fis.close();
-      return l;
+      allLogsFileDetails.put(filter, l);
     }
     str = fis.readLine();
     while (str != null) {
-      //look for the exact line containing the logname
-      if (str.contains(filter.toString())) {
-        str = str.substring(filter.toString().length()+1);
-        String[] startAndLen = str.split(" ");
-        l.start = Long.parseLong(startAndLen[0]);
-        l.length = Long.parseLong(startAndLen[1]);
-        break;
-      }
+      LogFileDetail l = new LogFileDetail();
+      l.location = loc;
+      int idx = str.indexOf(':');
+      LogName filter = LogName.valueOf(str.substring(0, idx).toUpperCase());
+      str = str.substring(idx + 1);
+      String[] startAndLen = str.split(" ");
+      l.start = Long.parseLong(startAndLen[0]);
+      l.length = Long.parseLong(startAndLen[1]);
+      allLogsFileDetails.put(filter, l);
       str = fis.readLine();
     }
     fis.close();
-    return l;
+    return allLogsFileDetails;
   }
   
   private static File getTmpIndexFile(String taskid) {
@@ -150,16 +179,30 @@ public class TaskLog {
     }
   }
   
-  private static File getBaseDir(String taskid) {
+  static File getBaseDir(String taskid) {
     return new File(LOG_DIR, taskid);
   }
-  private static long prevOutLength;
-  private static long prevErrLength;
-  private static long prevLogLength;
+
+  static final List<LogName> LOGS_TRACKED_BY_INDEX_FILES =
+      Arrays.asList(LogName.STDOUT, LogName.STDERR, LogName.SYSLOG);
+
+  private static TaskAttemptID currentTaskid;
+
+  /**
+   * Map to store previous and current lengths.
+   */
+  private static Map<LogName, Long[]> logLengths =
+      new HashMap<LogName, Long[]>();
+  static {
+    for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
+      logLengths.put(logName, new Long[] { Long.valueOf(0L),
+          Long.valueOf(0L) });
+    }
+  }
   
-  private static void writeToIndexFile(TaskAttemptID firstTaskid,
-                                       boolean isCleanup) 
-  throws IOException {
+  static void writeToIndexFile(TaskAttemptID firstTaskid,
+      TaskAttemptID currentTaskid, boolean isCleanup,
+      Map<LogName, Long[]> lengths) throws IOException {
     // To ensure atomicity of updates to index file, write to temporary index
     // file first and then rename.
     File tmpIndexFile = getTmpIndexFile(currentTaskid.toString());
@@ -172,17 +215,15 @@ public class TaskLog {
     //STDOUT: <start-offset in the stdout file> <length>
     //STDERR: <start-offset in the stderr file> <length>
     //SYSLOG: <start-offset in the syslog file> <length>    
-    dos.writeBytes(LogFileDetail.LOCATION + firstTaskid.toString()+"\n"+
-        LogName.STDOUT.toString()+":");
-    dos.writeBytes(Long.toString(prevOutLength)+" ");
-    dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDOUT)
-        .length() - prevOutLength)+"\n"+LogName.STDERR+":");
-    dos.writeBytes(Long.toString(prevErrLength)+" ");
-    dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDERR)
-        .length() - prevErrLength)+"\n"+LogName.SYSLOG.toString()+":");
-    dos.writeBytes(Long.toString(prevLogLength)+" ");
-    dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.SYSLOG)
-        .length() - prevLogLength)+"\n");
+    dos.writeBytes(LogFileDetail.LOCATION
+        + firstTaskid.toString()
+        + "\n");
+    for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
+      Long[] lens = lengths.get(logName);
+      dos.writeBytes(logName.toString() + ":"
+          + lens[0].toString() + " "
+          + Long.toString(lens[1].longValue() - lens[0].longValue())
+          + "\n");}
     dos.close();
 
     File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
@@ -194,19 +235,13 @@ public class TaskLog {
     }
     localFS.rename (tmpIndexFilePath, indexFilePath);
   }
-  private static void resetPrevLengths(TaskAttemptID firstTaskid) {
-    prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length();
-    prevErrLength = getTaskLogFile(firstTaskid, LogName.STDERR).length();
-    prevLogLength = getTaskLogFile(firstTaskid, LogName.SYSLOG).length();
-  }
-  private volatile static TaskAttemptID currentTaskid = null;
 
   public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
                                            TaskAttemptID taskid) 
   throws IOException {
     syncLogs(firstTaskid, taskid, false);
   }
-  
+
   @SuppressWarnings("unchecked")
   public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
                                            TaskAttemptID taskid,
@@ -225,11 +260,25 @@ public class TaskLog {
         }
       }
     }
+    // set start and end
+    for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
+      if (currentTaskid != taskid) {
+        // Set start = current-end
+        logLengths.get(logName)[0] =
+            Long.valueOf(getTaskLogFile(firstTaskid, logName).length());
+      }
+      // Set current end
+      logLengths.get(logName)[1] =
+          Long.valueOf(getTaskLogFile(firstTaskid, logName).length());
+    }
     if (currentTaskid != taskid) {
+      if (currentTaskid != null) {
+        LOG.info("Starting logging for a new task " + taskid
+            + " in the same JVM as that of the first task " + firstTaskid);
+      }
       currentTaskid = taskid;
-      resetPrevLengths(firstTaskid);
     }
-    writeToIndexFile(firstTaskid, isCleanup);
+    writeToIndexFile(firstTaskid, taskid, isCleanup, logLengths);
   }
   
   /**
@@ -275,6 +324,7 @@ public class TaskLog {
       return file.lastModified() < purgeTimeStamp;
     }
   }
+
   /**
    * Purge old user logs.
    * 
@@ -319,7 +369,9 @@ public class TaskLog {
     public Reader(TaskAttemptID taskid, LogName kind, 
                   long start, long end, boolean isCleanup) throws IOException {
       // find the right log file
-      LogFileDetail fileDetail = getLogFileDetail(taskid, kind, isCleanup);
+      Map<LogName, LogFileDetail> allFilesDetails =
+          getAllLogsFileDetails(taskid, isCleanup);
+      LogFileDetail fileDetail = allFilesDetails.get(kind);
       // calculate the start and stop
       long size = fileDetail.length;
       if (start < 0) {

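[Taken together, the TaskLog.java changes replace the per-stream index lookups with getAllLogsFileDetails(), which parses the whole index file in one pass. The on-disk format is the one documented in the comments above; an illustration with an invented attempt id and offsets:

    LOG_DIR:attempt_201103040338_0001_m_000001_0
    stdout:0 4096
    stderr:0 512
    syslog:0 10240

Callers of the deprecated getRealTaskLogFileLocation(TaskAttemptID, LogName) are expected to move to the two-step pattern used by the TaskTracker changes later in this commit, roughly:

    // Sketch of the replacement API; both methods are package-private, so
    // this only works from within org.apache.hadoop.mapred.
    Map<LogName, LogFileDetail> details =
        TaskLog.getAllLogsFileDetails(taskid, false);
    String stdoutPath = TaskLog.getRealTaskLogFilePath(
        details.get(LogName.STDOUT).location, LogName.STDOUT);
]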
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java?rev=1077082&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java Fri Mar  4 03:38:37 2011
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
+import org.apache.hadoop.util.StringUtils;
+
+class TaskLogsMonitor extends Thread {
+  static final Log LOG = LogFactory.getLog(TaskLogsMonitor.class);
+
+  long mapRetainSize, reduceRetainSize;
+
+  public TaskLogsMonitor(long mapRetSize, long reduceRetSize) {
+    mapRetainSize = mapRetSize;
+    reduceRetainSize = reduceRetSize;
+    LOG.info("Starting logs' monitor with mapRetainSize=" + mapRetainSize
+        + " and reduceRetainSize=" + reduceRetSize);
+  }
+
+  /**
+   * The list of tasks that have finished and so need their logs to be
+   * truncated.
+   */
+  private Map<TaskAttemptID, PerJVMInfo> finishedJVMs =
+      new HashMap<TaskAttemptID, PerJVMInfo>();
+
+  private static final int DEFAULT_BUFFER_SIZE = 4 * 1024;
+
+  static final int MINIMUM_RETAIN_SIZE_FOR_TRUNCATION = 0;
+
+  private static class PerJVMInfo {
+
+    List<Task> allAttempts;
+
+    public PerJVMInfo(List<Task> allAtmpts) {
+      this.allAttempts = allAtmpts;
+    }
+  }
+
+  /**
+   * A process (JVM or debug script) has finished. Asynchronously truncate
+   * the logs of all the corresponding tasks to the configured limit. For a
+   * JVM, both the firstAttempt and the list of all attempts that ran in that
+   * JVM have to be passed. For a debug script, the (only) attempt itself
+   * should be passed as both the firstAttempt and the list of attempts.
+   * 
+   * @param firstAttempt the first task-attempt that ran in the process
+   * @param allAttempts all task-attempts that ran in the process
+   */
+  void addProcessForLogTruncation(TaskAttemptID firstAttempt,
+      List<Task> allAttempts) {
+    LOG.info("Adding the jvm with first-attempt " + firstAttempt
+        + " for logs' truncation");
+    PerJVMInfo lInfo = new PerJVMInfo(allAttempts);
+    synchronized (finishedJVMs) {
+      finishedJVMs.put(firstAttempt, lInfo);
+      finishedJVMs.notify();
+    }
+  }
+
+  /**
+   * Process the removed task's logs. This involves truncating them to
+   * retainSize.
+   */
+  void truncateLogs(TaskAttemptID firstAttempt, PerJVMInfo lInfo) {
+
+    // Read the log-file details for all the attempts that ran in this JVM
+    Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails;
+    try {
+      taskLogFileDetails = getAllLogsFileDetails(lInfo.allAttempts);
+    } catch (IOException e) {
+      LOG.warn(
+          "Exception in truncateLogs while getting allLogsFileDetails()."
+              + " Ignoring the truncation of logs of this process.", e);
+      return;
+    }
+
+    Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails =
+        new HashMap<Task, Map<LogName, LogFileDetail>>();
+
+    File attemptLogDir = TaskLog.getBaseDir(firstAttempt.toString());
+
+    FileWriter tmpFileWriter;
+    FileReader logFileReader;
+    // Now truncate file by file
+    logNameLoop: for (LogName logName : LogName.values()) {
+
+      File logFile = TaskLog.getTaskLogFile(firstAttempt, logName);
+
+      // //// Optimization: if no task is over limit, just skip truncation-code
+      if (logFile.exists()
+          && !isTruncationNeeded(lInfo, taskLogFileDetails, logName)) {
+        LOG.debug("Truncation is not needed for "
+            + logFile.getAbsolutePath());
+        continue;
+      }
+      // //// End of optimization
+
+      // Truncation is needed for this log-file. Go ahead now.
+      File tmpFile = new File(attemptLogDir, "truncate.tmp");
+      try {
+        tmpFileWriter = new FileWriter(tmpFile);
+      } catch (IOException ioe) {
+        LOG.warn("Cannot open " + tmpFile.getAbsolutePath()
+            + " for writing truncated log-file "
+            + logFile.getAbsolutePath()
+            + ". Continuing with other log files. ", ioe);
+        continue;
+      }
+
+      try {
+        logFileReader = new FileReader(logFile);
+      } catch (FileNotFoundException fe) {
+        LOG.warn("Cannot open " + logFile.getAbsolutePath()
+            + " for reading. Continuing with other log files");
+        if (!tmpFile.delete()) {
+          LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+        }
+        continue;
+      }
+
+      long newCurrentOffset = 0;
+      // Process each attempt from the ordered list passed.
+      for (Task task : lInfo.allAttempts) {
+
+        // Truncate this task-attempt's slice of the log file so that only
+        // its last retainSize bytes are retained, reducing the file's size
+        // and saving disk space.
+        long retainSize =
+            (task.isMapTask() ? mapRetainSize : reduceRetainSize);
+        LogFileDetail newLogFileDetail = new LogFileDetail();
+        try {
+          newLogFileDetail =
+              truncateALogFileOfAnAttempt(task.getTaskID(),
+                  taskLogFileDetails.get(task).get(logName), retainSize,
+                  tmpFileWriter, logFileReader);
+        } catch (IOException ioe) {
+          LOG.warn("Cannot truncate the log file "
+              + logFile.getAbsolutePath()
+              + ". Caught exception while handling " + task.getTaskID(),
+              ioe);
+          // revert back updatedTaskLogFileDetails
+          revertIndexFileInfo(lInfo, taskLogFileDetails,
+              updatedTaskLogFileDetails, logName);
+          if (!tmpFile.delete()) {
+            LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+          }
+          continue logNameLoop;
+        }
+
+        // Track information for updating the index file properly.
+        // Index files don't track DEBUGOUT and PROFILE logs, so skip'em.
+        if (TaskLog.LOGS_TRACKED_BY_INDEX_FILES.contains(logName)) {
+          if (!updatedTaskLogFileDetails.containsKey(task)) {
+            updatedTaskLogFileDetails.put(task,
+                new HashMap<LogName, LogFileDetail>());
+          }
+          // newLogFileDetail already has the location and length set, just
+          // set the start offset now.
+          newLogFileDetail.start = newCurrentOffset;
+          updatedTaskLogFileDetails.get(task).put(logName, newLogFileDetail);
+          newCurrentOffset += newLogFileDetail.length;
+        }
+      }
+
+      try {
+        tmpFileWriter.close();
+      } catch (IOException ioe) {
+        LOG.warn("Couldn't close the tmp file " + tmpFile.getAbsolutePath()
+            + ". Deleting it.", ioe);
+        revertIndexFileInfo(lInfo, taskLogFileDetails,
+            updatedTaskLogFileDetails, logName);
+        if (!tmpFile.delete()) {
+          LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+        }
+        continue;
+      }
+
+      if (!tmpFile.renameTo(logFile)) {
+        // If the tmpFile cannot be renamed revert back
+        // updatedTaskLogFileDetails to maintain the consistency of the
+        // original log file
+        revertIndexFileInfo(lInfo, taskLogFileDetails,
+            updatedTaskLogFileDetails, logName);
+        if (!tmpFile.delete()) {
+          LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
+        }
+      }
+    }
+
+    // Update the index files
+    updateIndicesAfterLogTruncation(firstAttempt, updatedTaskLogFileDetails);
+  }
+
+  /**
+   * @param lInfo
+   * @param taskLogFileDetails
+   * @param updatedTaskLogFileDetails
+   * @param logName
+   */
+  private void revertIndexFileInfo(PerJVMInfo lInfo,
+      Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
+      Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails,
+      LogName logName) {
+    if (TaskLog.LOGS_TRACKED_BY_INDEX_FILES.contains(logName)) {
+      for (Task task : lInfo.allAttempts) {
+        if (!updatedTaskLogFileDetails.containsKey(task)) {
+          updatedTaskLogFileDetails.put(task,
+              new HashMap<LogName, LogFileDetail>());
+        }
+        updatedTaskLogFileDetails.get(task).put(logName,
+            taskLogFileDetails.get(task).get(logName));
+      }
+    }
+  }
+
+  /**
+   * Get the logFileDetails of all the attempts in the passed list.
+   * 
+   * @param allAttempts the attempts whose log-file details are needed
+   * @return a map of task to the log-file detail
+   * @throws IOException
+   */
+  private Map<Task, Map<LogName, LogFileDetail>> getAllLogsFileDetails(
+      final List<Task> allAttempts) throws IOException {
+    Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails =
+        new HashMap<Task, Map<LogName, LogFileDetail>>();
+    for (Task task : allAttempts) {
+      Map<LogName, LogFileDetail> allLogsFileDetails;
+      allLogsFileDetails =
+          TaskLog.getAllLogsFileDetails(task.getTaskID(),
+              task.isTaskCleanupTask());
+      taskLogFileDetails.put(task, allLogsFileDetails);
+    }
+    return taskLogFileDetails;
+  }
+
+  /**
+   * Check if truncation of logs is needed for the given jvmInfo. If all the
+   * tasks that ran in a JVM are within the log-limits, then truncation is not
+   * needed. Otherwise it is needed.
+   * 
+   * @param lInfo
+   * @param taskLogFileDetails
+   * @param logName
+   * @return true if truncation is needed, false otherwise
+   */
+  private boolean isTruncationNeeded(PerJVMInfo lInfo,
+      Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
+      LogName logName) {
+    boolean truncationNeeded = false;
+    LogFileDetail logFileDetail = null;
+    for (Task task : lInfo.allAttempts) {
+      long taskRetainSize =
+          (task.isMapTask() ? mapRetainSize : reduceRetainSize);
+      Map<LogName, LogFileDetail> allLogsFileDetails =
+          taskLogFileDetails.get(task);
+      logFileDetail = allLogsFileDetails.get(logName);
+      if (taskRetainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
+          && logFileDetail.length > taskRetainSize) {
+        truncationNeeded = true;
+        break;
+      }
+    }
+    return truncationNeeded;
+  }
+
+  /**
+   * Truncate the log file of this task-attempt so that only its last
+   * retainSize bytes are retained, reducing the file's size and saving
+   * disk space.
+   * 
+   * @param taskID Task whose logs need to be truncated
+   * @param oldLogFileDetail contains the original log details for the attempt
+   * @param taskRetainSize retain-size
+   * @param tmpFileWriter New log file to write to. Already opened in append
+   *          mode.
+   * @param logFileReader Original log file to read from.
+   * @return the {@link LogFileDetail} describing the truncated slice
+   * @throws IOException
+   */
+  private LogFileDetail truncateALogFileOfAnAttempt(
+      final TaskAttemptID taskID, final LogFileDetail oldLogFileDetail,
+      final long taskRetainSize, final FileWriter tmpFileWriter,
+      final FileReader logFileReader) throws IOException {
+    LogFileDetail newLogFileDetail = new LogFileDetail();
+
+    // ///////////// Truncate log file ///////////////////////
+
+    // New location of log file is same as the old
+    newLogFileDetail.location = oldLogFileDetail.location;
+    if (taskRetainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
+        && oldLogFileDetail.length > taskRetainSize) {
+      LOG.info("Truncating logs for " + taskID + " from "
+          + oldLogFileDetail.length + "bytes to " + taskRetainSize
+          + "bytes.");
+      newLogFileDetail.length = taskRetainSize;
+    } else {
+      LOG.info("No truncation needed for " + taskID + " length is "
+          + oldLogFileDetail.length + " retain size " + taskRetainSize
+          + "bytes.");
+      newLogFileDetail.length = oldLogFileDetail.length;
+    }
+    long charsSkipped =
+        logFileReader.skip(oldLogFileDetail.length
+            - newLogFileDetail.length);
+    if (charsSkipped != oldLogFileDetail.length - newLogFileDetail.length) {
+      throw new IOException("Erroneously skipped " + charsSkipped
+          + " instead of the expected "
+          + (oldLogFileDetail.length - newLogFileDetail.length));
+    }
+    long alreadyRead = 0;
+    while (alreadyRead < newLogFileDetail.length) {
+      char[] tmpBuf; // temporary buffer for copying log data
+      if (newLogFileDetail.length - alreadyRead >= DEFAULT_BUFFER_SIZE) {
+        tmpBuf = new char[DEFAULT_BUFFER_SIZE];
+      } else {
+        tmpBuf = new char[(int) (newLogFileDetail.length - alreadyRead)];
+      }
+      int charsRead = logFileReader.read(tmpBuf);
+      if (charsRead < 0) {
+        break;
+      } else {
+        alreadyRead += charsRead;
+      }
+      // read() may fill the buffer only partially, so write exactly the
+      // chars that were read rather than the whole buffer.
+      tmpFileWriter.write(tmpBuf, 0, charsRead);
+    }
+    // ////// End of truncating log file ///////////////////////
+
+    return newLogFileDetail;
+  }
+
+  /**
+   * Truncation of logs is done. Now sync the index files to reflect the
+   * truncated sizes.
+   * 
+   * @param firstAttempt
+   * @param updatedTaskLogFileDetails
+   */
+  private void updateIndicesAfterLogTruncation(TaskAttemptID firstAttempt,
+      Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails) {
+    for (Entry<Task, Map<LogName, LogFileDetail>> entry : 
+                                updatedTaskLogFileDetails.entrySet()) {
+      Task task = entry.getKey();
+      Map<LogName, LogFileDetail> logFileDetails = entry.getValue();
+      Map<LogName, Long[]> logLengths = new HashMap<LogName, Long[]>();
+      // set current and previous lengths
+      for (LogName logName : TaskLog.LOGS_TRACKED_BY_INDEX_FILES) {
+        logLengths.put(logName, new Long[] { Long.valueOf(0L),
+            Long.valueOf(0L) });
+        LogFileDetail lfd = logFileDetails.get(logName);
+        if (lfd != null) {
+          // Set previous lengths
+          logLengths.get(logName)[0] = Long.valueOf(lfd.start);
+          // Set current lengths
+          logLengths.get(logName)[1] = Long.valueOf(lfd.start + lfd.length);
+        }
+      }
+      try {
+        TaskLog.writeToIndexFile(firstAttempt, task.getTaskID(),
+            task.isTaskCleanupTask(), logLengths);
+      } catch (IOException ioe) {
+        LOG.warn("Exception in updateIndicesAfterLogTruncation : "
+            + StringUtils.stringifyException(ioe));
+        LOG.warn("Exception encountered while updating index file of task "
+            + task.getTaskID()
+            + ". Ignoring and continuing with other tasks.");
+      }
+    }
+  }
+
+  /**
+   * Truncate the logs of all the processes that have finished since the
+   * last check.
+   * @throws IOException
+   */
+  void monitorTaskLogs() throws IOException {
+
+    Map<TaskAttemptID, PerJVMInfo> tasksBeingTruncated =
+      new HashMap<TaskAttemptID, PerJVMInfo>();
+
+    // Start monitoring newly added finishedJVMs
+    synchronized (finishedJVMs) {
+      tasksBeingTruncated.clear();
+      tasksBeingTruncated.putAll(finishedJVMs);
+      finishedJVMs.clear();
+    }
+
+    for (Entry<TaskAttemptID, PerJVMInfo> entry : 
+                tasksBeingTruncated.entrySet()) {
+      truncateLogs(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void run() {
+
+    while (true) {
+      try {
+        monitorTaskLogs();
+        try {
+          synchronized (finishedJVMs) {
+            while (finishedJVMs.isEmpty()) {
+              finishedJVMs.wait();
+            }
+          }
+        } catch (InterruptedException e) {
+          LOG.warn(getName() + " is interrupted. Returning");
+          return;
+        }
+      } catch (Throwable e) {
+        LOG.warn(getName()
+            + " encountered an exception while monitoring : "
+            + StringUtils.stringifyException(e));
+        LOG.info("Ingoring the exception and continuing monitoring.");
+      }
+    }
+  }
+}
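
[Operationally, TaskLogsMonitor receives finished processes via addProcessForLogTruncation() and cuts each attempt's slice of each log file down to the retain size, keeping the tail. A synchronous sketch modeled on the unit tests below, where attemptID and task stand for a finished attempt's id and its Task object:

    // Sketch: drive the monitor directly instead of starting its thread,
    // as the tests do. With a 1000-byte retain size, a 1500-byte syslog
    // is cut to its last 1000 bytes and the index file is rewritten with
    // start=0, length=1000.
    TaskLogsMonitor monitor = new TaskLogsMonitor(1000L, 1000L);
    monitor.addProcessForLogTruncation(attemptID, Arrays.asList(task));
    monitor.monitorTaskLogs();
]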

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077082&r1=1077081&r2=1077082&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:38:37 2011
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -65,6 +66,8 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
+import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
@@ -112,7 +115,12 @@ public class TaskTracker 
   @Deprecated
   static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
      "mapred.tasktracker.pmem.reserved";
- 
+
+  static final String MAP_USERLOG_RETAIN_SIZE =
+      "mapreduce.cluster.map.userlog.retain-size";
+  static final String REDUCE_USERLOG_RETAIN_SIZE =
+      "mapreduce.cluster.reduce.userlog.retain-size";
+
   static final long WAIT_FOR_DONE = 3 * 1000;
   private int httpPort;
 
@@ -233,6 +241,8 @@ public class TaskTracker 
   private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
 
+  private TaskLogsMonitor taskLogsMonitor;
+
   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
       "mapred.tasktracker.memory_calculator_plugin";
 
@@ -402,6 +412,14 @@ public class TaskTracker 
     }
   }
 
+  TaskLogsMonitor getTaskLogsMonitor() {
+    return this.taskLogsMonitor;
+  }
+
+  void setTaskLogsMonitor(TaskLogsMonitor t) {
+    this.taskLogsMonitor = t;
+  }
+
   static String getCacheSubdir() {
     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
   }
@@ -561,6 +579,10 @@ public class TaskTracker 
 
     initializeMemoryManagement();
 
+    setTaskLogsMonitor(new TaskLogsMonitor(getMapUserLogRetainSize(),
+        getReduceUserLogRetainSize()));
+    getTaskLogsMonitor().start();
+
     this.indexCache = new IndexCache(this.fConf);
 
     mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
@@ -924,7 +946,11 @@ public class TaskTracker 
     //stop the launchers
     this.mapLauncher.interrupt();
     this.reduceLauncher.interrupt();
-    
+
+    // All tasks are killed, so their logs no longer need monitoring.
+    // Interrupt the monitor.
+    getTaskLogsMonitor().interrupt();
+
     jvmManager.stop();
     
     // shutdown RPC connections
@@ -991,6 +1017,13 @@ public class TaskTracker 
     initialize();
   }
 
+  /**
+   * Blank constructor. Only usable by tests.
+   */
+  TaskTracker() {
+    server = null;
+  }
+
   private void checkJettyPort(int port) throws IOException { 
     //See HADOOP-4744
     if (port < 0) {
@@ -1331,6 +1364,22 @@ public class TaskTracker 
     return heartbeatResponse;
   }
 
+  long getMapUserLogRetainSize() {
+    return fConf.getLong(MAP_USERLOG_RETAIN_SIZE, -1);
+  }
+
+  void setMapUserLogRetainSize(long retainSize) {
+    fConf.setLong(MAP_USERLOG_RETAIN_SIZE, retainSize);
+  }
+
+  long getReduceUserLogRetainSize() {
+    return fConf.getLong(REDUCE_USERLOG_RETAIN_SIZE, -1);
+  }
+
+  void setReduceUserLogRetainSize(long retainSize) {
+    fConf.setLong(REDUCE_USERLOG_RETAIN_SIZE, retainSize);
+  }
+
   /**
    * Return the total virtual memory available on this TaskTracker.
    * @return total size of virtual memory.
@@ -2252,18 +2301,23 @@ public class TaskTracker 
               String taskSyslog ="";
               String jobConf = task.getJobFile();
               try {
-                // get task's stdout file 
-                taskStdout = FileUtil.makeShellPath(
-                    TaskLog.getRealTaskLogFileLocation
-                                  (task.getTaskID(), TaskLog.LogName.STDOUT));
-                // get task's stderr file 
-                taskStderr = FileUtil.makeShellPath(
-                    TaskLog.getRealTaskLogFileLocation
-                                  (task.getTaskID(), TaskLog.LogName.STDERR));
-                // get task's syslog file 
-                taskSyslog = FileUtil.makeShellPath(
-                    TaskLog.getRealTaskLogFileLocation
-                                  (task.getTaskID(), TaskLog.LogName.SYSLOG));
+                Map<LogName, LogFileDetail> allFilesDetails =
+                    TaskLog.getAllLogsFileDetails(task.getTaskID(), false);
+                // get task's stdout file
+                taskStdout =
+                    TaskLog.getRealTaskLogFilePath(
+                        allFilesDetails.get(LogName.STDOUT).location,
+                        LogName.STDOUT);
+                // get task's stderr file
+                taskStderr =
+                    TaskLog.getRealTaskLogFilePath(
+                        allFilesDetails.get(LogName.STDERR).location,
+                        LogName.STDERR);
+                // get task's syslog file
+                taskSyslog =
+                    TaskLog.getRealTaskLogFilePath(
+                        allFilesDetails.get(LogName.SYSLOG).location,
+                        LogName.SYSLOG);
               } catch(IOException e){
                 LOG.warn("Exception finding task's stdout/err/syslog files");
               }
@@ -2325,6 +2379,11 @@ public class TaskTracker 
               } catch(IOException ioe) {
                 LOG.warn("Exception in add diagnostics!");
               }
+
+              // The debug command has run. Do the post-debug-script-exit
+              // processing of the debug logs: truncate them.
+              getTaskLogsMonitor().addProcessForLogTruncation(
+                  task.getTaskID(), Arrays.asList(task));
             }
           }
           taskStatus.setProgress(0.0f);

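[Note that the debug-script path above feeds the monitor through the same entry point as the JVM path, passing the lone attempt as a one-element list, which is the usage the TaskLogsMonitor javadoc describes:

    // Same pattern as the call site in the diff above: the debug script is
    // a "process" whose only attempt is both the first attempt and the list.
    getTaskLogsMonitor().addProcessForLogTruncation(
        task.getTaskID(), Arrays.asList(task));

DEBUGOUT (like PROFILE) is truncated but never index-tracked, since TaskLog.LOGS_TRACKED_BY_INDEX_FILES lists only STDOUT, STDERR and SYSLOG; this is why the debug-script test below checks only the raw file length.]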
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java?rev=1077082&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java Fri Mar  4 03:38:37 2011
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verify the logs' monitoring functionality.
+ */
+public class TestTaskLogsMonitor {
+
+  static final Log LOG = LogFactory.getLog(TestTaskLogsMonitor.class);
+
+  /**
+   * Clean up any stale directories after enabling writable permissions on
+   * all attempt-dirs.
+   * 
+   * @throws IOException
+   */
+  @After
+  public void tearDown() throws IOException {
+    File logDir = TaskLog.getUserLogDir();
+    for (File attemptDir : logDir.listFiles()) {
+      attemptDir.setWritable(true);
+      FileUtil.fullyDelete(attemptDir);
+    }
+  }
+
+  void writeRealBytes(TaskAttemptID firstAttemptID,
+      TaskAttemptID attemptID, LogName logName, long numBytes, char data)
+      throws IOException {
+
+    File logFile = TaskLog.getTaskLogFile(firstAttemptID, logName);
+
+    LOG.info("Going to write " + numBytes + " real bytes to the log file "
+        + logFile);
+
+    if (!logFile.getParentFile().exists()
+        && !logFile.getParentFile().mkdirs()) {
+      throw new IOException("Couldn't create all ancestor dirs for "
+          + logFile);
+    }
+
+    File attemptDir = TaskLog.getBaseDir(attemptID.toString());
+    if (!attemptDir.exists() && !attemptDir.mkdirs()) {
+      throw new IOException("Couldn't create all ancestor dirs for "
+          + logFile);
+    }
+
+    // Need to call this up front to set currentTaskid.
+    TaskLog.syncLogs(firstAttemptID, attemptID);
+
+    FileWriter writer = new FileWriter(logFile, true);
+    for (long i = 0; i < numBytes; i++) {
+      writer.write(data);
+    }
+    writer.close();
+    TaskLog.syncLogs(firstAttemptID, attemptID);
+    LOG.info("Written " + numBytes + " real bytes to the log file "
+        + logFile);
+  }
+
+  private static Map<LogName, Long> getAllLogsFileLengths(
+      TaskAttemptID tid, boolean isCleanup) throws IOException {
+    Map<LogName, Long> allLogsFileLengths = new HashMap<LogName, Long>();
+
+    // If the index file doesn't exist, we cannot get log-file lengths. So set
+    // them to zero.
+    if (!TaskLog.getIndexFile(tid.toString(), isCleanup).exists()) {
+      for (LogName log : LogName.values()) {
+        allLogsFileLengths.put(log, Long.valueOf(0));
+      }
+      return allLogsFileLengths;
+    }
+
+    Map<LogName, LogFileDetail> logFilesDetails =
+        TaskLog.getAllLogsFileDetails(tid, isCleanup);
+    for (LogName log : logFilesDetails.keySet()) {
+      allLogsFileLengths.put(log,
+          Long.valueOf(logFilesDetails.get(log).length));
+    }
+    return allLogsFileLengths;
+  }
+
+  /**
+   * Test cases which don't need any truncation of log-files. Without JVM-reuse.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testNoTruncationNeeded() throws IOException {
+    TaskTracker taskTracker = new TaskTracker();
+    TaskLogsMonitor logsMonitor = new TaskLogsMonitor(1000L, 1000L);
+    taskTracker.setTaskLogsMonitor(logsMonitor);
+
+    TaskID baseId = new TaskID();
+    int taskcount = 0;
+
+    TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
+    Task task = new MapTask(null, attemptID, 0, null, null, 0, null);
+
+    // Let the tasks write logs within retain-size
+    writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 500, 'H');
+
+    logsMonitor.monitorTaskLogs();
+    File attemptDir = TaskLog.getBaseDir(attemptID.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // Finish the task and the JVM too.
+    logsMonitor.addProcessForLogTruncation(attemptID, Arrays.asList(task));
+
+    // There should be no truncation of the log-file.
+    logsMonitor.monitorTaskLogs();
+    assertTrue(attemptDir.exists());
+    File logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
+    assertEquals(500, logFile.length());
+    // The index file should also be proper.
+    assertEquals(500, getAllLogsFileLengths(attemptID, false).get(
+        LogName.SYSLOG).longValue());
+
+    logsMonitor.monitorTaskLogs();
+    assertEquals(500, logFile.length());
+  }
+
+  /**
+   * Test the disabling of truncation of log-file.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testDisabledLogTruncation() throws IOException {
+    TaskTracker taskTracker = new TaskTracker();
+    // Anything less than 0 disables the truncation.
+    TaskLogsMonitor logsMonitor = new TaskLogsMonitor(-1L, -1L);
+    taskTracker.setTaskLogsMonitor(logsMonitor);
+
+    TaskID baseId = new TaskID();
+    int taskcount = 0;
+
+    TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
+    Task task = new MapTask(null, attemptID, 0, null, null, 0, null);
+
+    // Let the tasks write some logs
+    writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 1500, 'H');
+
+    logsMonitor.monitorTaskLogs();
+    File attemptDir = TaskLog.getBaseDir(attemptID.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // Finish the task and the JVM too.
+    logsMonitor.addProcessForLogTruncation(attemptID, Arrays.asList(task));
+
+    // The log-file should not be truncated.
+    logsMonitor.monitorTaskLogs();
+    assertTrue(attemptDir.exists());
+    File logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
+    assertEquals(1500, logFile.length());
+    // The index file should also be proper.
+    assertEquals(1500, getAllLogsFileLengths(attemptID, false).get(
+        LogName.SYSLOG).longValue());
+  }
+
+  /**
+   * Test the truncation of log-file when JVMs are not reused.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testLogTruncationOnFinishing() throws IOException {
+    TaskTracker taskTracker = new TaskTracker();
+    TaskLogsMonitor logsMonitor = new TaskLogsMonitor(1000L, 1000L);
+    taskTracker.setTaskLogsMonitor(logsMonitor);
+
+    TaskID baseId = new TaskID();
+    int taskcount = 0;
+
+    TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
+    Task task = new MapTask(null, attemptID, 0, null, null, 0, null);
+
+    // Let the tasks write logs more than retain-size
+    writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 1500, 'H');
+
+    logsMonitor.monitorTaskLogs();
+    File attemptDir = TaskLog.getBaseDir(attemptID.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // Finish the task and the JVM too.
+    logsMonitor.addProcessForLogTruncation(attemptID, Arrays.asList(task));
+
+    // The log-file should now be truncated.
+    logsMonitor.monitorTaskLogs();
+    assertTrue(attemptDir.exists());
+    File logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
+    assertEquals(1000, logFile.length());
+    // The index file should also be proper.
+    assertEquals(1000, getAllLogsFileLengths(attemptID, false).get(
+        LogName.SYSLOG).longValue());
+
+    logsMonitor.monitorTaskLogs();
+    assertEquals(1000, logFile.length());
+  }
+
+  /**
+   * Test the truncation of log-file when JVM-reuse is enabled.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testLogTruncationOnFinishingWithJVMReuse() throws IOException {
+    TaskTracker taskTracker = new TaskTracker();
+    TaskLogsMonitor logsMonitor = new TaskLogsMonitor(150L, 150L);
+    taskTracker.setTaskLogsMonitor(logsMonitor);
+
+    TaskID baseTaskID = new TaskID();
+    int attemptsCount = 0;
+
+    // Assuming the job's retain size is 150
+    TaskAttemptID attempt1 = new TaskAttemptID(baseTaskID, attemptsCount++);
+    Task task1 = new MapTask(null, attempt1, 0, null, null, 0, null);
+
+    // Let the tasks write logs more than retain-size
+    writeRealBytes(attempt1, attempt1, LogName.SYSLOG, 200, 'A');
+
+    logsMonitor.monitorTaskLogs();
+
+    File attemptDir = TaskLog.getBaseDir(attempt1.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // Start another attempt in the same JVM
+    TaskAttemptID attempt2 = new TaskAttemptID(baseTaskID, attemptsCount++);
+    Task task2 = new MapTask(null, attempt2, 0, null, null, 0, null);
+    logsMonitor.monitorTaskLogs();
+
+    // Let attempt2 also write some logs
+    writeRealBytes(attempt1, attempt2, LogName.SYSLOG, 100, 'B');
+    logsMonitor.monitorTaskLogs();
+
+    // Start yet another attempt in the same JVM
+    TaskAttemptID attempt3 = new TaskAttemptID(baseTaskID, attemptsCount++);
+    Task task3 = new MapTask(null, attempt3, 0, null, null, 0, null);
+    logsMonitor.monitorTaskLogs();
+
+    // Let attempt3 also write some logs
+    writeRealBytes(attempt1, attempt3, LogName.SYSLOG, 225, 'C');
+    logsMonitor.monitorTaskLogs();
+
+    // Finish the JVM.
+    logsMonitor.addProcessForLogTruncation(attempt1,
+        Arrays.asList((new Task[] { task1, task2, task3 })));
+
+    // The log-file should now be truncated.
+    logsMonitor.monitorTaskLogs();
+    assertTrue(attemptDir.exists());
+    File logFile = TaskLog.getTaskLogFile(attempt1, LogName.SYSLOG);
+    assertEquals(400, logFile.length());
+    // The index files should also be proper.
+    assertEquals(150, getAllLogsFileLengths(attempt1, false).get(
+        LogName.SYSLOG).longValue());
+    assertEquals(100, getAllLogsFileLengths(attempt2, false).get(
+        LogName.SYSLOG).longValue());
+    assertEquals(150, getAllLogsFileLengths(attempt3, false).get(
+        LogName.SYSLOG).longValue());
+
+    // assert the data.
+    FileReader reader =
+        new FileReader(TaskLog.getTaskLogFile(attempt1, LogName.SYSLOG));
+    int ch, bytesRead = 0;
+    boolean dataValid = true;
+    while ((ch = reader.read()) != -1) {
+      bytesRead++;
+      if (bytesRead <= 150) {
+        if ((char) ch != 'A') {
+          LOG.warn("Truncation didn't happen properly. At "
+              + (bytesRead + 1) + "th byte, expected 'A' but found "
+              + (char) ch);
+          dataValid = false;
+        }
+      } else if (bytesRead <= 250) {
+        if ((char) ch != 'B') {
+          LOG.warn("Truncation didn't happen properly. At "
+              + (bytesRead + 1) + "th byte, expected 'B' but found "
+              + (char) ch);
+          dataValid = false;
+        }
+      } else if ((char) ch != 'C') {
+        LOG.warn("Truncation didn't happen properly. At " + (bytesRead + 1)
+            + "th byte, expected 'C' but found " + (char) ch);
+        dataValid = false;
+      }
+    }
+    assertTrue("Log-truncation didn't happen properly!", dataValid);
+
+    logsMonitor.monitorTaskLogs();
+    assertEquals(400, logFile.length());
+  }
+
+  private static String TEST_ROOT_DIR =
+      new File(System.getProperty("test.build.data", "/tmp")).toURI().toString().replace(
+          ' ', '+');
+
+  public static class LoggingMapper<K, V> extends IdentityMapper<K, V> {
+
+    public void map(K key, V val, OutputCollector<K, V> output,
+        Reporter reporter) throws IOException {
+      // Write lots of logs
+      for (int i = 0; i < 1000; i++) {
+        System.out.println("Lots of logs! Lots of logs! "
+            + "Waiting to be truncated! Lots of logs!");
+      }
+      super.map(key, val, output, reporter);
+    }
+  }
+
+  /**
+   * Test logs monitoring with {@link MiniMRCluster}
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testLogsMonitoringWithMiniMR() throws IOException {
+
+    MiniMRCluster mr = null;
+    try {
+      JobConf clusterConf = new JobConf();
+      clusterConf.setLong(TaskTracker.MAP_USERLOG_RETAIN_SIZE, 10000L);
+      clusterConf.setLong(TaskTracker.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
+      mr = new MiniMRCluster(1, "file:///", 3, null, null, clusterConf);
+
+      JobConf conf = mr.createJobConf();
+
+      Path inDir = new Path(TEST_ROOT_DIR + "/input");
+      Path outDir = new Path(TEST_ROOT_DIR + "/output");
+      FileSystem fs = FileSystem.get(conf);
+      if (fs.exists(outDir)) {
+        fs.delete(outDir, true);
+      }
+      if (!fs.exists(inDir)) {
+        fs.mkdirs(inDir);
+      }
+      String input = "The quick brown fox jumped over the lazy dog";
+      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+
+      conf.setInputFormat(TextInputFormat.class);
+      conf.setOutputKeyClass(LongWritable.class);
+      conf.setOutputValueClass(Text.class);
+
+      FileInputFormat.setInputPaths(conf, inDir);
+      FileOutputFormat.setOutputPath(conf, outDir);
+      conf.setNumMapTasks(1);
+      conf.setNumReduceTasks(0);
+      conf.setMapperClass(LoggingMapper.class);
+
+      RunningJob job = JobClient.runJob(conf);
+      assertTrue(job.getJobState() == JobStatus.SUCCEEDED);
+      for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
+        long length =
+            TaskLog.getTaskLogFile(tce.getTaskAttemptId(),
+                TaskLog.LogName.STDOUT).length();
+        assertTrue("STDOUT log file length for " + tce.getTaskAttemptId()
+            + " is " + length + " and not <=10000", length <= 10000);
+      }
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test the truncation of DEBUGOUT file by {@link TaskLogsMonitor}
+   * @throws IOException 
+   */
+  @Test
+  public void testDebugLogsTruncationWithMiniMR() throws IOException {
+
+    MiniMRCluster mr = null;
+    try {
+      JobConf clusterConf = new JobConf();
+      clusterConf.setLong(TaskTracker.MAP_USERLOG_RETAIN_SIZE, 10000L);
+      clusterConf.setLong(TaskTracker.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
+      mr = new MiniMRCluster(1, "file:///", 3, null, null, clusterConf);
+
+      JobConf conf = mr.createJobConf();
+
+      Path inDir = new Path(TEST_ROOT_DIR + "/input");
+      Path outDir = new Path(TEST_ROOT_DIR + "/output");
+      FileSystem fs = FileSystem.get(conf);
+      if (fs.exists(outDir)) {
+        fs.delete(outDir, true);
+      }
+      if (!fs.exists(inDir)) {
+        fs.mkdirs(inDir);
+      }
+      String input = "The quick brown fox jumped over the lazy dog";
+      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+
+      conf.setInputFormat(TextInputFormat.class);
+      conf.setOutputKeyClass(LongWritable.class);
+      conf.setOutputValueClass(Text.class);
+
+      FileInputFormat.setInputPaths(conf, inDir);
+      FileOutputFormat.setOutputPath(conf, outDir);
+      conf.setNumMapTasks(1);
+      conf.setMaxMapAttempts(1);
+      conf.setNumReduceTasks(0);
+      conf.setMapperClass(TestMiniMRMapRedDebugScript.MapClass.class);
+
+      // copy debug script to cache from local file system.
+      Path scriptPath = new Path(TEST_ROOT_DIR, "debug-script.txt");
+      String debugScriptContent =
+          "for ((i=0;i<1000;i++)); " + "do "
+              + "echo \"Lots of logs! Lots of logs! "
+              + "Waiting to be truncated! Lots of logs!\";" + "done";
+      DataOutputStream scriptFile = fs.create(scriptPath);
+      scriptFile.writeBytes(debugScriptContent);
+      scriptFile.close();
+      new File(scriptPath.toUri().getPath()).setExecutable(true);
+
+      URI uri = scriptPath.toUri();
+      DistributedCache.createSymlink(conf);
+      DistributedCache.addCacheFile(uri, conf);
+      conf.setMapDebugScript(scriptPath.toUri().getPath());
+
+      RunningJob job = null;
+      try {
+        JobClient jc = new JobClient(conf);
+        job = jc.submitJob(conf);
+        try {
+          jc.monitorAndPrintJob(conf, job);
+        } catch (InterruptedException e) {
+          //
+        }
+      } catch (IOException ioe) {
+      } finally{
+        for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
+          File debugOutFile =
+              TaskLog.getTaskLogFile(tce.getTaskAttemptId(),
+                  TaskLog.LogName.DEBUGOUT);
+          if (debugOutFile.exists()) {
+            long length = debugOutFile.length();
+            assertTrue("DEBUGOUT log file length for "
+                + tce.getTaskAttemptId() + " is " + length
+                + " and not =10000", length == 10000);
+          }
+        }
+      }
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+}
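
[The arithmetic behind testLogTruncationOnFinishingWithJVMReuse: the three attempts write 200 + 100 + 225 = 525 bytes of 'A's, 'B's and 'C's into the shared syslog of the first attempt. With a retain size of 150, each attempt's slice is capped at min(length, 150), so the truncated file holds 150 + 100 + 150 = 400 bytes, matching the assertions on the total file length and on the three per-attempt index entries.]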