You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/11/14 21:04:49 UTC

svn commit: r474944 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/mapred/ src/webapps/job/ src/webapps/task/

Author: cutting
Date: Tue Nov 14 12:04:48 2006
New Revision: 474944

URL: http://svn.apache.org/viewvc?view=rev&rev=474944
Log:
HADOOP-489.  Separate user logs from system logs.  Contributed by Arun.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
    lucene/hadoop/trunk/src/webapps/task/tasklog.jsp
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp
    lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=474944&r1=474943&r2=474944
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Nov 14 12:04:48 2006
@@ -63,6 +63,10 @@
 19. HADOOP-661.  Make each job's configuration visible through the web
     ui.  (Arun C Murthy via cutting)
 
+20. HADOOP-489.  In MapReduce, separate user logs from system logs.
+    Each task's log output is now available through the web ui.  (Arun
+    C Murthy via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=474944&r1=474943&r2=474944
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Tue Nov 14 12:04:48 2006
@@ -575,6 +575,35 @@
   </description>
 </property>
 
+<property>
+  <name>mapred.userlog.num.splits</name>
+  <value>4</value>
+  <description>The number of fragments into which the user-log is to be split.
+  </description>
+</property>
+
+<property>
+  <name>mapred.userlog.limit.kb</name>
+  <value>100</value>
+  <description>The maximum size, in KB, of the user-logs of each task.
+  </description>
+</property>
+
+<property>
+  <name>mapred.userlog.purgesplits</name>
+  <value>true</value>
+  <description>Should the splits be purged disregarding the user-log size limit.
+  </description>
+</property>
+
+<property>
+  <name>mapred.userlog.retain.hours</name>
+  <value>12</value>
+  <description>The maximum time, in hours, for which the user-logs are to be 
+  				retained.
+  </description>
+</property>
+
 <!-- ipc properties -->
 
 <property>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?view=diff&rev=474944&r1=474943&r2=474944
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java Tue Nov 14 12:04:48 2006
@@ -33,6 +33,10 @@
   
   /** Delete any temporary files from previous failed attempts. */
   public boolean prepare() throws IOException {
+    if (!super.prepare()) {
+      return false;
+    }
+    
     this.mapOutputFile.removeAll(getTask().getTaskId());
     return true;
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=474944&r1=474943&r2=474944
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Tue Nov 14 12:04:48 2006
@@ -318,6 +318,9 @@
 
   /** Assemble all of the map output files */
   public boolean prepare() throws IOException {
+    if (!super.prepare()) {
+      return false;
+    }
     
     // cleanup from failures
     this.mapOutputFile.removeAll(reduceTask.getTaskId());

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?view=auto&rev=474944
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Nov 14 12:04:48 2006
@@ -0,0 +1,529 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Vector;
+
+import org.apache.commons.logging.*;
+
+/**
+ * A simple logger to handle the task-specific user logs.
+ * This class uses the system property <code>hadoop.log.dir</code>.
+ * 
+ * @author Arun C Murthy
+ */
+class TaskLog {
+  private static final Log LOG =
+    LogFactory.getLog(TaskLog.class.getName());
+
+  private static final File LOG_DIR = 
+    new File(System.getProperty("hadoop.log.dir"), "userlogs");
+  
+  private static final String SPLIT_INDEX_NAME = "split.idx";
+  
+  static {
+    if (!LOG_DIR.exists()) {
+      LOG_DIR.mkdirs();
+    }
+  }
+
+  /**
+   * The log-writer responsible for handling writing user-logs
+   * and maintaining splits and ensuring job-specific limits
+   * w.r.t logs-size etc. are honoured.
+   *  
+   * @author Arun C Murthy
+   */
+  static class Writer {
+    private String taskId;
+    private JobConf conf;
+
+    private final File taskLogDir;
+    private final int noKeepSplits;
+    private final long splitFileSize;
+    private final boolean purgeLogSplits;
+    private final int logsRetainHours;
+
+    private boolean initialized = false;
+    private long splitOffset = 0;
+    private long splitLength = 0;
+    private int noSplits = 0;
+    
+    private File currentSplit;            // current split filename
+    private OutputStream out;               // current split
+    private OutputStream splitIndex;        // split index file
+    
+    private int flushCtr = 0;
+    private final static int FLUSH_BYTES = 256;
+
+    /**
+     * Creates a new TaskLog writer.
+     * @param conf configuration of the task
+     * @param taskId taskid of the task
+     */
+    Writer(JobConf conf, String taskId) {
+      this.conf = conf;
+      this.taskId = taskId;
+      this.taskLogDir = new File(LOG_DIR, this.taskId);
+      
+      noKeepSplits = this.conf.getInt("mapred.userlog.num.splits", 4);
+      splitFileSize = 
+        (this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024) / noKeepSplits; 
+      purgeLogSplits = this.conf.getBoolean("mapred.userlog.purgesplits", 
+                                      true);
+      logsRetainHours = this.conf.getInt("mapred.userlog.retain.hours", 12);
+    }
+    
+    private static class TaskLogsPurgeFilter implements FileFilter {
+      long purgeTimeStamp;
+    
+      TaskLogsPurgeFilter(long purgeTimeStamp) {
+        this.purgeTimeStamp = purgeTimeStamp;
+      }
+
+      public boolean accept(File file) {
+        LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
+        return file.lastModified() < purgeTimeStamp;
+      }
+    }
+
+    private File getLogSplit(int split) {
+      String splitName = "part-" + String.format("%1$06d", split);
+      return new File(taskLogDir, splitName); 
+    }
+    
+    private void deleteDir(File dir) throws IOException {
+      File[] files = dir.listFiles();
+      if (files != null) {
+        for (int i=0; i < files.length; ++i) {
+          files[i].delete();
+        }
+      }
+      boolean del = dir.delete();
+      LOG.debug("Deleted " + del + ": " + del);
+    }
+    
+    /**
+     * Initialize the task log-writer.
+     * 
+     * @throws IOException
+     */
+    public synchronized void init() throws IOException {
+      if (!initialized) {
+        // Purge logs of tasks on this tasktracker if their  
+        // mtime has exceeded "mapred.userlog.retain.hours" hours
+        long purgeTimeStamp = System.currentTimeMillis() - 
+                              (logsRetainHours*60*60*1000);
+        File[] oldTaskLogs = LOG_DIR.listFiles(
+                                new TaskLogsPurgeFilter(purgeTimeStamp)
+                              );
+        if (oldTaskLogs != null) {
+          for (int i=0; i < oldTaskLogs.length; ++i) {
+            deleteDir(oldTaskLogs[i]);
+          }
+        }
+
+        // Initialize the task's log directory
+        if (taskLogDir.exists()) {
+          deleteDir(taskLogDir);
+        }
+        taskLogDir.mkdirs();
+        
+        // Create the split index
+        splitIndex = new BufferedOutputStream(
+            new FileOutputStream(new File(taskLogDir, SPLIT_INDEX_NAME))
+            );
+
+        out = createLogSplit(noSplits);
+        initialized = true;
+      }
+    }
+    
+    /**
+     * Write a log message to the task log.
+     * 
+     * @param b bytes to be written
+     * @param off start offset
+     * @param len length of data
+     * @throws IOException
+     */
+    public synchronized void write(byte[] b, int off, int len) 
+    throws IOException {
+      // Check if we need to rotate the log
+      if (splitLength > splitFileSize) {
+        LOG.debug("Total no. of bytes written to split#" + noSplits + 
+            " -> " + splitLength);
+        logRotate();
+      }
+      
+      // Periodically flush data to disk
+      if (flushCtr > FLUSH_BYTES) {
+        out.flush();
+        flushCtr = 0;
+      }
+      
+      // Write out to the log-split
+      out.write(b, off, len);
+      splitLength += len;
+      flushCtr += len;
+    }
+    
+    /**
+     * Close the task log.
+     * 
+     * @throws IOException
+     */
+    public void close() throws IOException {
+      // Close the final split
+      if (out != null) {
+        out.close();
+      }
+
+      // Close the split-index
+      if (splitIndex != null) {
+        writeIndexRecord();
+        splitIndex.close();
+      }
+    }
+
+    private synchronized OutputStream createLogSplit(int split) 
+    throws IOException {
+      currentSplit =  getLogSplit(split);
+      LOG.debug("About to create the split: " + currentSplit);
+      return new BufferedOutputStream(new FileOutputStream(currentSplit));
+    }
+    
+    private synchronized void writeIndexRecord() throws IOException {
+      String indexRecord = new String(currentSplit + "|" + 
+          splitOffset + "|" + splitLength + "\n");
+      splitIndex.write(indexRecord.getBytes());
+      splitIndex.flush();
+    }
+    
+    private synchronized void logRotate() throws IOException {
+      // Close the current split
+      LOG.debug("About to rotate-out the split: " + noSplits);
+      out.close();
+      
+      // Record the 'split' in the index
+      writeIndexRecord();
+      
+      // Re-initialize the state
+      splitOffset += splitLength;
+      splitLength = 0;
+      flushCtr = 0;
+
+      // New 'split'
+      ++noSplits;
+
+      // Check if we need to purge an old split
+      if (purgeLogSplits) {
+        if (noSplits >= noKeepSplits) {   // noSplits is zero-based
+          File purgeLogSplit = getLogSplit((noSplits-noKeepSplits));
+          purgeLogSplit.delete();
+          LOG.debug("Purged log-split #" + (noSplits-noKeepSplits) + " - " + 
+              purgeLogSplit);
+        }
+      }
+      
+      // Rotate the log
+      out = createLogSplit(noSplits); 
+    }
+    
+  } // TaskLog.Writer
+
+  /**
+   * The log-reader for reading the 'split' user-logs.
+   *
+   * @author Arun C Murthy
+   */
+  static class Reader {
+    private String taskId;
+    private File taskLogDir;
+    private boolean initialized = false;
+    
+    private IndexRecord[] indexRecords = null;
+    private BufferedReader splitIndex;
+    
+    private long logFileSize = 0;
+    
+    /**
+     * Create a new task log reader.
+     * 
+     * @param taskId task id of the task.
+     */
+    public Reader(String taskId) {
+      this.taskId = taskId;
+      this.taskLogDir = new File(LOG_DIR, this.taskId);
+    }
+
+    private static class IndexRecord {
+      String splitName;
+      long splitOffset;
+      long splitLength;
+      
+      IndexRecord(String splitName, long splitOffset, long splitLength) {
+        this.splitName = splitName;
+        this.splitOffset = splitOffset;
+        this.splitLength = splitLength;
+      }
+    }
+    
+    private synchronized void init() throws IOException {
+      this.splitIndex = new BufferedReader(new InputStreamReader(
+                          new FileInputStream(new File(taskLogDir, SPLIT_INDEX_NAME))
+                          ));
+
+      // Parse the split-index and store the offsets/lengths
+      ArrayList<IndexRecord> records = new ArrayList<IndexRecord>();
+      String line;
+      while ((line = splitIndex.readLine()) != null) {
+        String[] fields = line.split("\\|");
+        if (fields.length != 3) {
+          throw new IOException("Malformed split-index with " + 
+              fields.length + " fields");
+        }
+        
+        IndexRecord record = new IndexRecord(
+                                fields[0], 
+                                Long.valueOf(fields[1]).longValue(), 
+                                Long.valueOf(fields[2]).longValue()
+                              );
+        LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset + 
+            ", " + record.splitLength + ">");
+        
+        // Save 
+        records.add(record);
+        logFileSize += record.splitLength;
+      }
+
+      indexRecords = new IndexRecord[records.size()];
+      indexRecords = records.toArray(indexRecords);
+      initialized = true;
+      LOG.debug("Log size: " + logFileSize);
+    }
+
+    /**
+     * Return the total 'logical' log-size written by the task, including
+     * purged data.
+     * 
+     * @return the total 'logical' log-size written by the task, including
+     *         purged data.
+     * @throws IOException
+     */
+    public long getTotalLogSize() throws IOException {
+      if (!initialized) {
+        init();
+      }
+      
+      return logFileSize;
+    }
+    /**
+     * Return the entire user-log (remaining splits).
+     * 
+     * @return Returns a <code>byte[]</code> containing the data in user-log.
+     * @throws IOException
+     */
+    public byte[] fetchAll() throws IOException {
+      if (!initialized) {
+        init();
+      }
+      
+      // Get all splits 
+      Vector<InputStream> streams = new Vector<InputStream>();
+      int totalLogSize = 0;
+      for (int i=0; i < indexRecords.length; ++i) {
+        InputStream stream = getLogSplit(i);
+        if (stream != null) {
+          streams.add(stream);
+          totalLogSize += indexRecords[i].splitLength;
+          LOG.debug("Added split: " + i);
+        }
+      }
+      LOG.debug("Total log-size on disk: " + totalLogSize + 
+          "; actual log-size: " + logFileSize);
+
+      // Copy log data into buffer
+      byte[] b = new byte[totalLogSize];
+      SequenceInputStream in = new SequenceInputStream(streams.elements());
+      int bytesRead = 0, totalBytesRead = 0;
+      int off = 0, len = totalLogSize;
+      LOG.debug("Attempting to read " + len + " bytes from logs");
+      while ((bytesRead = in.read(b, off, len)) > 0) {
+        LOG.debug("Got " + bytesRead + " bytes");
+        off += bytesRead;
+        len -= bytesRead;
+        
+        totalBytesRead += bytesRead;
+      }
+
+      if (totalBytesRead != totalLogSize) {
+        LOG.debug("Didn't not read all requisite data in logs!");
+      }
+      
+      return b;
+    }
+    
+    /**
+     * Tail the user-log.
+     * 
+     * @param b the buffer into which the data is read.
+     * @param off the start offset in array <code>b</code>
+     *            at which the data is written.
+     * @param len the maximum number of bytes to read.
+     * @param tailSize the no. of bytes to be read from end of file.
+     * @param tailWindow the sliding window for tailing the logs.
+     * @return the total number of bytes of user-log data read into the buffer.
+     * @throws IOException
+     */
+    public synchronized int tail(byte[] b, int off, int len, 
+        long tailSize, int tailWindow) 
+    throws IOException {
+      if (!initialized) {
+        init();
+      }
+      
+      LOG.debug("tailSize: " + tailSize + " - tailWindow: " + tailWindow);
+      
+      if (tailSize*tailWindow > logFileSize) {
+        tailSize = logFileSize;
+        tailWindow = 1;
+      }
+      
+      return read(b, off, len, 
+          (long)(logFileSize-(tailSize*tailWindow)), tailSize);
+    }
+
+    /**
+     * Read user-log data given an offset/length.
+     * 
+     * @param b the buffer into which the data is read.
+     * @param off the start offset in array <code>b</code>
+     *            at which the data is written.
+     * @param len the maximum number of bytes to read.
+     * @param logOffset the offset of the user-log from which to get data.
+     * @param logLength the maximum number of bytes of user-log data to fetch. 
+     * @return the total number of bytes of user-log data read into the buffer.
+     * @throws IOException
+     */
+    public synchronized int read(byte[] b, int off, int len, 
+        long logOffset, long logLength) 
+    throws IOException {
+      LOG.debug("TaskeLog.Reader.read: logOffset: " + logOffset + " - logLength: " + logLength);
+
+      // Sanity check
+      if (logLength == 0) {
+        return 0;
+      }
+      
+      if (!initialized) {
+        init();
+      }
+      
+      // Locate the requisite splits 
+      Vector<InputStream> streams = new Vector<InputStream>();
+      long offset = logOffset;
+      int startIndex = -1, stopIndex = -1;
+      boolean inRange = false;
+      for (int i=0; i < indexRecords.length; ++i) {
+        LOG.debug("offset: " + offset + " - (split, splitOffset) : (" + 
+            i + ", " + indexRecords[i].splitOffset + ")");
+        
+        if (offset <= indexRecords[i].splitOffset) {
+          if (!inRange) {
+            startIndex = i - ((i > 0) ? 1 : 0);
+            LOG.debug("Starting at split: " + startIndex);
+            offset += logLength;
+            InputStream stream = getLogSplit(startIndex);
+            if (stream != null) {
+              streams.add(stream);
+            }
+            LOG.debug("Added split: " + startIndex);
+            inRange = true;
+          } else {
+            stopIndex = i-1;
+            LOG.debug("Stop at split: " + stopIndex);
+            break;
+          }
+        }
+        
+        if (inRange) {
+          InputStream stream = getLogSplit(i);
+          if (stream != null) {
+            streams.add(stream);
+          }
+          LOG.debug("Added split: " + i);
+        }
+      }
+      if (startIndex == -1) {
+        throw new IOException("Illegal logOffset/logLength");
+      }
+      if (stopIndex == -1) {
+        stopIndex = indexRecords.length - 1;
+        LOG.debug("Stop at split: " + stopIndex);
+        
+        // Check if request exceeds the log-file size
+        if ((logOffset+logLength) > logFileSize) {
+          LOG.debug("logOffset+logLength exceeds log-file size");
+          logLength = logFileSize - logOffset;
+        }
+      }
+      
+      // Copy requisite data into user buffer
+      SequenceInputStream in = new SequenceInputStream(streams.elements());
+      if (streams.size() == (stopIndex - startIndex +1)) {
+        // Skip to get to 'logOffset' if logs haven't been purged
+        long skipBytes = 
+          in.skip(logOffset - indexRecords[startIndex].splitOffset);
+        LOG.debug("Skipped " + skipBytes + " bytes from " + 
+            startIndex + " stream");
+      }
+      int bytesRead = 0, totalBytesRead = 0;
+      len = Math.min((int)logLength, len);
+      LOG.debug("Attempting to read " + len + " bytes from logs");
+      while ((bytesRead = in.read(b, off, len)) > 0) {
+        off += bytesRead;
+        len -= bytesRead;
+        
+        totalBytesRead += bytesRead;
+      }
+      
+      return totalBytesRead;
+    }
+
+    private synchronized InputStream getLogSplit(int split) 
+    throws IOException {
+      String splitName = indexRecords[split].splitName;
+      LOG.debug("About to open the split: " + splitName);
+      InputStream in = null;
+      try {
+        in = new BufferedInputStream(new FileInputStream(new File(splitName)));
+      } catch (FileNotFoundException fnfe) {
+        in = null;
+        LOG.debug("Split " + splitName + " not found... probably purged!");
+      }
+      
+      return in;
+    }
+
+  } // TaskLog.Reader
+
+} // TaskLog

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=474944&r1=474943&r2=474944
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Nov 14 12:04:48 2006
@@ -19,14 +19,10 @@
 
 import org.apache.commons.logging.*;
 
-import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.util.*;
 import org.apache.hadoop.filecache.*;
 import java.io.*;
-import java.util.jar.*;
 import java.util.Vector;
-import java.util.Enumeration;
 import java.net.URI;
 
 /** Base class that runs a task in a separate process.  Tasks are run in a
@@ -44,10 +40,13 @@
 
   protected JobConf conf;
 
+  private TaskLog.Writer taskLogWriter;
+  
   public TaskRunner(Task t, TaskTracker tracker, JobConf conf) {
     this.t = t;
     this.tracker = tracker;
     this.conf = conf;
+    this.taskLogWriter = new TaskLog.Writer(conf, t.getTaskId());
   }
 
   public Task getTask() { return t; }
@@ -56,7 +55,10 @@
   /** Called to assemble this task's input.  This method is run in the parent
    * process before the child is spawned.  It should not execute user code,
    * only system code. */
-  public boolean prepare() throws IOException {return true;}
+  public boolean prepare() throws IOException {
+    taskLogWriter.init();                   // initialize the child task's log
+    return true;
+  }
 
   /** Called when this task's output is no longer needed.
   * This method is run in the parent process after the child exits.  It should
@@ -305,7 +307,7 @@
         }
       }.start();
         
-      logStream(process.getInputStream());        // normally empty
+      logStream(process.getInputStream());		  // normally empty
       
       int exit_code = process.waitFor();
      
@@ -318,6 +320,7 @@
       throw new IOException(e.toString());
     } finally {
       kill();
+      taskLogWriter.close();
     }
   }
 
@@ -335,10 +338,11 @@
    */
   private void logStream(InputStream output) {
     try {
-      BufferedReader in = new BufferedReader(new InputStreamReader(output));
-      String line;
-      while ((line = in.readLine()) != null) {
-        LOG.info(t.getTaskId()+" "+line);
+      byte[] buf = new byte[512];
+      int n = 0;
+      while ((n = output.read(buf, 0, buf.length)) != -1) {
+        // Write out to the task's log
+        taskLogWriter.write(buf, 0, n);
       }
     } catch (IOException e) {
       LOG.warn(t.getTaskId()+" Error reading child output", e);
@@ -350,4 +354,5 @@
       }
     }
   }
-}
+  
+}
\ No newline at end of file

