Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/08/07 00:05:41 UTC

svn commit: r563300 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/pipes/ src/webapps/job/ src/webapps/task/

Author: omalley
Date: Mon Aug  6 15:05:40 2007
New Revision: 563300

URL: http://svn.apache.org/viewvc?view=rev&rev=563300
Log:
HADOOP-1553.  Simplify and speed up saving of user task's output and logs. 
(omalley)

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
Removed:
    lucene/hadoop/trunk/src/webapps/task/tasklog.jsp
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/conf/log4j.properties
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
    lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp
    lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Aug  6 15:05:40 2007
@@ -461,6 +461,27 @@
 142. HADOOP-1657.  Fix NNBench to ensure that the block size is a
      multiple of bytes.per.checksum. (Raghu Angadi via dhruba)
 
+143. HADOOP-1553.  Replace the user task output and log capture code with shell
+     redirection instead of copier threads in the TaskTracker. The output cap is
+     now applied via tail, which buffers the capped data in memory, so the cap
+     should not be set large. The output of the tasklog servlet is no longer
+     forced into UTF8 and is no longer buffered entirely in memory. (omalley)
+     Configuration changes to hadoop-default.xml:
+       remove mapred.userlog.num.splits
+       remove mapred.userlog.purgesplits
+       change default mapred.userlog.limit.kb to 0 (no limit)
+       change default mapred.userlog.retain.hours to 24
+     Configuration changes to log4j.properties:
+       remove log4j.appender.TLA.noKeepSplits
+       remove log4j.appender.TLA.purgeLogSplits
+       remove log4j.appender.TLA.logsRetainHours
+     URL changes:
+       http://<tasktracker>/tasklog.jsp -> http://<tasktracker>/tasklog with
+         parameters limited to start and end, which may be positive (offsets
+         from the start of the log) or negative (offsets from the end).
+     Environment:
+       require bash (v2 or later) and tail
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.
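
The new servlet can be exercised with a plain HTTP GET. A minimal sketch follows; the
tasktracker host, port and task id are made-up placeholders, plaintext=true returns the
raw log bytes, and a negative start is an offset from the end of the file:

    import java.io.InputStream;
    import java.net.URL;

    public class TaskLogFetch {
      public static void main(String[] args) throws Exception {
        // Hypothetical tasktracker and task id; substitute real values.
        String base = "http://tracker-host:50060/tasklog";
        // Last 4096 bytes of the task's stdout, as plain text.
        URL url = new URL(base + "?plaintext=true&taskid=task_0001_m_000000_0"
                          + "&filter=stdout&start=-4097");
        InputStream in = url.openStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) > 0) {
          System.out.write(buf, 0, n);
        }
        in.close();
      }
    }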

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Mon Aug  6 15:05:40 2007
@@ -779,29 +779,15 @@
 </property>
 
 <property>
-  <name>mapred.userlog.num.splits</name>
-  <value>4</value>
-  <description>The number of fragments into which the user-log is to be split.
-  </description>
-</property>
-
-<property>
   <name>mapred.userlog.limit.kb</name>
-  <value>100</value>
-  <description>The maximum size of user-logs of each task.
-  </description>
-</property>
-
-<property>
-  <name>mapred.userlog.purgesplits</name>
-  <value>true</value>
-  <description>Should the splits be purged disregarding the user-log size limit.
+  <value>0</value>
+  <description>The maximum size of user-logs of each task in KB. 0 disables the cap.
   </description>
 </property>
 
 <property>
   <name>mapred.userlog.retain.hours</name>
-  <value>12</value>
+  <value>24</value>
   <description>The maximum time, in hours, for which the user-logs are to be 
   				retained.
   </description>
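
The new defaults can be overridden per job through JobConf; a small sketch, with
illustrative values only:

    import org.apache.hadoop.mapred.JobConf;

    public class UserlogConfig {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Cap each task's stdout/stderr/syslog at 512 KB (0 means no cap).
        conf.setInt("mapred.userlog.limit.kb", 512);
        // Keep task user-logs for one day before the purge removes them.
        conf.setInt("mapred.userlog.retain.hours", 24);
        System.out.println("log cap (bytes): "
            + conf.getLong("mapred.userlog.limit.kb", 100) * 1024);
      }
    }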

Modified: lucene/hadoop/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/log4j.properties?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/conf/log4j.properties (original)
+++ lucene/hadoop/trunk/conf/log4j.properties Mon Aug  6 15:05:40 2007
@@ -52,10 +52,7 @@
 
 log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
 log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
-log4j.appender.TLA.noKeepSplits=${hadoop.tasklog.noKeepSplits}
 log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
-log4j.appender.TLA.purgeLogSplits=${hadoop.tasklog.purgeLogSplits}
-log4j.appender.TLA.logsRetainHours=${hadoop.tasklog.logsRetainHours}
 
 log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
 log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Mon Aug  6 15:05:40 2007
@@ -304,6 +304,46 @@
   }
   
   /**
+   * This class is only used on windows to invoke the cygpath command.
+   */
+  private static class CygPathCommand extends Command {
+    String[] command;
+    String result;
+    CygPathCommand(String path) throws IOException {
+      command = new String[]{"cygpath", "-u", path};
+      run();
+    }
+    String getResult() throws IOException {
+      return result;
+    }
+    protected String[] getExecString() {
+      return command;
+    }
+    protected void parseExecResult(BufferedReader lines) throws IOException {
+      String line = lines.readLine();
+      if (line == null) {
+        throw new IOException("Can't convert '" + command[2] + 
+                              "' to a cygwin path");
+      }
+      result = line;
+    }
+  }
+
+  /**
+   * Convert an OS-native filename to a path that works for the shell.
+   * @param file The filename to convert
+   * @return The unix pathname
+   * @throws IOException on windows, there can be problems with the subprocess
+   */
+  public static String makeShellPath(File file) throws IOException {
+    if (Path.WINDOWS) {
+      return new CygPathCommand(file.toString()).getResult();
+    } else {
+      return file.toString();
+    }
+  }
+
+  /**
    * Takes an input dir and returns the du on that local directory. Very basic
    * implementation.
    * 
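
The makeShellPath helper exists so that paths handed to the bash wrapper also work under
cygwin. A minimal usage sketch; the Windows path is a made-up example, and on a Unix host
the call simply returns file.toString():

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.fs.FileUtil;

    public class ShellPathDemo {
      public static void main(String[] args) throws IOException {
        File stdout =
          new File("C:\\hadoop\\logs\\userlogs\\task_0001_m_000000_0\\stdout");
        // On Windows this shells out to "cygpath -u"; elsewhere it is a pass-through.
        System.out.println(FileUtil.makeShellPath(stdout));
      }
    }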

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Aug  6 15:05:40 2007
@@ -375,8 +375,8 @@
       String httpTaskLogLocation = null; 
       if (null != ttStatus){
         httpTaskLogLocation = "http://" + ttStatus.getHost() + ":" + 
-          ttStatus.getHttpPort() + "/tasklog.jsp?plaintext=true&taskid=" +
-          status.getTaskId() + "&all=true";
+          ttStatus.getHttpPort() + "/tasklog?plaintext=true&taskid=" +
+          status.getTaskId();
       }
 
       TaskCompletionEvent taskEvent = null;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Mon Aug  6 15:05:40 2007
@@ -20,38 +20,37 @@
 
 import java.io.*;
 import java.util.ArrayList;
-import java.util.Vector;
+import java.util.List;
 
 import org.apache.commons.logging.*;
+import org.apache.hadoop.fs.FileUtil;
 
 /**
  * A simple logger to handle the task-specific user logs.
  * This class uses the system property <code>hadoop.log.dir</code>.
  * 
  */
-class TaskLog {
+public class TaskLog {
   private static final Log LOG =
     LogFactory.getLog(TaskLog.class.getName());
 
   private static final File LOG_DIR = 
     new File(System.getProperty("hadoop.log.dir"), "userlogs");
   
-  private static final String SPLIT_INDEX_NAME = "split.idx";
-  
   static {
     if (!LOG_DIR.exists()) {
       LOG_DIR.mkdirs();
     }
   }
 
-  private static File getTaskLogDir(String taskid, LogFilter filter) {
-    return new File(new File(LOG_DIR, taskid), filter.getPrefix());
+  public static File getTaskLogFile(String taskid, LogName filter) {
+    return new File(new File(LOG_DIR, taskid), filter.toString());
   }
   
   /**
    * The filter for userlogs.
    */
-  public static enum LogFilter {
+  public static enum LogName {
     /** Log on the stdout of the task. */
     STDOUT ("stdout"),
 
@@ -63,500 +62,189 @@
     
     private String prefix;
     
-    private LogFilter(String prefix) {
+    private LogName(String prefix) {
       this.prefix = prefix;
     }
     
-    String getPrefix() {
+    public String toString() {
       return prefix;
     }
   }
-  
-  /**
-   * The log-writer responsible for handling writing user-logs
-   * and maintaining splits and ensuring job-specifc limits 
-   * w.r.t logs-size etc. are honoured.
-   *  
-   */
-  static class Writer {
-    private String taskId;
-    private LogFilter filter;
-
-    private final File taskLogDir;
-    private final int noKeepSplits;
-    private final long splitFileSize;
-    private final boolean purgeLogSplits;
-    private final int logsRetainHours;
-
-    private boolean initialized = false;
-    private long splitOffset = 0;
-    private long splitLength = 0;
-    private int noSplits = 0;
-    
-    private File currentSplit;            // current split filename
-    private OutputStream out;               // current split
-    private OutputStream splitIndex;        // split index file
-    
-    private int flushCtr = 0;
-    private final static int FLUSH_BYTES = 256;
-
-    /**
-     * Creates a new TaskLog writer.
-     * @param conf configuration of the task
-     * @param taskId taskid of the task
-     * @param filter the {@link LogFilter} to apply on userlogs.
-     */
-    Writer(String taskId, LogFilter filter, 
-           int noKeepSplits, long totalLogSize, boolean purgeLogSplits, int logsRetainHours) {
-      this.taskId = taskId;
-      this.filter = filter;
-      
-      this.taskLogDir = getTaskLogDir(this.taskId, this.filter);
-      
-      this.noKeepSplits = noKeepSplits;
-      this.splitFileSize = (totalLogSize / noKeepSplits);
-      this.purgeLogSplits = purgeLogSplits;
-      this.logsRetainHours = logsRetainHours;
-    }
-    
-    private static class TaskLogsPurgeFilter implements FileFilter {
-      long purgeTimeStamp;
-    
-      TaskLogsPurgeFilter(long purgeTimeStamp) {
-        this.purgeTimeStamp = purgeTimeStamp;
-      }
 
-      public boolean accept(File file) {
-        LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
-        return file.lastModified() < purgeTimeStamp;
-      }
+  private static class TaskLogsPurgeFilter implements FileFilter {
+    long purgeTimeStamp;
+  
+    TaskLogsPurgeFilter(long purgeTimeStamp) {
+      this.purgeTimeStamp = purgeTimeStamp;
     }
 
-    private File getLogSplit(int split) {
-      String splitName = "part-" + String.format("%1$06d", split);
-      return new File(taskLogDir, splitName); 
+    public boolean accept(File file) {
+      LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
+      return file.lastModified() < purgeTimeStamp;
     }
-    
-    private void deleteDir(File dir) throws IOException {
-      File[] files = dir.listFiles();
-      if (files != null) {
-        for (int i=0; i < files.length; ++i) {
-          if (files[i].isDirectory()) {
-            deleteDir(files[i]);
-          }
-          files[i].delete();
-        }
+  }
+  /**
+   * Purge old user logs.
+   * 
+   * @throws IOException
+   */
+  public static synchronized void cleanup(int logsRetainHours
+                                          ) throws IOException {
+    // Purge logs of tasks on this tasktracker if their  
+    // mtime has exceeded "mapred.userlog.retain.hours" hours
+    long purgeTimeStamp = System.currentTimeMillis() - 
+                            (logsRetainHours*60L*60*1000);
+    File[] oldTaskLogs = LOG_DIR.listFiles
+                           (new TaskLogsPurgeFilter(purgeTimeStamp));
+    if (oldTaskLogs != null) {
+      for (int i=0; i < oldTaskLogs.length; ++i) {
+        FileUtil.fullyDelete(oldTaskLogs[i]);
       }
-      boolean del = dir.delete();
-      LOG.debug("Deleted " + dir + ": " + del);
     }
-    
-    /**
-     * Initialize the task log-writer.
-     * 
-     * @throws IOException
-     */
-    public synchronized void init() throws IOException {
-      if (!initialized) {
-        // Purge logs of tasks on this tasktracker if their  
-        // mtime has exceeded "mapred.task.log.retain" hours
-        long purgeTimeStamp = System.currentTimeMillis() - 
-          (logsRetainHours*60*60*1000);
-        File[] oldTaskLogs = LOG_DIR.listFiles(
-                                               new TaskLogsPurgeFilter(purgeTimeStamp)
-                                               );
-        if (oldTaskLogs != null) {
-          for (int i=0; i < oldTaskLogs.length; ++i) {
-            deleteDir(oldTaskLogs[i]);
-          }
-        }
-
-        // Initialize the task's log directory
-        if (taskLogDir.exists()) {
-          deleteDir(taskLogDir);
-        }
-        taskLogDir.mkdirs();
-        
-        // Create the split index
-        splitIndex = new BufferedOutputStream(
-                                              new FileOutputStream(new File(taskLogDir, SPLIT_INDEX_NAME))
-                                              );
+  }
 
-        out = createLogSplit(noSplits);
-        initialized = true;
-      }
-    }
-    
-    /**
-     * Write a log message to the task log.
-     * 
-     * @param b bytes to be writter
-     * @param off start offset
-     * @param len length of data
-     * @throws IOException
-     */
-    public synchronized void write(byte[] b, int off, int len) 
-      throws IOException {
-      // Check if we need to rotate the log
-      if (splitLength > splitFileSize) {
-        LOG.debug("Total no. of bytes written to split#" + noSplits + 
-                  " -> " + splitLength);
-        logRotate();
-      }
-      
-      // Periodically flush data to disk
-      if (flushCtr > FLUSH_BYTES) {
-        out.flush();
-        flushCtr = 0;
-      }
-      
-      // Write out to the log-split
-      out.write(b, off, len);
-      splitLength += len;
-      flushCtr += len;
-    }
-    
+  public static class Reader extends InputStream {
+    private long bytesRemaining;
+    private FileInputStream file;
     /**
-     * Close the task log.
-     * 
+     * Read a log file from start to end positions. The offsets may be negative,
+     * in which case they are relative to the end of the file. For example,
+     * Reader(taskid, kind, 0, -1) is the entire file and 
+     * Reader(taskid, kind, -4197, -1) is the last 4196 bytes. 
+     * @param taskid the id of the task to read the log file for
+     * @param kind the kind of log to read
+     * @param start the offset to read from (negative is relative to tail)
+     * @param end the offset to read upto (negative is relative to tail)
      * @throws IOException
      */
-    public synchronized void close() throws IOException {
-      // Close the final split
-      if (out != null) {
-        out.close();
+    public Reader(String taskid, LogName kind, 
+                  long start, long end) throws IOException {
+      // find the right log file
+      File filename = getTaskLogFile(taskid, kind);
+      // calculate the start and stop
+      long size = filename.length();
+      if (start < 0) {
+        start += size + 1;
       }
-
-      // Close the split-index
-      if (splitIndex != null) {
-        writeIndexRecord();
-        splitIndex.close();
+      if (end < 0) {
+        end += size + 1;
       }
-    }
-
-    private synchronized OutputStream createLogSplit(int split) 
-      throws IOException {
-      currentSplit =  getLogSplit(split);
-      LOG.debug("About to create the split: " + currentSplit);
-      // Record the 'split' in the index
-      writeIndexRecord();
-      return new BufferedOutputStream(new FileOutputStream(currentSplit));
-    }
-    
-    private synchronized void writeIndexRecord() throws IOException {
-      String indexRecord = currentSplit + "|" + splitOffset + "\n";
-      splitIndex.write(indexRecord.getBytes());
-      splitIndex.flush();
-    }
-    
-    private synchronized void logRotate() throws IOException {
-      // Close the current split
-      LOG.debug("About to rotate-out the split: " + noSplits);
-      out.close();
-      
-      // Re-initialize the state
-      splitOffset += splitLength;
-      splitLength = 0;
-      flushCtr = 0;
-
-      // New 'split'
-      ++noSplits;
-
-      // Check if we need to purge an old split
-      if (purgeLogSplits) {
-        if (noSplits >= noKeepSplits) {   // noSplits is zero-based
-          File purgeLogSplit = getLogSplit((noSplits-noKeepSplits));
-          purgeLogSplit.delete();
-          LOG.debug("Purged log-split #" + (noSplits-noKeepSplits) + " - " + 
-                    purgeLogSplit);
+      start = Math.max(0, Math.min(start, size));
+      end = Math.max(0, Math.min(end, size));
+      bytesRemaining = end - start;
+      file = new FileInputStream(filename);
+      // skip upto start
+      long pos = 0;
+      while (pos < start) {
+        long result = file.skip(start - pos);
+        if (result < 0) {
+          bytesRemaining = 0;
+          break;
         }
+        pos += result;
       }
-      
-      // Rotate the log
-      out = createLogSplit(noSplits); 
     }
     
-  } // TaskLog.Writer
-
-  /**
-   * The log-reader for reading the 'split' user-logs.
-   *
-   */
-  static class Reader {
-    private String taskId;
-    private LogFilter filter;
-    
-    private File taskLogDir;
-    private boolean initialized = false;
-    
-    private IndexRecord[] indexRecords = null;
-    private BufferedReader splitIndex;
-    
-    private long logFileSize = 0;
-    
-    /**
-     * Create a new task log reader.
-     * 
-     * @param taskId task id of the task.
-     * @param filter the {@link LogFilter} to apply on userlogs.
-     */
-    public Reader(String taskId, LogFilter filter) {
-      this.taskId = taskId;
-      this.filter = filter;
-      
-      this.taskLogDir = getTaskLogDir(this.taskId, this.filter);
-    }
-
-    private static class IndexRecord {
-      String splitName;
-      long splitOffset;
-      
-      IndexRecord(String splitName, long splitOffset) {
-        this.splitName = splitName;
-        this.splitOffset = splitOffset;
+    public int read() throws IOException {
+      int result = -1;
+      if (bytesRemaining > 0) {
+        bytesRemaining -= 1;
+        result = file.read();
       }
+      return result;
     }
     
-    private synchronized void init() throws IOException {
-      this.splitIndex = new BufferedReader(new InputStreamReader(
-                                                                 new FileInputStream(new File(taskLogDir, 
-                                                                                              SPLIT_INDEX_NAME))));
-
-      // Parse the split-index and store the offsets/lengths
-      ArrayList<IndexRecord> records = new ArrayList<IndexRecord>();
-      String line;
-      while ((line = splitIndex.readLine()) != null) {
-        String[] fields = line.split("\\|");
-        if (fields.length != 2) {
-          throw new IOException("Malformed split-index with " + 
-                                fields.length + " fields");
-        }
-        
-        IndexRecord record = new IndexRecord(
-                                             fields[0], 
-                                             Long.valueOf(fields[1]).longValue()
-                                             );
-        LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset + ">");
-        
-        // Save 
-        records.add(record);
-      }
-
-      indexRecords = new IndexRecord[records.size()];
-      indexRecords = records.toArray(indexRecords);
-      IndexRecord lastRecord = indexRecords[records.size() - 1];
-      logFileSize = lastRecord.splitOffset
-          + new File(lastRecord.splitName).length();
-      initialized = true;
-      LOG.debug("Log size: " + logFileSize);
-    }
-
-    /**
-     * Return the total 'logical' log-size written by the task, including
-     * purged data.
-     * 
-     * @return the total 'logical' log-size written by the task, including
-     *         purged data.
-     * @throws IOException
-     */
-    public synchronized long getTotalLogSize() throws IOException {
-      if (!initialized) {
-        init();
+    public int read(byte[] buffer, int offset, int length) throws IOException {
+      length = (int) Math.min(length, bytesRemaining);
+      int bytes = file.read(buffer, offset, length);
+      if (bytes > 0) {
+        bytesRemaining -= bytes;
       }
-      
-      return logFileSize;
+      return bytes;
     }
     
-    /**
-     * Return the entire user-log (remaining splits).
-     * 
-     * @return Returns a <code>byte[]</code> containing the data in user-log.
-     * @throws IOException
-     */
-    public synchronized byte[] fetchAll() throws IOException {
-      if (!initialized) {
-        init();
-      }
-      
-      // Get all splits 
-      Vector<InputStream> streams = new Vector<InputStream>();
-      for (int i=0; i < indexRecords.length; ++i) {
-        InputStream stream = getLogSplit(i);
-        if (stream != null) {
-          streams.add(stream);
-          LOG.debug("Added split: " + i);
-        }
-      }
-      LOG.debug("Total log-size on disk: " + logFileSize);
-
-      // Copy log data into buffer
-      byte[] b = new byte[(int) logFileSize];
-      SequenceInputStream in = new SequenceInputStream(streams.elements());
-      try {
-        int bytesRead = 0;
-        int off = 0;
-        LOG.debug("Attempting to read " + logFileSize + " bytes from logs");
-        while ((bytesRead = in.read(b, off, (int) logFileSize - off)) > 0) {
-          LOG.debug("Got " + bytesRead + " bytes");
-          off += bytesRead;
-        }
-
-        if (off != logFileSize) {
-          LOG.debug("Didn't not read all requisite data in logs!");
-        }
-      } finally {
-        try { in.close(); } catch (IOException ex) {}
-      }
-      return b;
-    }
-    
-    /**
-     * Tail the user-log.
-     * 
-     * @param b the buffer into which the data is read.
-     * @param off the start offset in array <code>b</code>
-     *            at which the data is written.
-     * @param len the maximum number of bytes to read.
-     * @param tailSize the no. of bytes to be read from end of file.
-     * @param tailWindow the sliding window for tailing the logs.
-     * @return the total number of bytes of user-logs dataread into the buffer.
-     * @throws IOException
-     */
-    public synchronized int tail(byte[] b, int off, int len, 
-                                 long tailSize, int tailWindow) 
-      throws IOException {
-      if (!initialized) {
-        init();
-      }
-      
-      LOG.debug("tailSize: " + tailSize + " - tailWindow: " + tailWindow);
-      
-      if (tailSize*tailWindow > logFileSize) {
-        tailSize = logFileSize;
-        tailWindow = 1;
-      }
-      
-      return read(b, off, len, 
-                  (long)(logFileSize-(tailSize*tailWindow)), tailSize);
+    public int available() throws IOException {
+      return (int) Math.min(bytesRemaining, file.available());
     }
 
-    /**
-     * Read user-log data given an offset/length.
-     * 
-     * @param b the buffer into which the data is read.
-     * @param off the start offset in array <code>b</code>
-     *            at which the data is written.
-     * @param len the maximum number of bytes to read.
-     * @param logOffset the offset of the user-log from which to get data.
-     * @param logLength the maximum number of bytes of user-log data to fetch. 
-     * @return the total number of bytes of user-logs dataread into the buffer.
-     * @throws IOException
-     */
-    public synchronized int read(byte[] b, int off, int len, 
-                                 long logOffset, long logLength) 
-      throws IOException {
-      LOG.debug("TaskLog.Reader.read: logOffset: " + logOffset + " - logLength: " + logLength);
-
-      // Sanity check
-      if (logLength == 0) {
-        return 0;
-      }
-      
-      if (!initialized) {
-        init();
-      }
-      
-      // Locate the requisite splits 
-      Vector<InputStream> streams = new Vector<InputStream>();
-      long offset = logOffset;
-      int startIndex = -1, stopIndex = -1;
-      boolean inRange = false;
-      for (int i=0; i < indexRecords.length; ++i) {
-        LOG.debug("offset: " + offset + " - (split, splitOffset) : (" + 
-                  i + ", " + indexRecords[i].splitOffset + ")");
-        
-        if (offset <= indexRecords[i].splitOffset) {
-          if (!inRange) {
-            startIndex = i - ((i > 0) ? 1 : 0);
-            LOG.debug("Starting at split: " + startIndex);
-            offset += logLength;
-            InputStream stream = getLogSplit(startIndex);
-            if (stream != null) {
-              streams.add(stream);
-            }
-            LOG.debug("Added split: " + startIndex);
-            inRange = true;
-          } else {
-            stopIndex = i-1;
-            LOG.debug("Stop at split: " + stopIndex);
-            break;
-          }
-        }
-        
-        if (inRange) {
-          InputStream stream = getLogSplit(i);
-          if (stream != null) {
-            streams.add(stream);
-          }
-          LOG.debug("Added split: " + i);
-        }
-      }
-      if (startIndex == -1) {
-        throw new IOException("Illegal logOffset/logLength");
-      }
-      if (stopIndex == -1) {
-        stopIndex = indexRecords.length - 1;
-        LOG.debug("Stop at split: " + stopIndex);
-        
-        // Check if request exceeds the log-file size
-        if ((logOffset+logLength) > logFileSize) {
-          LOG.debug("logOffset+logLength exceeds log-file size");
-          logLength = logFileSize - logOffset;
-        }
-      }
-      
-      // Copy requisite data into user buffer
-      SequenceInputStream in = new SequenceInputStream(streams.elements());
-      int totalBytesRead = 0;
-      try {
-        if (streams.size() == (stopIndex - startIndex +1)) {
-          // Skip to get to 'logOffset' if logs haven't been purged
-          long skipBytes = 
-            in.skip(logOffset - indexRecords[startIndex].splitOffset);
-          LOG.debug("Skipped " + skipBytes + " bytes from " + 
-                    startIndex + " stream");
-        }
-        int bytesRead = 0;
-        len = Math.min((int)logLength, len);
-        LOG.debug("Attempting to read " + len + " bytes from logs");
-        while ((bytesRead = in.read(b, off, len)) > 0) {
-          off += bytesRead;
-          len -= bytesRead;
-        
-          totalBytesRead += bytesRead;
-        }
-      } finally {
-        try { in.close(); } catch (IOException e) {}
-      }
-
-      return totalBytesRead;
-    }
-
-    private synchronized InputStream getLogSplit(int split) 
-      throws IOException {
-      String splitName = indexRecords[split].splitName;
-      LOG.debug("About to open the split: " + splitName);
-      InputStream in = null;
-      try {
-        in = new BufferedInputStream(new FileInputStream(new File(splitName)));
-      } catch (FileNotFoundException fnfe) {
-        in = null;
-        LOG.debug("Split " + splitName + " not found... probably purged!");
-      }
-      
-      return in;
+    public void close() throws IOException {
+      file.close();
     }
+  }
 
-  } // TaskLog.Reader
+  private static final String bashCommand = "bash";
+  private static final String tailCommand = "tail";
+  
+  /**
+   * Get the desired maximum length of task's logs.
+   * @param conf the job to look in
+   * @return the number of bytes to cap the log files at
+   */
+  public static long getTaskLogLength(JobConf conf) {
+    return conf.getLong("mapred.userlog.limit.kb", 100) * 1024;
+  }
 
+  /**
+   * Wrap a command in a shell to capture stdout and stderr to files.
+   * If the tailLength is 0, the entire output will be saved.
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @return the modified command that should be run
+   */
+  public static List<String> captureOutAndError(List<String> cmd, 
+                                                File stdoutFilename,
+                                                File stderrFilename,
+                                                long tailLength
+                                               ) throws IOException {
+    String stdout = FileUtil.makeShellPath(stdoutFilename);
+    String stderr = FileUtil.makeShellPath(stderrFilename);
+    List<String> result = new ArrayList<String>(3);
+    result.add(bashCommand);
+    result.add("-c");
+    StringBuffer mergedCmd = new StringBuffer();
+    if (tailLength > 0) {
+      mergedCmd.append("(");
+    } else {
+      mergedCmd.append("exec ");
+    }
+    boolean isExecutable = true;
+    for(String s: cmd) {
+      mergedCmd.append('\'');
+      if (isExecutable) {
+        // the executable name needs to be expressed as a shell path for the  
+        // shell to find it.
+        mergedCmd.append(FileUtil.makeShellPath(new File(s)));
+        isExecutable = false; 
+      } else {
+        mergedCmd.append(s);
+      }
+      mergedCmd.append('\'');
+      mergedCmd.append(" ");
+    }
+    mergedCmd.append(" < /dev/null ");
+    if (tailLength > 0) {
+      mergedCmd.append(" | ");
+      mergedCmd.append(tailCommand);
+      mergedCmd.append(" -c ");
+      mergedCmd.append(tailLength);
+      mergedCmd.append(" >> ");
+      mergedCmd.append(stdout);
+      mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
+      mergedCmd.append(tailCommand);
+      mergedCmd.append(" -c ");
+      mergedCmd.append(tailLength);
+      mergedCmd.append(" >> ");
+      mergedCmd.append(stderr);
+      mergedCmd.append(" ; exit $PIPESTATUS");
+    } else {
+      mergedCmd.append(" 1>> ");
+      mergedCmd.append(stdout);
+      mergedCmd.append(" 2>> ");
+      mergedCmd.append(stderr);
+    }
+    result.add(mergedCmd.toString());
+    return result;
+  }
 } // TaskLog
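
To make the redirection concrete, here is a small sketch that wraps a made-up command with
TaskLog.captureOutAndError and prints the argv handed to the process launcher; the command
and file locations are placeholders, and the trailing comment shows roughly the bash -c
string produced when a tail length is given:

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.mapred.TaskLog;

    public class WrapDemo {
      public static void main(String[] args) throws Exception {
        List<String> cmd = new ArrayList<String>();
        cmd.add("/usr/bin/java");         // placeholder executable
        cmd.add("-Xmx200m");
        cmd.add("org.example.FakeChild"); // placeholder main class
        File stdout = new File("/tmp/demo/stdout");
        File stderr = new File("/tmp/demo/stderr");
        // 4096-byte cap; with 0 the command is exec'd with plain 1>>/2>> redirection.
        List<String> wrapped = TaskLog.captureOutAndError(cmd, stdout, stderr, 4096);
        for (String s : wrapped) {
          System.out.println(s);
        }
        // Roughly: bash -c "('/usr/bin/java' '-Xmx200m' 'org.example.FakeChild'
        //   < /dev/null | tail -c 4096 >> /tmp/demo/stdout ; exit $PIPESTATUS )
        //   2>&1 | tail -c 4096 >> /tmp/demo/stderr ; exit $PIPESTATUS"
      }
    }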

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java Mon Aug  6 15:05:40 2007
@@ -18,11 +18,10 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
 
-import org.apache.hadoop.util.StringUtils;
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.spi.ErrorCode;
+import org.apache.log4j.FileAppender;
 import org.apache.log4j.spi.LoggingEvent;
 
 /**
@@ -30,66 +29,46 @@
  * map-reduce system logs.
  * 
  */
-public class TaskLogAppender extends AppenderSkeleton {
-  private TaskLog.Writer taskLogWriter = null;
+public class TaskLogAppender extends FileAppender {
   private String taskId;
-  private int noKeepSplits;
-  private long totalLogFileSize;
-  private boolean purgeLogSplits;
-  private int logsRetainHours;
+  private int maxEvents;
+  private Queue<LoggingEvent> tail = null;
 
+  @Override
   public void activateOptions() {
-    taskLogWriter = 
-      new TaskLog.Writer(taskId, TaskLog.LogFilter.SYSLOG, 
-                         noKeepSplits, totalLogFileSize, purgeLogSplits, logsRetainHours);
-    try {
-      taskLogWriter.init();
-    } catch (IOException ioe) {
-      taskLogWriter = null;
-      errorHandler.error("Failed to initialize the task's logging " +
-                         "infrastructure: " + StringUtils.stringifyException(ioe));
+    synchronized (this) {
+      if (maxEvents > 0) {
+        tail = new LinkedList<LoggingEvent>();
+      }
+      setFile(TaskLog.getTaskLogFile(taskId, 
+                                     TaskLog.LogName.SYSLOG).toString());
+      setAppend(true);
+      super.activateOptions();
     }
   }
   
-  protected synchronized void append(LoggingEvent event) {
-    if (taskLogWriter == null) {
-      errorHandler.error("Calling 'append' on uninitialize/closed logger");
-      return;
-    }
-
-    if (this.layout == null) {
-      errorHandler.error("No layout for appender " + name , 
-                         null, ErrorCode.MISSING_LAYOUT);
-      return;
-    }
-    
-    // Log the message to the task's log
-    String logMessage = this.layout.format(event);
-    try {
-      byte[] logMessageData = logMessage.getBytes();
-      taskLogWriter.write(logMessageData, 0, logMessageData.length);
-    } catch (IOException ioe) {
-      errorHandler.error("Failed to log: '" + logMessage + 
-                         "' to the task's logging infrastructure with the exception: " + 
-                         StringUtils.stringifyException(ioe));
+  @Override
+  public void append(LoggingEvent event) {
+    synchronized (this) {
+      if (tail == null) {
+        super.append(event);
+      } else {
+        if (tail.size() >= maxEvents) {
+          tail.remove();
+        }
+        tail.add(event);
+      }
     }
   }
 
-  public boolean requiresLayout() {
-    return true;
-  }
-
+  @Override
   public synchronized void close() {
-    if (taskLogWriter != null) {
-      try {
-        taskLogWriter.close();
-      } catch (IOException ioe) {
-        errorHandler.error("Failed to close the task's log with the exception: " 
-                           + StringUtils.stringifyException(ioe));
+    if (tail != null) {
+      for(LoggingEvent event: tail) {
+        super.append(event);
       }
-    } else {
-      errorHandler.error("Calling 'close' on uninitialize/closed logger");
     }
+    super.close();
   }
 
   /**
@@ -104,36 +83,14 @@
     this.taskId = taskId;
   }
 
-  public int getNoKeepSplits() {
-    return noKeepSplits;
-  }
-
-  public void setNoKeepSplits(int noKeepSplits) {
-    this.noKeepSplits = noKeepSplits;
-  }
-
-  public int getLogsRetainHours() {
-    return logsRetainHours;
-  }
-
-  public void setLogsRetainHours(int logsRetainHours) {
-    this.logsRetainHours = logsRetainHours;
-  }
-
-  public boolean isPurgeLogSplits() {
-    return purgeLogSplits;
-  }
-
-  public void setPurgeLogSplits(boolean purgeLogSplits) {
-    this.purgeLogSplits = purgeLogSplits;
-  }
-
+  private static final int EVENT_SIZE = 100;
+  
   public long getTotalLogFileSize() {
-    return totalLogFileSize;
+    return maxEvents * EVENT_SIZE;
   }
 
-  public void setTotalLogFileSize(long splitFileSize) {
-    this.totalLogFileSize = splitFileSize;
+  public void setTotalLogFileSize(long logSize) {
+    maxEvents = (int) logSize / EVENT_SIZE;
   }
 
 }
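
The appender above sizes its in-memory tail by assuming roughly 100 bytes per logging
event (EVENT_SIZE). A standalone sketch of the same bounded-tail idea, using plain strings
instead of log4j LoggingEvents and illustrative numbers:

    import java.util.LinkedList;
    import java.util.Queue;

    public class BoundedTailDemo {
      public static void main(String[] args) {
        long totalLogFileSize = 1024;            // bytes, as passed via log4j
        final int EVENT_SIZE = 100;              // assumed average event size
        int maxEvents = (int) (totalLogFileSize / EVENT_SIZE);
        Queue<String> tail = new LinkedList<String>();
        for (int i = 0; i < 100; i++) {
          if (tail.size() >= maxEvents) {
            tail.remove();                       // drop the oldest buffered event
          }
          tail.add("event " + i);
        }
        // On close() the appender writes only what is left in the queue.
        for (String event : tail) {
          System.out.println(event);
        }
      }
    }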

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?view=auto&rev=563300
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Mon Aug  6 15:05:40 2007
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.*;
+
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A servlet that is run by the TaskTrackers to provide the task logs via http.
+ */
+public class TaskLogServlet extends HttpServlet {
+  private void printTaskLog(HttpServletResponse response,
+                            OutputStream out, String taskId, 
+                            long start, long end, boolean plainText, 
+                            TaskLog.LogName filter) throws IOException {
+    if (!plainText) {
+      out.write(("<br><b><u>" + filter + " logs</u></b><br>\n" +
+                 "<table border=2 cellpadding=\"2\">\n" +
+                 "<tr><td><pre>\n").getBytes());
+    }
+
+    try {
+      InputStream taskLogReader = 
+        new TaskLog.Reader(taskId, filter, start, end);
+      byte[] b = new byte[65536];
+      int result;
+      while (true) {
+        result = taskLogReader.read(b);
+        if (result > 0) {
+          out.write(b, 0, result);
+        } else {
+          break;
+        }
+      }
+      taskLogReader.close();
+      if( !plainText ) {
+        out.write("</pre></td></tr></table><hr><br>\n".getBytes());
+      }
+    } catch (IOException ioe) {
+      response.sendError(HttpServletResponse.SC_GONE,
+                         "Failed to retrieve " + filter + " log for task: " + 
+                         taskId);
+      out.write(("TaskLogServlet exception:\n" + 
+                 StringUtils.stringifyException(ioe) + "\n").getBytes());
+    }
+  }
+
+  /**
+   * Get the logs via http.
+   */
+  public void doGet(HttpServletRequest request, 
+                    HttpServletResponse response
+                    ) throws ServletException, IOException {
+    long start = 0;
+    long end = -1;
+    boolean plainText = false;
+    TaskLog.LogName filter = null;
+
+    String taskId = request.getParameter("taskid");
+    if (taskId == null) {
+      response.sendError(HttpServletResponse.SC_BAD_REQUEST, 
+                         "Argument taskid is required");
+      return;
+    }
+    String logFilter = request.getParameter("filter");
+    if (logFilter != null) {
+      try {
+        filter = TaskLog.LogName.valueOf(TaskLog.LogName.class, 
+                                         logFilter.toUpperCase());
+      } catch (IllegalArgumentException iae) {
+        response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+                           "Illegal value for filter: " + logFilter);
+        return;
+      }
+    }
+    
+    String sLogOff = request.getParameter("start");
+    if (sLogOff != null) {
+      start = Long.valueOf(sLogOff).longValue();
+    }
+    
+    String sLogEnd = request.getParameter("end");
+    if (sLogEnd != null) {
+      end = Long.valueOf(sLogEnd).longValue();
+    }
+    
+    String sPlainText = request.getParameter("plaintext");
+    if (sPlainText != null) {
+      plainText = Boolean.valueOf(sPlainText);
+    }
+
+    OutputStream out = response.getOutputStream();
+    if( !plainText ) {
+      out.write(("<html>\n" +
+                 "<title>Task Logs: '" + taskId + "'</title>\n" +
+                 "<body>\n" +
+                 "<h1>Task Logs: '" +  taskId +  "'</h1><br>\n").getBytes()); 
+
+      if (filter == null) {
+        printTaskLog(response, out, taskId, start, end, plainText, 
+                     TaskLog.LogName.STDOUT);
+        printTaskLog(response, out, taskId, start, end, plainText, 
+                     TaskLog.LogName.STDERR);
+        printTaskLog(response, out, taskId, start, end, plainText, 
+                     TaskLog.LogName.SYSLOG);
+      } else {
+        printTaskLog(response, out, taskId, start, end, plainText, filter);
+      }
+      
+      out.write("</body></html>\n".getBytes());
+      out.close();
+    } else {
+      printTaskLog(response, out, taskId, start, end, plainText, filter);
+    } 
+  }
+}
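
The servlet's start/end handling comes straight from TaskLog.Reader. A minimal sketch of
reading the last 4096 bytes of a task's stdout directly; the task id and log directory are
placeholders, and the log file must already exist on the tasktracker:

    import java.io.InputStream;
    import org.apache.hadoop.mapred.TaskLog;

    public class TailDemo {
      public static void main(String[] args) throws Exception {
        // TaskLog resolves log files under ${hadoop.log.dir}/userlogs/<taskid>/.
        System.setProperty("hadoop.log.dir", "/tmp/hadoop-logs");
        // start=-4097, end=-1 covers the final 4096 bytes: negative offsets are
        // taken from the end of the file (size + 1 is added to them).
        InputStream in = new TaskLog.Reader("task_0001_m_000000_0",
                                            TaskLog.LogName.STDOUT, -4097, -1);
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) > 0) {
          System.out.write(buf, 0, n);
        }
        in.close();
      }
    }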

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Mon Aug  6 15:05:40 2007
@@ -23,6 +23,7 @@
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
 import java.io.*;
+import java.util.List;
 import java.util.Vector;
 import java.net.URI;
 
@@ -41,9 +42,6 @@
 
   protected JobConf conf;
 
-  private TaskLog.Writer taskStdOutLogWriter;
-  private TaskLog.Writer taskStdErrLogWriter;
-
   /** 
    * for cleaning up old map outputs
    */
@@ -53,18 +51,6 @@
     this.t = t;
     this.tracker = tracker;
     this.conf = conf;
-    this.taskStdOutLogWriter = 
-      new TaskLog.Writer(t.getTaskId(), TaskLog.LogFilter.STDOUT, 
-                         this.conf.getInt("mapred.userlog.num.splits", 4), 
-                         this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024, 
-                         this.conf.getBoolean("mapred.userlog.purgesplits", true),
-                         this.conf.getInt("mapred.userlog.retain.hours", 12));
-    this.taskStdErrLogWriter = 
-      new TaskLog.Writer(t.getTaskId(), TaskLog.LogFilter.STDERR, 
-                         this.conf.getInt("mapred.userlog.num.splits", 4), 
-                         this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024, 
-                         this.conf.getBoolean("mapred.userlog.purgesplits", true),
-                         this.conf.getInt("mapred.userlog.retain.hours", 12));
     this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);
   }
@@ -76,8 +62,6 @@
    * process before the child is spawned.  It should not execute user code,
    * only system code. */
   public boolean prepare() throws IOException {
-    taskStdOutLogWriter.init();       // initialize the child task's stdout log
-    taskStdErrLogWriter.init();       // initialize the child task's stderr log
     return true;
   }
 
@@ -105,6 +89,7 @@
       //before preparing the job localize 
       //all the archives
       File workDir = new File(t.getJobFile()).getParentFile();
+      String taskid = t.getTaskId();
       File jobCacheDir = new File(workDir.getParent(), "work");
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
@@ -257,7 +242,7 @@
         String javaOpts = handleDeprecatedHeapSize(
                                                    conf.get("mapred.child.java.opts", "-Xmx200m"),
                                                    conf.get("mapred.child.heap.size"));
-        javaOpts = replaceAll(javaOpts, "@taskid@", t.getTaskId());
+        javaOpts = replaceAll(javaOpts, "@taskid@", taskid);
         int port = conf.getInt("mapred.task.tracker.report.port", 50050) + 1;
         javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port));
         String [] javaOptsSplit = javaOpts.split(" ");
@@ -284,24 +269,27 @@
         vargs.add(classPath.toString());
 
         // Setup the log4j prop
+        long logSize = TaskLog.getTaskLogLength(conf);
         vargs.add("-Dhadoop.log.dir=" + System.getProperty("hadoop.log.dir"));
         vargs.add("-Dhadoop.root.logger=INFO,TLA");
-        vargs.add("-Dhadoop.tasklog.taskid=" + t.getTaskId());
-        vargs.add("-Dhadoop.tasklog.noKeepSplits=" + conf.getInt("mapred.userlog.num.splits", 4)); 
-        vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + (conf.getInt("mapred.userlog.limit.kb", 100) * 1024));
-        vargs.add("-Dhadoop.tasklog.purgeLogSplits=" + conf.getBoolean("mapred.userlog.purgesplits", true));
-        vargs.add("-Dhadoop.tasklog.logsRetainHours=" + conf.getInt("mapred.userlog.retain.hours", 12)); 
-
-        
+        vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
+        vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
 
         // Add main class and its arguments 
         vargs.add(TaskTracker.Child.class.getName());  // main of Child
         // pass umbilical port
-        vargs.add(tracker.getTaskTrackerReportPort() + ""); 
-        vargs.add(t.getTaskId());                      // pass task identifier
+        vargs.add(Integer.toString(tracker.getTaskTrackerReportPort())); 
+        vargs.add(taskid);                      // pass task identifier
 
         // Run java
-        runChild((String[])vargs.toArray(new String[0]), workDir);
+        File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+        File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+        stdout.getParentFile().mkdirs();
+
+        List<String> wrappedCommand = 
+          TaskLog.captureOutAndError(vargs, stdout, stderr, logSize);
+        runChild(wrappedCommand, workDir);
+                 
     } catch (FSError e) {
       LOG.fatal("FSError", e);
       try {
@@ -402,22 +390,12 @@
   /**
    * Run the child process
    */
-  private void runChild(String[] args, File dir) throws IOException {
-    this.process = Runtime.getRuntime().exec(args, null, dir);
+  private void runChild(List<String> args, File dir) throws IOException {
+    ProcessBuilder builder = new ProcessBuilder(args);
+    builder.directory(dir);
+    process = builder.start();
     
-    Thread logStdErrThread = null;
-    Thread logStdOutThread = null;
     try {
-      // Copy stderr of the child-process via a thread
-      logStdErrThread = logStream((t.getTaskId() + " - " + "stderr"), 
-                                   process.getErrorStream(), 
-                                   taskStdErrLogWriter);
-      
-      // Copy stdout of the child-process via a thread
-      logStdOutThread = logStream((t.getTaskId() + " - " + "stdout"), 
-                                  process.getInputStream(), 
-                                  taskStdOutLogWriter); 
-      
       int exit_code = process.waitFor();
      
       if (!killed && exit_code != 0) {
@@ -427,26 +405,10 @@
         throw new IOException("Task process exit with nonzero status of " +
                               exit_code + ".");
       }
-      
     } catch (InterruptedException e) {
       throw new IOException(e.toString());
     } finally {
-      kill();
-      
-      // Kill both stdout/stderr copying threads 
-      if (logStdErrThread != null) {
-        logStdErrThread.interrupt();
-        try {
-          logStdErrThread.join();
-        } catch (InterruptedException ie) {}
-      }
-      
-      if (logStdOutThread != null) {
-        logStdOutThread.interrupt();
-        try {
-          logStdOutThread.join();
-        } catch (InterruptedException ie) {}
-      }
+      kill();      
     }
   }
 
@@ -460,48 +422,4 @@
     killed = true;
   }
 
-  /**
-   * Spawn a new thread to copy the child-jvm's stdout/stderr streams
-   * via a {@link TaskLog.Writer}
-   * 
-   * @param threadName thread name
-   * @param stream child-jvm's stdout/stderr stream
-   * @param writer {@link TaskLog.Writer} used to copy the child-jvm's data
-   * @return Return the newly created thread
-   */
-  private Thread logStream(String threadName, 
-                           final InputStream stream, 
-                           final TaskLog.Writer taskLog) {
-    Thread loggerThread = new Thread() {
-      public void run() {
-        try {
-          byte[] buf = new byte[512];
-          while (!Thread.interrupted()) {
-            while (stream.available() > 0) {
-              int n = stream.read(buf, 0, buf.length);
-              taskLog.write(buf, 0, n);
-            }
-            Thread.sleep(1000);
-          }
-        } catch (IOException e) {
-          LOG.warn(t.getTaskId()+" Error reading child output", e);
-        } catch (InterruptedException e) {
-          // expected
-        } finally {
-          try {
-            stream.close();
-            taskLog.close();
-          } catch (IOException e) {
-            LOG.warn(t.getTaskId()+" Error closing child output", e);
-          }
-        }
-      }
-    };
-    loggerThread.setName(threadName);
-    loggerThread.setDaemon(true);
-    loggerThread.start();
-    
-    return loggerThread;
-  }
-  
 }
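
Since stdout and stderr are now redirected by the shell wrapper, launching the child
reduces to a plain ProcessBuilder call with no copier threads. A stripped-down sketch; the
wrapped command here is a trivial placeholder:

    import java.io.File;
    import java.util.Arrays;
    import java.util.List;

    public class RunChildDemo {
      public static void main(String[] args) throws Exception {
        // Stand-in for the bash-wrapped command built by TaskLog.captureOutAndError.
        List<String> cmd =
          Arrays.asList("bash", "-c", "echo hello >> /tmp/demo-stdout");
        ProcessBuilder builder = new ProcessBuilder(cmd);
        builder.directory(new File("/tmp"));     // the task's working directory
        Process process = builder.start();
        int exitCode = process.waitFor();        // no stream-copying threads needed
        if (exitCode != 0) {
          throw new RuntimeException("Task process exited with status " + exitCode);
        }
      }
    }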

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Aug  6 15:05:40 2007
@@ -17,9 +17,13 @@
  */
  package org.apache.hadoop.mapred;
 
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FilterOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.BindException;
@@ -756,6 +760,7 @@
     server.setAttribute("localDirAllocator", localDirAllocator);
     server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
     server.addServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
+    server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
     server.start();
     this.httpPort = server.getPort();
     initialize();
@@ -1763,6 +1768,7 @@
    * The main() for child processes. 
    */
   public static class Child {
+    
     public static void main(String[] args) throws Throwable {
       //LogFactory.showTime(false);
       LOG.debug("Child starting");
@@ -1778,6 +1784,7 @@
             
       Task task = umbilical.getTask(taskid);
       JobConf job = new JobConf(task.getJobFile());
+      TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
       task.setConf(job);
           
       defaultConf.addFinalResource(new Path(task.getJobFile()));
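
The cleanup call added above removes any userlog directory whose mtime is older than the
retention window. A minimal sketch of the same call outside the Child, assuming a made-up
log directory and the new 24-hour default:

    import org.apache.hadoop.mapred.TaskLog;

    public class PurgeDemo {
      public static void main(String[] args) throws Exception {
        // TaskLog reads the system property hadoop.log.dir at class-load time.
        System.setProperty("hadoop.log.dir", "/tmp/hadoop-logs");
        // Delete userlog directories whose mtime is older than 24 hours,
        // as TaskTracker.Child now does before running the task.
        TaskLog.cleanup(24);
      }
    }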

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java Mon Aug  6 15:05:40 2007
@@ -18,9 +18,8 @@
 
 package org.apache.hadoop.mapred.pipes;
 
+import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.ArrayList;
@@ -37,6 +36,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -73,6 +73,12 @@
     String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
     FileUtil.chmod(executable, "a+x");
     cmd.add(executable);
+    // wrap the command in a stdout/stderr capture
+    String taskid = conf.get("mapred.task.id");
+    File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+    File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+    long logLength = TaskLog.getTaskLogLength(conf);
+    cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();
     handler = new OutputHandler(output, reporter);
@@ -140,39 +146,6 @@
   }
 
   /**
-   * A thread to copy an input stream to an output stream.
-   * Errors cause the copy to stop and are not reported back.
-   * The input stream is closed when the thread exits. The output stream
-   * is not closed.
-   */
-  private static class OutputCopier extends Thread {
-    InputStream in;
-    OutputStream out;
-    OutputCopier(String name, InputStream in, OutputStream out) {
-      super(name);
-      this.in = in;
-      this.out = out;
-    }
-    public void run() {
-      byte[] buffer = new byte[65536];
-      try {
-        while (true) {
-          int size = in.read(buffer);
-          if (size == -1) {
-            break;
-          }
-          out.write(buffer, 0, size);
-        }
-      } catch (IOException ie) {
-      } finally {
-        try {
-          in.close();
-        } catch (IOException ie) { }
-      }
-    }
-  }
-
-  /**
    * Run a given command in a subprocess, including threads to copy its stdout
    * and stderr to our stdout and stderr.
    * @param command the command and its arguments
@@ -187,11 +160,6 @@
       builder.environment().putAll(env);
     }
     Process result = builder.start();
-    result.getOutputStream().close();
-    new OutputCopier("pipes-stdout", result.getInputStream(), 
-                     System.out).start();
-    new OutputCopier("pipes-stderr", result.getErrorStream(), 
-                     System.err).start();
     return result;
   }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Mon Aug  6 15:05:40 2007
@@ -21,7 +21,6 @@
 import java.io.*;
 import java.net.Socket;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 

Modified: lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp Mon Aug  6 15:05:40 2007
@@ -56,11 +56,11 @@
         out.print("<td>");
         if (taskTracker != null) {
           String taskLogUrl = "http://" + taskTracker.getHost() + ":" +
-          	taskTracker.getHttpPort() + "/tasklog.jsp?taskid=" + 
+          	taskTracker.getHttpPort() + "/tasklog?taskid=" + 
           	statuses[i].getTaskId();
-          String tailFourKBUrl = taskLogUrl + "&tail=true&tailsize=4096";
-          String tailEightKBUrl = taskLogUrl + "&tail=true&tailsize=8192";
-          String entireLogUrl = taskLogUrl + "&all=true";
+          String tailFourKBUrl = taskLogUrl + "&start=-4097";
+          String tailEightKBUrl = taskLogUrl + "&start=-8193";
+          String entireLogUrl = taskLogUrl;
           out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
           out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
           out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");

Modified: lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp?view=diff&rev=563300&r1=563299&r2=563300
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Mon Aug  6 15:05:40 2007
@@ -92,10 +92,10 @@
       if (taskAttemptTracker == null) {
         out.print("n/a");
       } else {
-        String taskLogUrl = taskAttemptTracker + "/tasklog.jsp?taskid=" + 
+        String taskLogUrl = taskAttemptTracker + "/tasklog?taskid=" + 
                               status.getTaskId();
-        String tailFourKBUrl = taskLogUrl + "&tail=true&tailsize=4096";
-        String tailEightKBUrl = taskLogUrl + "&tail=true&tailsize=8192";
+        String tailFourKBUrl = taskLogUrl + "&start=-4097";
+        String tailEightKBUrl = taskLogUrl + "&start=-8193";
         String entireLogUrl = taskLogUrl + "&all=true";
         out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
         out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");