Modified: lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp?view=diff&rev=474944&r1=474943&r2=474944
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp Tue Nov 14 12:04:48 2006
@@ -45,9 +45,22 @@
             if (itr.hasNext()) {
               out.print("\n-------\n");
             }
-           }
-         }
-         out.print("</pre></td></tr>\n");
+          }
+        }
+        out.print("</pre></td>");
+        
+        String taskLogUrl = "http://" + taskTracker.getHost() + ":" +
+        	taskTracker.getHttpPort() + "/tasklog.jsp?taskid=" + statuses[i].getTaskId();
+        String tailFourKBUrl = taskLogUrl + "&tail=true&tailsize=4096";
+        String tailEightKBUrl = taskLogUrl + "&tail=true&tailsize=8192";
+        String entireLogUrl = taskLogUrl + "&all=true";
+        out.print("<td>");
+        out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
+        out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
+        out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
+        out.print("</td>");
+        
+        out.print("</tr>\n");
        }
     }
   }
@@ -78,7 +91,7 @@
     }
     out.print("<table border=2 cellpadding=\"5\" cellspacing=\"2\">");
     out.print("<tr><th>Attempt</th><th>Task</th><th>Machine</th>" +
-              "<th>Error</th></tr>\n");
+              "<th>Error</th><th>Logs</th></tr>\n");
     if (includeMap) {
       TaskInProgress[] tips = job.getMapTasks();
       for(int i=0; i < tips.length; ++i) {

Modified: lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp?view=diff&rev=474944&r1=474943&r2=474944
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Tue Nov 14 12:04:48 2006
@@ -44,19 +44,21 @@
   <%
 	}
   %>
-<td>Finish Time</td><td>Errors</td></tr>
+<td>Finish Time</td><td>Errors</td><td>Task Logs</td></tr>
   <%
     for (int i = 0; i < ts.length; i++) {
       TaskStatus status = ts[i];
       String taskTrackerName = status.getTaskTracker();
       TaskTrackerStatus taskTracker = tracker.getTaskTracker(taskTrackerName);
       out.print("<tr><td>" + status.getTaskId() + "</td>");
+      String taskAttemptTracker = null;
       if (taskTracker == null) {
         out.print("<td>" + taskTrackerName + "</td>");
       } else {
-        out.print("<td><a href=\"http://" + taskTracker.getHost() + ":" +
-                  taskTracker.getHttpPort() + "\">" +  taskTracker.getHost() + 
-                  "</a></td>");
+        taskAttemptTracker = "http://" + taskTracker.getHost() + ":" +
+        	taskTracker.getHttpPort();
+        out.print("<td><a href=\"" + taskAttemptTracker + "\">" +  
+        	taskTracker.getHost() + "</a></td>");
       }
       out.print("<td>" + status.getRunState() + "</td>");
       out.print("<td>"+ StringUtils.formatPercent(status.getProgress(),2) + 
@@ -86,7 +88,21 @@
         }
       }
       out.print("</pre></td>");
-      out.print("</tr>\n");
+      if (taskAttemptTracker == null) {
+        out.print("&nbsp;");
+      } else {
+        String taskLogUrl = taskAttemptTracker + "/tasklog.jsp?taskid=" + 
+                              status.getTaskId();
+        String tailFourKBUrl = taskLogUrl + "&tail=true&tailsize=4096";
+        String tailEightKBUrl = taskLogUrl + "&tail=true&tailsize=8192";
+        String entireLogUrl = taskLogUrl + "&all=true";
+        out.print("<td>");
+        out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
+        out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
+        out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
+        out.print("</td>");
+      }
+      out.println("</tr>\n");
     }
   }
   %>

Added: lucene/hadoop/trunk/src/webapps/task/tasklog.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/tasklog.jsp?view=auto&rev=474944
==============================================================================
--- lucene/hadoop/trunk/src/webapps/task/tasklog.jsp (added)
+++ lucene/hadoop/trunk/src/webapps/task/tasklog.jsp Tue Nov 14 12:04:48 2006
@@ -0,0 +1,135 @@
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.*"
+  import="javax.servlet.http.*"
+  import="java.io.*"
+%>
+<%
+  long logOffset = -1, logLength = -1;
+  boolean tailLog = false;
+  long tailSize = 1024;
+  int tailWindow = 1;
+  boolean entireLog = false;
+  
+  String taskId = request.getParameter("taskid");
+  if (taskId == null) {
+  	out.println("<h2>Missing 'taskid' for fetching logs!</h2>");
+  	return;
+  }
+  
+  String sLogOff = request.getParameter("off");
+  if (sLogOff != null) {
+  	logOffset = Long.valueOf(sLogOff).longValue();
+  }
+  
+  String sLogLen = request.getParameter("len");
+  if (sLogLen != null) {
+  	logLength = Long.valueOf(sLogLen).longValue();
+  }
+  
+  String sEntireLog = request.getParameter("all");
+  if (sEntireLog != null) {
+  	entireLog = Boolean.valueOf(sEntireLog);
+  }
+  
+  String sTail = request.getParameter("tail");
+  if (sTail != null) {
+  	tailLog = Boolean.valueOf(sTail);
+  }
+  
+  String sTailLen = request.getParameter("tailsize");
+  if (sTailLen != null) {
+  	tailSize = Long.valueOf(sTailLen).longValue();
+  }
+  
+  String sTailWindow = request.getParameter("tailwindow");
+  if (sTailWindow != null) {
+  	tailWindow = Integer.valueOf(sTailWindow).intValue();
+  }
+
+  if (logOffset == -1 || logLength == -1) {
+  	tailLog = true;
+  	tailWindow = 1;
+  }
+
+  if (entireLog) {
+    tailLog = false;
+  }
+%>
+
+<html>
+
+<title><%= taskId %> Task Logs</title>
+
+<body>
+<h1><%= taskId %> Task Logs</h1><br>
+
+<h2>Task Logs</h2>
+<pre>
+<%
+  boolean gotRequiredData = true;
+  try {
+  	TaskLog.Reader taskLogReader = new TaskLog.Reader(taskId);
+    byte[] b = null;
+  	int bytesRead = 0;
+  	int targetLength = 0;
+
+  	if (entireLog) {
+  	  b = taskLogReader.fetchAll();
+  	  targetLength = bytesRead = b.length;
+  	} else {
+  	  if (tailLog) {
+  		b = new byte[(int)tailSize];
+  		targetLength = (int)tailSize;
+  		bytesRead = taskLogReader.tail(b, 0, b.length, tailSize, tailWindow);
+  	  } else {
+  		b = new byte[(int)logLength];
+  		targetLength = (int)logLength;
+  		bytesRead = taskLogReader.read(b, 0, b.length, logOffset, logLength);
+   	  }
+  	}
+  	  
+  	if (bytesRead != targetLength && 
+  	  targetLength <= taskLogReader.getTotalLogSize()) {
+  	  out.println("<b>Warning: Could not fetch " + targetLength + 
+  		  " bytes from the task-logs; probably purged!</b><br/>");
+  	  gotRequiredData = false;
+  	}
+	String logData = new String(b, 0, bytesRead);
+	out.println(logData);
+  } catch (IOException ioe) {
+  	out.println("Failed to retrieve logs for task: " + taskId);
+  }
+%>
+</pre>
+
+<%
+  if (!entireLog) {
+    if (tailLog) {
+      if (gotRequiredData) {
+  	  	out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
+  		    "&tail=true&tailsize=" + tailSize + "&tailwindow=" + (tailWindow+1) + 
+  		    "'>Earlier</a>");
+  	  }
+  	  if (tailWindow > 1) {
+        out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
+  	  	    "&tail=true&tailsize=" + tailSize + "&tailwindow=" + (tailWindow-1) 
+  	  	    + "'>Later</a>");
+  	  }
+    } else {
+      if (gotRequiredData) {
+      	out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
+    		"&tail=false&off=" + Math.max(0, (logOffset-logLength)) +
+  		  	"&len=" + logLength + "'>Earlier</a>");
+  	  }
+  	  out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
+  		  "&tail=false&off=" + (logOffset+logLength) +
+  		  "&len=" + logLength + "'>Later</a>");
+    }
+  }
+%>
+
+<hr>
+<a href="http://lucene.apache.org/hadoop">Hadoop</a>, 2006.<br>
+</body>
+</html>