Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/08/07 00:52:24 UTC
svn commit: r563324 - in /lucene/hadoop/branches/branch-0.14: ./ conf/
src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapred/pipes/ src/webapps/job/ src/webapps/task/
Author: omalley
Date: Mon Aug 6 15:52:23 2007
New Revision: 563324
URL: http://svn.apache.org/viewvc?view=rev&rev=563324
Log:
Merge -r 563299:563300 from trunk into 0.14 branch. Fixes HADOOP-1553.
Added:
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
- copied unchanged from r563300, lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
Removed:
lucene/hadoop/branches/branch-0.14/src/webapps/task/tasklog.jsp
Modified:
lucene/hadoop/branches/branch-0.14/CHANGES.txt
lucene/hadoop/branches/branch-0.14/conf/hadoop-default.xml
lucene/hadoop/branches/branch-0.14/conf/log4j.properties
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskLog.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskRunner.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/pipes/Application.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
lucene/hadoop/branches/branch-0.14/src/webapps/job/jobfailures.jsp
lucene/hadoop/branches/branch-0.14/src/webapps/job/taskdetails.jsp
Modified: lucene/hadoop/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/CHANGES.txt?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.14/CHANGES.txt Mon Aug 6 15:52:23 2007
@@ -427,6 +427,27 @@
142. HADOOP-1657. Fix NNBench to ensure that the block size is a
multiple of bytes.per.checksum. (Raghu Angadi via dhruba)
+143. HADOOP-1553. Replace user task output and log capture code to use shell
+ redirection instead of copier threads in the TaskTracker. Capping the output
+ is now done via tail, which buffers the capped bytes in memory, so the cap
+ should not be set too large. The tasklog servlet's output is no longer
+ forced into UTF8 and no longer buffered entirely in memory. (omalley)
+ Configuration changes to hadoop-default.xml:
+ remove mapred.userlog.num.splits
+ remove mapred.userlog.purgesplits
+ change default mapred.userlog.limit.kb to 0 (no limit)
+ change default mapred.userlog.retain.hours to 24
+ Configuration changes to log4j.properties:
+ remove log4j.appender.TLA.noKeepSplits
+ remove log4j.appender.TLA.purgeLogSplits
+ remove log4j.appender.TLA.logsRetainHours
+ URL changes:
+ http://<tasktracker>/tasklog.jsp -> http://<tasktracker>/tasklog with
+ parameters limited to start and end, which may be positive (offsets from
+ the start) or negative (offsets from the end).
+ Environment:
+ require bash (v2 or later) and tail
+
Release 0.13.0 - 2007-06-08
1. HADOOP-1047. Fix TestReplication to succeed more reliably.
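
For illustration, a minimal sketch of fetching a task's logs through the
tasklog servlet described above. The tracker host, port, and task id are
made-up placeholders; start=-4097 requests the last 4096 bytes, matching
the links the web UI now emits.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;

    public class TaskLogFetchSketch {
      public static void main(String[] args) throws Exception {
        // Placeholder host/port/taskid -- substitute a real tasktracker.
        URL url = new URL("http://tracker.example.com:50060/tasklog"
                          + "?plaintext=true&taskid=task_0001_m_000000_0"
                          + "&start=-4097");
        BufferedReader in =
            new BufferedReader(new InputStreamReader(url.openStream()));
        String line;
        while ((line = in.readLine()) != null) {
          System.out.println(line);
        }
        in.close();
      }
    }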
Modified: lucene/hadoop/branches/branch-0.14/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/conf/hadoop-default.xml?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/conf/hadoop-default.xml (original)
+++ lucene/hadoop/branches/branch-0.14/conf/hadoop-default.xml Mon Aug 6 15:52:23 2007
@@ -772,29 +772,15 @@
</property>
<property>
- <name>mapred.userlog.num.splits</name>
- <value>4</value>
- <description>The number of fragments into which the user-log is to be split.
- </description>
-</property>
-
-<property>
<name>mapred.userlog.limit.kb</name>
- <value>100</value>
- <description>The maximum size of user-logs of each task.
- </description>
-</property>
-
-<property>
- <name>mapred.userlog.purgesplits</name>
- <value>true</value>
- <description>Should the splits be purged disregarding the user-log size limit.
+ <value>0</value>
+ <description>The maximum size of user-logs of each task in KB. 0 disables the cap.
</description>
</property>
<property>
<name>mapred.userlog.retain.hours</name>
- <value>12</value>
+ <value>24</value>
<description>The maximum time, in hours, for which the user-logs are to be
retained.
</description>
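
As a usage note, the new defaults can still be overridden per job. A short
sketch with example values (the keys come from the hunk above; the values
and the printed number are illustrative):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TaskLog;

    public class UserLogConfSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.set("mapred.userlog.limit.kb", "512");    // cap each task log at 512 KB
        conf.set("mapred.userlog.retain.hours", "12"); // purge after 12 hours
        // TaskLog.getTaskLogLength converts the KB cap to bytes:
        System.out.println(TaskLog.getTaskLogLength(conf)); // prints 524288
      }
    }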
Modified: lucene/hadoop/branches/branch-0.14/conf/log4j.properties
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/conf/log4j.properties?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/conf/log4j.properties (original)
+++ lucene/hadoop/branches/branch-0.14/conf/log4j.properties Mon Aug 6 15:52:23 2007
@@ -52,10 +52,7 @@
log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
-log4j.appender.TLA.noKeepSplits=${hadoop.tasklog.noKeepSplits}
log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
-log4j.appender.TLA.purgeLogSplits=${hadoop.tasklog.purgeLogSplits}
-log4j.appender.TLA.logsRetainHours=${hadoop.tasklog.logsRetainHours}
log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java Mon Aug 6 15:52:23 2007
@@ -295,6 +295,46 @@
}
/**
+ * This class is only used on windows to invoke the cygpath command.
+ */
+ private static class CygPathCommand extends Command {
+ String[] command;
+ String result;
+ CygPathCommand(String path) throws IOException {
+ command = new String[]{"cygpath", "-u", path};
+ run();
+ }
+ String getResult() throws IOException {
+ return result;
+ }
+ protected String[] getExecString() {
+ return command;
+ }
+ protected void parseExecResult(BufferedReader lines) throws IOException {
+ String line = lines.readLine();
+ if (line == null) {
+ throw new IOException("Can't convert '" + command[2] +
+ " to a cygwin path");
+ }
+ result = line;
+ }
+ }
+
+ /**
+ * Convert an OS-native filename to a path that works for the shell.
+ * @param file The filename to convert
+ * @return The unix pathname
+ * @throws IOException on windows, there can be problems with the subprocess
+ */
+ public static String makeShellPath(File file) throws IOException {
+ if (Path.WINDOWS) {
+ return new CygPathCommand(file.toString()).getResult();
+ } else {
+ return file.toString();
+ }
+ }
+
+ /**
* Takes an input dir and returns the du on that local directory. Very basic
* implementation.
*
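
A small sketch of the new helper in use; the Windows path is a made-up
example. On Unix the call is a pass-through, while on Windows it shells
out to cygpath, so cygwin must be installed and on the PATH there.

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.fs.FileUtil;

    public class ShellPathSketch {
      public static void main(String[] args) throws IOException {
        File f = new File("C:\\cygwin\\bin\\tail.exe"); // illustrative
        // Unix: returns f.toString() unchanged.
        // Windows: runs "cygpath -u <path>" and returns its single output line.
        System.out.println(FileUtil.makeShellPath(f));
      }
    }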
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Aug 6 15:52:23 2007
@@ -375,8 +375,8 @@
String httpTaskLogLocation = null;
if (null != ttStatus){
httpTaskLogLocation = "http://" + ttStatus.getHost() + ":" +
- ttStatus.getHttpPort() + "/tasklog.jsp?plaintext=true&taskid=" +
- status.getTaskId() + "&all=true";
+ ttStatus.getHttpPort() + "/tasklog?plaintext=true&taskid=" +
+ status.getTaskId();
}
TaskCompletionEvent taskEvent = null;
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskLog.java?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskLog.java Mon Aug 6 15:52:23 2007
@@ -20,38 +20,37 @@
import java.io.*;
import java.util.ArrayList;
-import java.util.Vector;
+import java.util.List;
import org.apache.commons.logging.*;
+import org.apache.hadoop.fs.FileUtil;
/**
* A simple logger to handle the task-specific user logs.
* This class uses the system property <code>hadoop.log.dir</code>.
*
*/
-class TaskLog {
+public class TaskLog {
private static final Log LOG =
LogFactory.getLog(TaskLog.class.getName());
private static final File LOG_DIR =
new File(System.getProperty("hadoop.log.dir"), "userlogs");
- private static final String SPLIT_INDEX_NAME = "split.idx";
-
static {
if (!LOG_DIR.exists()) {
LOG_DIR.mkdirs();
}
}
- private static File getTaskLogDir(String taskid, LogFilter filter) {
- return new File(new File(LOG_DIR, taskid), filter.getPrefix());
+ public static File getTaskLogFile(String taskid, LogName filter) {
+ return new File(new File(LOG_DIR, taskid), filter.toString());
}
/**
* The filter for userlogs.
*/
- public static enum LogFilter {
+ public static enum LogName {
/** Log on the stdout of the task. */
STDOUT ("stdout"),
@@ -63,500 +62,189 @@
private String prefix;
- private LogFilter(String prefix) {
+ private LogName(String prefix) {
this.prefix = prefix;
}
- String getPrefix() {
+ public String toString() {
return prefix;
}
}
-
- /**
- * The log-writer responsible for handling writing user-logs
- * and maintaining splits and ensuring job-specifc limits
- * w.r.t logs-size etc. are honoured.
- *
- */
- static class Writer {
- private String taskId;
- private LogFilter filter;
-
- private final File taskLogDir;
- private final int noKeepSplits;
- private final long splitFileSize;
- private final boolean purgeLogSplits;
- private final int logsRetainHours;
-
- private boolean initialized = false;
- private long splitOffset = 0;
- private long splitLength = 0;
- private int noSplits = 0;
-
- private File currentSplit; // current split filename
- private OutputStream out; // current split
- private OutputStream splitIndex; // split index file
-
- private int flushCtr = 0;
- private final static int FLUSH_BYTES = 256;
-
- /**
- * Creates a new TaskLog writer.
- * @param conf configuration of the task
- * @param taskId taskid of the task
- * @param filter the {@link LogFilter} to apply on userlogs.
- */
- Writer(String taskId, LogFilter filter,
- int noKeepSplits, long totalLogSize, boolean purgeLogSplits, int logsRetainHours) {
- this.taskId = taskId;
- this.filter = filter;
-
- this.taskLogDir = getTaskLogDir(this.taskId, this.filter);
-
- this.noKeepSplits = noKeepSplits;
- this.splitFileSize = (totalLogSize / noKeepSplits);
- this.purgeLogSplits = purgeLogSplits;
- this.logsRetainHours = logsRetainHours;
- }
-
- private static class TaskLogsPurgeFilter implements FileFilter {
- long purgeTimeStamp;
-
- TaskLogsPurgeFilter(long purgeTimeStamp) {
- this.purgeTimeStamp = purgeTimeStamp;
- }
- public boolean accept(File file) {
- LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
- return file.lastModified() < purgeTimeStamp;
- }
+ private static class TaskLogsPurgeFilter implements FileFilter {
+ long purgeTimeStamp;
+
+ TaskLogsPurgeFilter(long purgeTimeStamp) {
+ this.purgeTimeStamp = purgeTimeStamp;
}
- private File getLogSplit(int split) {
- String splitName = "part-" + String.format("%1$06d", split);
- return new File(taskLogDir, splitName);
+ public boolean accept(File file) {
+ LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
+ return file.lastModified() < purgeTimeStamp;
}
-
- private void deleteDir(File dir) throws IOException {
- File[] files = dir.listFiles();
- if (files != null) {
- for (int i=0; i < files.length; ++i) {
- if (files[i].isDirectory()) {
- deleteDir(files[i]);
- }
- files[i].delete();
- }
+ }
+ /**
+ * Purge old user logs.
+ *
+ * @throws IOException
+ */
+ public static synchronized void cleanup(int logsRetainHours
+ ) throws IOException {
+ // Purge logs of tasks on this tasktracker if their
+ // mtime has exceeded "mapred.task.log.retain" hours
+ long purgeTimeStamp = System.currentTimeMillis() -
+ (logsRetainHours*60L*60*1000);
+ File[] oldTaskLogs = LOG_DIR.listFiles
+ (new TaskLogsPurgeFilter(purgeTimeStamp));
+ if (oldTaskLogs != null) {
+ for (int i=0; i < oldTaskLogs.length; ++i) {
+ FileUtil.fullyDelete(oldTaskLogs[i]);
}
- boolean del = dir.delete();
- LOG.debug("Deleted " + dir + ": " + del);
}
-
- /**
- * Initialize the task log-writer.
- *
- * @throws IOException
- */
- public synchronized void init() throws IOException {
- if (!initialized) {
- // Purge logs of tasks on this tasktracker if their
- // mtime has exceeded "mapred.task.log.retain" hours
- long purgeTimeStamp = System.currentTimeMillis() -
- (logsRetainHours*60*60*1000);
- File[] oldTaskLogs = LOG_DIR.listFiles(
- new TaskLogsPurgeFilter(purgeTimeStamp)
- );
- if (oldTaskLogs != null) {
- for (int i=0; i < oldTaskLogs.length; ++i) {
- deleteDir(oldTaskLogs[i]);
- }
- }
-
- // Initialize the task's log directory
- if (taskLogDir.exists()) {
- deleteDir(taskLogDir);
- }
- taskLogDir.mkdirs();
-
- // Create the split index
- splitIndex = new BufferedOutputStream(
- new FileOutputStream(new File(taskLogDir, SPLIT_INDEX_NAME))
- );
+ }
- out = createLogSplit(noSplits);
- initialized = true;
- }
- }
-
- /**
- * Write a log message to the task log.
- *
- * @param b bytes to be writter
- * @param off start offset
- * @param len length of data
- * @throws IOException
- */
- public synchronized void write(byte[] b, int off, int len)
- throws IOException {
- // Check if we need to rotate the log
- if (splitLength > splitFileSize) {
- LOG.debug("Total no. of bytes written to split#" + noSplits +
- " -> " + splitLength);
- logRotate();
- }
-
- // Periodically flush data to disk
- if (flushCtr > FLUSH_BYTES) {
- out.flush();
- flushCtr = 0;
- }
-
- // Write out to the log-split
- out.write(b, off, len);
- splitLength += len;
- flushCtr += len;
- }
-
+ public static class Reader extends InputStream {
+ private long bytesRemaining;
+ private FileInputStream file;
/**
- * Close the task log.
- *
+ * Read a log file from start to end positions. The offsets may be negative,
+ * in which case they are relative to the end of the file. For example,
+ * Reader(taskid, kind, 0, -1) is the entire file and
+ * Reader(taskid, kind, -4197, -1) is the last 4196 bytes.
+ * @param taskid the id of the task to read the log file for
+ * @param kind the kind of log to read
+ * @param start the offset to read from (negative is relative to tail)
+ * @param end the offset to read up to (negative is relative to tail)
* @throws IOException
*/
- public synchronized void close() throws IOException {
- // Close the final split
- if (out != null) {
- out.close();
+ public Reader(String taskid, LogName kind,
+ long start, long end) throws IOException {
+ // find the right log file
+ File filename = getTaskLogFile(taskid, kind);
+ // calculate the start and stop
+ long size = filename.length();
+ if (start < 0) {
+ start += size + 1;
}
-
- // Close the split-index
- if (splitIndex != null) {
- writeIndexRecord();
- splitIndex.close();
+ if (end < 0) {
+ end += size + 1;
}
- }
-
- private synchronized OutputStream createLogSplit(int split)
- throws IOException {
- currentSplit = getLogSplit(split);
- LOG.debug("About to create the split: " + currentSplit);
- // Record the 'split' in the index
- writeIndexRecord();
- return new BufferedOutputStream(new FileOutputStream(currentSplit));
- }
-
- private synchronized void writeIndexRecord() throws IOException {
- String indexRecord = currentSplit + "|" + splitOffset + "\n";
- splitIndex.write(indexRecord.getBytes());
- splitIndex.flush();
- }
-
- private synchronized void logRotate() throws IOException {
- // Close the current split
- LOG.debug("About to rotate-out the split: " + noSplits);
- out.close();
-
- // Re-initialize the state
- splitOffset += splitLength;
- splitLength = 0;
- flushCtr = 0;
-
- // New 'split'
- ++noSplits;
-
- // Check if we need to purge an old split
- if (purgeLogSplits) {
- if (noSplits >= noKeepSplits) { // noSplits is zero-based
- File purgeLogSplit = getLogSplit((noSplits-noKeepSplits));
- purgeLogSplit.delete();
- LOG.debug("Purged log-split #" + (noSplits-noKeepSplits) + " - " +
- purgeLogSplit);
+ start = Math.max(0, Math.min(start, size));
+ end = Math.max(0, Math.min(end, size));
+ bytesRemaining = end - start;
+ file = new FileInputStream(filename);
+ // skip up to start
+ long pos = 0;
+ while (pos < start) {
+ long result = file.skip(start - pos);
+ if (result < 0) {
+ bytesRemaining = 0;
+ break;
}
+ pos += result;
}
-
- // Rotate the log
- out = createLogSplit(noSplits);
}
- } // TaskLog.Writer
-
- /**
- * The log-reader for reading the 'split' user-logs.
- *
- */
- static class Reader {
- private String taskId;
- private LogFilter filter;
-
- private File taskLogDir;
- private boolean initialized = false;
-
- private IndexRecord[] indexRecords = null;
- private BufferedReader splitIndex;
-
- private long logFileSize = 0;
-
- /**
- * Create a new task log reader.
- *
- * @param taskId task id of the task.
- * @param filter the {@link LogFilter} to apply on userlogs.
- */
- public Reader(String taskId, LogFilter filter) {
- this.taskId = taskId;
- this.filter = filter;
-
- this.taskLogDir = getTaskLogDir(this.taskId, this.filter);
- }
-
- private static class IndexRecord {
- String splitName;
- long splitOffset;
-
- IndexRecord(String splitName, long splitOffset) {
- this.splitName = splitName;
- this.splitOffset = splitOffset;
+ public int read() throws IOException {
+ int result = -1;
+ if (bytesRemaining > 0) {
+ bytesRemaining -= 1;
+ result = file.read();
}
+ return result;
}
- private synchronized void init() throws IOException {
- this.splitIndex = new BufferedReader(new InputStreamReader(
- new FileInputStream(new File(taskLogDir,
- SPLIT_INDEX_NAME))));
-
- // Parse the split-index and store the offsets/lengths
- ArrayList<IndexRecord> records = new ArrayList<IndexRecord>();
- String line;
- while ((line = splitIndex.readLine()) != null) {
- String[] fields = line.split("\\|");
- if (fields.length != 2) {
- throw new IOException("Malformed split-index with " +
- fields.length + " fields");
- }
-
- IndexRecord record = new IndexRecord(
- fields[0],
- Long.valueOf(fields[1]).longValue()
- );
- LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset + ">");
-
- // Save
- records.add(record);
- }
-
- indexRecords = new IndexRecord[records.size()];
- indexRecords = records.toArray(indexRecords);
- IndexRecord lastRecord = indexRecords[records.size() - 1];
- logFileSize = lastRecord.splitOffset
- + new File(lastRecord.splitName).length();
- initialized = true;
- LOG.debug("Log size: " + logFileSize);
- }
-
- /**
- * Return the total 'logical' log-size written by the task, including
- * purged data.
- *
- * @return the total 'logical' log-size written by the task, including
- * purged data.
- * @throws IOException
- */
- public synchronized long getTotalLogSize() throws IOException {
- if (!initialized) {
- init();
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ length = (int) Math.min(length, bytesRemaining);
+ int bytes = file.read(buffer, offset, length);
+ if (bytes > 0) {
+ bytesRemaining -= bytes;
}
-
- return logFileSize;
+ return bytes;
}
- /**
- * Return the entire user-log (remaining splits).
- *
- * @return Returns a <code>byte[]</code> containing the data in user-log.
- * @throws IOException
- */
- public synchronized byte[] fetchAll() throws IOException {
- if (!initialized) {
- init();
- }
-
- // Get all splits
- Vector<InputStream> streams = new Vector<InputStream>();
- for (int i=0; i < indexRecords.length; ++i) {
- InputStream stream = getLogSplit(i);
- if (stream != null) {
- streams.add(stream);
- LOG.debug("Added split: " + i);
- }
- }
- LOG.debug("Total log-size on disk: " + logFileSize);
-
- // Copy log data into buffer
- byte[] b = new byte[(int) logFileSize];
- SequenceInputStream in = new SequenceInputStream(streams.elements());
- try {
- int bytesRead = 0;
- int off = 0;
- LOG.debug("Attempting to read " + logFileSize + " bytes from logs");
- while ((bytesRead = in.read(b, off, (int) logFileSize - off)) > 0) {
- LOG.debug("Got " + bytesRead + " bytes");
- off += bytesRead;
- }
-
- if (off != logFileSize) {
- LOG.debug("Didn't not read all requisite data in logs!");
- }
- } finally {
- try { in.close(); } catch (IOException ex) {}
- }
- return b;
- }
-
- /**
- * Tail the user-log.
- *
- * @param b the buffer into which the data is read.
- * @param off the start offset in array <code>b</code>
- * at which the data is written.
- * @param len the maximum number of bytes to read.
- * @param tailSize the no. of bytes to be read from end of file.
- * @param tailWindow the sliding window for tailing the logs.
- * @return the total number of bytes of user-logs dataread into the buffer.
- * @throws IOException
- */
- public synchronized int tail(byte[] b, int off, int len,
- long tailSize, int tailWindow)
- throws IOException {
- if (!initialized) {
- init();
- }
-
- LOG.debug("tailSize: " + tailSize + " - tailWindow: " + tailWindow);
-
- if (tailSize*tailWindow > logFileSize) {
- tailSize = logFileSize;
- tailWindow = 1;
- }
-
- return read(b, off, len,
- (long)(logFileSize-(tailSize*tailWindow)), tailSize);
+ public int available() throws IOException {
+ return (int) Math.min(bytesRemaining, file.available());
}
- /**
- * Read user-log data given an offset/length.
- *
- * @param b the buffer into which the data is read.
- * @param off the start offset in array <code>b</code>
- * at which the data is written.
- * @param len the maximum number of bytes to read.
- * @param logOffset the offset of the user-log from which to get data.
- * @param logLength the maximum number of bytes of user-log data to fetch.
- * @return the total number of bytes of user-logs dataread into the buffer.
- * @throws IOException
- */
- public synchronized int read(byte[] b, int off, int len,
- long logOffset, long logLength)
- throws IOException {
- LOG.debug("TaskLog.Reader.read: logOffset: " + logOffset + " - logLength: " + logLength);
-
- // Sanity check
- if (logLength == 0) {
- return 0;
- }
-
- if (!initialized) {
- init();
- }
-
- // Locate the requisite splits
- Vector<InputStream> streams = new Vector<InputStream>();
- long offset = logOffset;
- int startIndex = -1, stopIndex = -1;
- boolean inRange = false;
- for (int i=0; i < indexRecords.length; ++i) {
- LOG.debug("offset: " + offset + " - (split, splitOffset) : (" +
- i + ", " + indexRecords[i].splitOffset + ")");
-
- if (offset <= indexRecords[i].splitOffset) {
- if (!inRange) {
- startIndex = i - ((i > 0) ? 1 : 0);
- LOG.debug("Starting at split: " + startIndex);
- offset += logLength;
- InputStream stream = getLogSplit(startIndex);
- if (stream != null) {
- streams.add(stream);
- }
- LOG.debug("Added split: " + startIndex);
- inRange = true;
- } else {
- stopIndex = i-1;
- LOG.debug("Stop at split: " + stopIndex);
- break;
- }
- }
-
- if (inRange) {
- InputStream stream = getLogSplit(i);
- if (stream != null) {
- streams.add(stream);
- }
- LOG.debug("Added split: " + i);
- }
- }
- if (startIndex == -1) {
- throw new IOException("Illegal logOffset/logLength");
- }
- if (stopIndex == -1) {
- stopIndex = indexRecords.length - 1;
- LOG.debug("Stop at split: " + stopIndex);
-
- // Check if request exceeds the log-file size
- if ((logOffset+logLength) > logFileSize) {
- LOG.debug("logOffset+logLength exceeds log-file size");
- logLength = logFileSize - logOffset;
- }
- }
-
- // Copy requisite data into user buffer
- SequenceInputStream in = new SequenceInputStream(streams.elements());
- int totalBytesRead = 0;
- try {
- if (streams.size() == (stopIndex - startIndex +1)) {
- // Skip to get to 'logOffset' if logs haven't been purged
- long skipBytes =
- in.skip(logOffset - indexRecords[startIndex].splitOffset);
- LOG.debug("Skipped " + skipBytes + " bytes from " +
- startIndex + " stream");
- }
- int bytesRead = 0;
- len = Math.min((int)logLength, len);
- LOG.debug("Attempting to read " + len + " bytes from logs");
- while ((bytesRead = in.read(b, off, len)) > 0) {
- off += bytesRead;
- len -= bytesRead;
-
- totalBytesRead += bytesRead;
- }
- } finally {
- try { in.close(); } catch (IOException e) {}
- }
-
- return totalBytesRead;
- }
-
- private synchronized InputStream getLogSplit(int split)
- throws IOException {
- String splitName = indexRecords[split].splitName;
- LOG.debug("About to open the split: " + splitName);
- InputStream in = null;
- try {
- in = new BufferedInputStream(new FileInputStream(new File(splitName)));
- } catch (FileNotFoundException fnfe) {
- in = null;
- LOG.debug("Split " + splitName + " not found... probably purged!");
- }
-
- return in;
+ public void close() throws IOException {
+ file.close();
}
+ }
- } // TaskLog.Reader
+ private static final String bashCommand = "bash";
+ private static final String tailCommand = "tail";
+
+ /**
+ * Get the desired maximum length of task's logs.
+ * @param conf the job to look in
+ * @return the number of bytes to cap the log files at
+ */
+ public static long getTaskLogLength(JobConf conf) {
+ return conf.getLong("mapred.userlog.limit.kb", 100) * 1024;
+ }
+ /**
+ * Wrap a command in a shell to capture stdout and stderr to files.
+ * If the tailLength is 0, the entire output will be saved.
+ * @param cmd The command and the arguments that should be run
+ * @param stdoutFilename The filename that stdout should be saved to
+ * @param stderrFilename The filename that stderr should be saved to
+ * @param tailLength The length of the tail to be saved.
+ * @return the modified command that should be run
+ */
+ public static List<String> captureOutAndError(List<String> cmd,
+ File stdoutFilename,
+ File stderrFilename,
+ long tailLength
+ ) throws IOException {
+ String stdout = FileUtil.makeShellPath(stdoutFilename);
+ String stderr = FileUtil.makeShellPath(stderrFilename);
+ List<String> result = new ArrayList<String>(3);
+ result.add(bashCommand);
+ result.add("-c");
+ StringBuffer mergedCmd = new StringBuffer();
+ if (tailLength > 0) {
+ mergedCmd.append("(");
+ } else {
+ mergedCmd.append("exec ");
+ }
+ boolean isExecutable = true;
+ for(String s: cmd) {
+ mergedCmd.append('\'');
+ if (isExecutable) {
+ // the executable name needs to be expressed as a shell path for the
+ // shell to find it.
+ mergedCmd.append(FileUtil.makeShellPath(new File(s)));
+ isExecutable = false;
+ } else {
+ mergedCmd.append(s);
+ }
+ mergedCmd.append('\'');
+ mergedCmd.append(" ");
+ }
+ mergedCmd.append(" < /dev/null ");
+ if (tailLength > 0) {
+ mergedCmd.append(" | ");
+ mergedCmd.append(tailCommand);
+ mergedCmd.append(" -c ");
+ mergedCmd.append(tailLength);
+ mergedCmd.append(" >> ");
+ mergedCmd.append(stdout);
+ mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
+ mergedCmd.append(tailCommand);
+ mergedCmd.append(" -c ");
+ mergedCmd.append(tailLength);
+ mergedCmd.append(" >> ");
+ mergedCmd.append(stderr);
+ mergedCmd.append(" ; exit $PIPESTATUS");
+ } else {
+ mergedCmd.append(" 1>> ");
+ mergedCmd.append(stdout);
+ mergedCmd.append(" 2>> ");
+ mergedCmd.append(stderr);
+ }
+ result.add(mergedCmd.toString());
+ return result;
+ }
} // TaskLog
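
Two usage sketches for the rewritten class, both with made-up task ids and
paths, and both assuming the hadoop.log.dir system property points at a
real log directory. First, tailing the last 4096 bytes of a task's stderr
through the new Reader (note the -4097 start offset, per the constructor's
javadoc):

    import java.io.IOException;
    import org.apache.hadoop.mapred.TaskLog;

    public class TailStderrSketch {
      public static void main(String[] args) throws IOException {
        TaskLog.Reader in = new TaskLog.Reader("task_0001_m_000000_0",
                                               TaskLog.LogName.STDERR,
                                               -4097, -1); // last 4096 bytes
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf, 0, buf.length)) > 0) {
          System.out.write(buf, 0, n);
        }
        in.close();
      }
    }

Second, what captureOutAndError hands back for a toy command with a
4096-byte cap; the third list element is the bash -c payload that pipes
each stream through tail:

    import java.io.File;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.mapred.TaskLog;

    public class CaptureSketch {
      public static void main(String[] args) throws Exception {
        List<String> wrapped = TaskLog.captureOutAndError(
            Arrays.asList("/bin/echo", "hello"),  // toy command
            new File("/tmp/stdout"), new File("/tmp/stderr"), 4096);
        // wrapped.get(0) is "bash", wrapped.get(1) is "-c", and
        // wrapped.get(2) is the generated tail pipeline.
        System.out.println(wrapped.get(2));
      }
    }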
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskLogAppender.java?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskLogAppender.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskLogAppender.java Mon Aug 6 15:52:23 2007
@@ -18,11 +18,10 @@
package org.apache.hadoop.mapred;
-import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.spi.ErrorCode;
+import org.apache.log4j.FileAppender;
import org.apache.log4j.spi.LoggingEvent;
/**
@@ -30,66 +29,46 @@
* map-reduce system logs.
*
*/
-public class TaskLogAppender extends AppenderSkeleton {
- private TaskLog.Writer taskLogWriter = null;
+public class TaskLogAppender extends FileAppender {
private String taskId;
- private int noKeepSplits;
- private long totalLogFileSize;
- private boolean purgeLogSplits;
- private int logsRetainHours;
+ private int maxEvents;
+ private Queue<LoggingEvent> tail = null;
+ @Override
public void activateOptions() {
- taskLogWriter =
- new TaskLog.Writer(taskId, TaskLog.LogFilter.SYSLOG,
- noKeepSplits, totalLogFileSize, purgeLogSplits, logsRetainHours);
- try {
- taskLogWriter.init();
- } catch (IOException ioe) {
- taskLogWriter = null;
- errorHandler.error("Failed to initialize the task's logging " +
- "infrastructure: " + StringUtils.stringifyException(ioe));
+ synchronized (this) {
+ if (maxEvents > 0) {
+ tail = new LinkedList<LoggingEvent>();
+ }
+ setFile(TaskLog.getTaskLogFile(taskId,
+ TaskLog.LogName.SYSLOG).toString());
+ setAppend(true);
+ super.activateOptions();
}
}
- protected synchronized void append(LoggingEvent event) {
- if (taskLogWriter == null) {
- errorHandler.error("Calling 'append' on uninitialize/closed logger");
- return;
- }
-
- if (this.layout == null) {
- errorHandler.error("No layout for appender " + name ,
- null, ErrorCode.MISSING_LAYOUT);
- return;
- }
-
- // Log the message to the task's log
- String logMessage = this.layout.format(event);
- try {
- byte[] logMessageData = logMessage.getBytes();
- taskLogWriter.write(logMessageData, 0, logMessageData.length);
- } catch (IOException ioe) {
- errorHandler.error("Failed to log: '" + logMessage +
- "' to the task's logging infrastructure with the exception: " +
- StringUtils.stringifyException(ioe));
+ @Override
+ public void append(LoggingEvent event) {
+ synchronized (this) {
+ if (tail == null) {
+ super.append(event);
+ } else {
+ if (tail.size() >= maxEvents) {
+ tail.remove();
+ }
+ tail.add(event);
+ }
}
}
- public boolean requiresLayout() {
- return true;
- }
-
+ @Override
public synchronized void close() {
- if (taskLogWriter != null) {
- try {
- taskLogWriter.close();
- } catch (IOException ioe) {
- errorHandler.error("Failed to close the task's log with the exception: "
- + StringUtils.stringifyException(ioe));
+ if (tail != null) {
+ for(LoggingEvent event: tail) {
+ super.append(event);
}
- } else {
- errorHandler.error("Calling 'close' on uninitialize/closed logger");
}
+ super.close();
}
/**
@@ -104,36 +83,14 @@
this.taskId = taskId;
}
- public int getNoKeepSplits() {
- return noKeepSplits;
- }
-
- public void setNoKeepSplits(int noKeepSplits) {
- this.noKeepSplits = noKeepSplits;
- }
-
- public int getLogsRetainHours() {
- return logsRetainHours;
- }
-
- public void setLogsRetainHours(int logsRetainHours) {
- this.logsRetainHours = logsRetainHours;
- }
-
- public boolean isPurgeLogSplits() {
- return purgeLogSplits;
- }
-
- public void setPurgeLogSplits(boolean purgeLogSplits) {
- this.purgeLogSplits = purgeLogSplits;
- }
-
+ private static final int EVENT_SIZE = 100;
+
public long getTotalLogFileSize() {
- return totalLogFileSize;
+ return maxEvents * EVENT_SIZE;
}
- public void setTotalLogFileSize(long splitFileSize) {
- this.totalLogFileSize = splitFileSize;
+ public void setTotalLogFileSize(long logSize) {
+ maxEvents = (int) logSize / EVENT_SIZE;
}
}
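
A programmatic sketch of the rewired appender, equivalent to the
log4j.properties wiring earlier in this commit. The task id and log
directory are placeholders, and the setter names are taken from the
appender's property accessors; 512 KB maps to roughly 5242 buffered
events under the appender's 100-byte EVENT_SIZE estimate.

    import java.io.File;
    import org.apache.hadoop.mapred.TaskLogAppender;
    import org.apache.log4j.PatternLayout;

    public class AppenderSketch {
      public static void main(String[] args) {
        System.setProperty("hadoop.log.dir", "/tmp/hadoop-logs"); // placeholder
        // TaskRunner normally creates the per-task log directory:
        new File("/tmp/hadoop-logs/userlogs/task_0001_m_000000_0").mkdirs();
        TaskLogAppender tla = new TaskLogAppender();
        tla.setTaskId("task_0001_m_000000_0");
        tla.setTotalLogFileSize(512 * 1024); // keeps a tail of ~5242 events
        tla.setLayout(new PatternLayout("%d{ISO8601} %p %c: %m%n"));
        tla.activateOptions(); // opens .../userlogs/<taskid>/syslog for append
      }
    }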
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskRunner.java Mon Aug 6 15:52:23 2007
@@ -23,6 +23,7 @@
import org.apache.hadoop.filecache.*;
import org.apache.hadoop.util.*;
import java.io.*;
+import java.util.List;
import java.util.Vector;
import java.net.URI;
@@ -41,9 +42,6 @@
protected JobConf conf;
- private TaskLog.Writer taskStdOutLogWriter;
- private TaskLog.Writer taskStdErrLogWriter;
-
/**
* for cleaning up old map outputs
*/
@@ -53,18 +51,6 @@
this.t = t;
this.tracker = tracker;
this.conf = conf;
- this.taskStdOutLogWriter =
- new TaskLog.Writer(t.getTaskId(), TaskLog.LogFilter.STDOUT,
- this.conf.getInt("mapred.userlog.num.splits", 4),
- this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024,
- this.conf.getBoolean("mapred.userlog.purgesplits", true),
- this.conf.getInt("mapred.userlog.retain.hours", 12));
- this.taskStdErrLogWriter =
- new TaskLog.Writer(t.getTaskId(), TaskLog.LogFilter.STDERR,
- this.conf.getInt("mapred.userlog.num.splits", 4),
- this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024,
- this.conf.getBoolean("mapred.userlog.purgesplits", true),
- this.conf.getInt("mapred.userlog.retain.hours", 12));
this.mapOutputFile = new MapOutputFile();
this.mapOutputFile.setConf(conf);
}
@@ -76,8 +62,6 @@
* process before the child is spawned. It should not execute user code,
* only system code. */
public boolean prepare() throws IOException {
- taskStdOutLogWriter.init(); // initialize the child task's stdout log
- taskStdErrLogWriter.init(); // initialize the child task's stderr log
return true;
}
@@ -105,6 +89,7 @@
//before preparing the job localize
//all the archives
File workDir = new File(t.getJobFile()).getParentFile();
+ String taskid = t.getTaskId();
File jobCacheDir = new File(workDir.getParent(), "work");
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
@@ -257,7 +242,7 @@
String javaOpts = handleDeprecatedHeapSize(
conf.get("mapred.child.java.opts", "-Xmx200m"),
conf.get("mapred.child.heap.size"));
- javaOpts = replaceAll(javaOpts, "@taskid@", t.getTaskId());
+ javaOpts = replaceAll(javaOpts, "@taskid@", taskid);
int port = conf.getInt("mapred.task.tracker.report.port", 50050) + 1;
javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port));
String [] javaOptsSplit = javaOpts.split(" ");
@@ -284,24 +269,27 @@
vargs.add(classPath.toString());
// Setup the log4j prop
+ long logSize = TaskLog.getTaskLogLength(conf);
vargs.add("-Dhadoop.log.dir=" + System.getProperty("hadoop.log.dir"));
vargs.add("-Dhadoop.root.logger=INFO,TLA");
- vargs.add("-Dhadoop.tasklog.taskid=" + t.getTaskId());
- vargs.add("-Dhadoop.tasklog.noKeepSplits=" + conf.getInt("mapred.userlog.num.splits", 4));
- vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + (conf.getInt("mapred.userlog.limit.kb", 100) * 1024));
- vargs.add("-Dhadoop.tasklog.purgeLogSplits=" + conf.getBoolean("mapred.userlog.purgesplits", true));
- vargs.add("-Dhadoop.tasklog.logsRetainHours=" + conf.getInt("mapred.userlog.retain.hours", 12));
-
-
+ vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
+ vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
// Add main class and its arguments
vargs.add(TaskTracker.Child.class.getName()); // main of Child
// pass umbilical port
- vargs.add(tracker.getTaskTrackerReportPort() + "");
- vargs.add(t.getTaskId()); // pass task identifier
+ vargs.add(Integer.toString(tracker.getTaskTrackerReportPort()));
+ vargs.add(taskid); // pass task identifier
// Run java
- runChild((String[])vargs.toArray(new String[0]), workDir);
+ File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+ File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+ stdout.getParentFile().mkdirs();
+
+ List<String> wrappedCommand =
+ TaskLog.captureOutAndError(vargs, stdout, stderr, logSize);
+ runChild(wrappedCommand, workDir);
+
} catch (FSError e) {
LOG.fatal("FSError", e);
try {
@@ -402,48 +390,22 @@
/**
* Run the child process
*/
- private void runChild(String[] args, File dir) throws IOException {
- this.process = Runtime.getRuntime().exec(args, null, dir);
+ private void runChild(List<String> args, File dir) throws IOException {
+ ProcessBuilder builder = new ProcessBuilder(args);
+ builder.directory(dir);
+ process = builder.start();
- Thread logStdErrThread = null;
- Thread logStdOutThread = null;
try {
- // Copy stderr of the child-process via a thread
- logStdErrThread = logStream((t.getTaskId() + " - " + "stderr"),
- process.getErrorStream(),
- taskStdErrLogWriter);
-
- // Copy stdout of the child-process via a thread
- logStdOutThread = logStream((t.getTaskId() + " - " + "stdout"),
- process.getInputStream(),
- taskStdOutLogWriter);
-
int exit_code = process.waitFor();
if (!killed && exit_code != 0) {
throw new IOException("Task process exit with nonzero status of " +
exit_code + ".");
}
-
} catch (InterruptedException e) {
throw new IOException(e.toString());
} finally {
- kill();
-
- // Kill both stdout/stderr copying threads
- if (logStdErrThread != null) {
- logStdErrThread.interrupt();
- try {
- logStdErrThread.join();
- } catch (InterruptedException ie) {}
- }
-
- if (logStdOutThread != null) {
- logStdOutThread.interrupt();
- try {
- logStdOutThread.join();
- } catch (InterruptedException ie) {}
- }
+ kill();
}
}
@@ -457,48 +419,4 @@
killed = true;
}
- /**
- * Spawn a new thread to copy the child-jvm's stdout/stderr streams
- * via a {@link TaskLog.Writer}
- *
- * @param threadName thread name
- * @param stream child-jvm's stdout/stderr stream
- * @param writer {@link TaskLog.Writer} used to copy the child-jvm's data
- * @return Return the newly created thread
- */
- private Thread logStream(String threadName,
- final InputStream stream,
- final TaskLog.Writer taskLog) {
- Thread loggerThread = new Thread() {
- public void run() {
- try {
- byte[] buf = new byte[512];
- while (!Thread.interrupted()) {
- while (stream.available() > 0) {
- int n = stream.read(buf, 0, buf.length);
- taskLog.write(buf, 0, n);
- }
- Thread.sleep(1000);
- }
- } catch (IOException e) {
- LOG.warn(t.getTaskId()+" Error reading child output", e);
- } catch (InterruptedException e) {
- // expected
- } finally {
- try {
- stream.close();
- taskLog.close();
- } catch (IOException e) {
- LOG.warn(t.getTaskId()+" Error closing child output", e);
- }
- }
- }
- };
- loggerThread.setName(threadName);
- loggerThread.setDaemon(true);
- loggerThread.start();
-
- return loggerThread;
- }
-
}
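
Boiled down to a standalone sketch, the new launch path looks like the
following (names are illustrative): ProcessBuilder replaces Runtime.exec,
and the copier threads are gone because the wrapped command already
redirects its own stdout and stderr.

    import java.io.File;
    import java.io.IOException;
    import java.util.List;

    public class RunChildSketch {
      static void runChild(List<String> args, File dir) throws IOException {
        ProcessBuilder builder = new ProcessBuilder(args);
        builder.directory(dir);
        Process process = builder.start();
        try {
          int exitCode = process.waitFor();
          if (exitCode != 0) {
            throw new IOException("Task process exited with nonzero status "
                                  + exitCode + ".");
          }
        } catch (InterruptedException e) {
          throw new IOException(e.toString());
        } finally {
          process.destroy(); // stands in for the tracker's kill()
        }
      }
    }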
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Aug 6 15:52:23 2007
@@ -17,9 +17,13 @@
*/
package org.apache.hadoop.mapred;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FilterOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.BindException;
@@ -737,6 +741,7 @@
server.setAttribute("localDirAllocator", localDirAllocator);
server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
server.addServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
+ server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
server.start();
this.httpPort = server.getPort();
initialize();
@@ -1743,6 +1748,7 @@
* The main() for child processes.
*/
public static class Child {
+
public static void main(String[] args) throws Throwable {
//LogFactory.showTime(false);
LOG.debug("Child starting");
@@ -1758,6 +1764,7 @@
Task task = umbilical.getTask(taskid);
JobConf job = new JobConf(task.getJobFile());
+ TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
task.setConf(job);
defaultConf.addFinalResource(new Path(task.getJobFile()));
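
The cleanup call added above can also be exercised directly; a sketch with
a placeholder log directory and the new 24-hour default:

    import org.apache.hadoop.mapred.TaskLog;

    public class CleanupSketch {
      public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.log.dir", "/tmp/hadoop-logs"); // placeholder
        // Deletes per-task userlog directories whose mtime is older than 24h.
        TaskLog.cleanup(24);
      }
    }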
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/pipes/Application.java?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/pipes/Application.java Mon Aug 6 15:52:23 2007
@@ -18,9 +18,8 @@
package org.apache.hadoop.mapred.pipes;
+import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
@@ -37,6 +36,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -73,6 +73,12 @@
String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
FileUtil.chmod(executable, "a+x");
cmd.add(executable);
+ // wrap the command in a stdout/stderr capture
+ String taskid = conf.get("mapred.task.id");
+ File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+ File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+ long logLength = TaskLog.getTaskLogLength(conf);
+ cmd = TaskLog.captureOutAndError(cmd, stdout, stderr, logLength);
process = runClient(cmd, env);
clientSocket = serverSocket.accept();
handler = new OutputHandler(output, reporter);
@@ -140,39 +146,6 @@
}
/**
- * A thread to copy an input stream to an output stream.
- * Errors cause the copy to stop and are not reported back.
- * The input stream is closed when the thread exits. The output stream
- * is not closed.
- */
- private static class OutputCopier extends Thread {
- InputStream in;
- OutputStream out;
- OutputCopier(String name, InputStream in, OutputStream out) {
- super(name);
- this.in = in;
- this.out = out;
- }
- public void run() {
- byte[] buffer = new byte[65536];
- try {
- while (true) {
- int size = in.read(buffer);
- if (size == -1) {
- break;
- }
- out.write(buffer, 0, size);
- }
- } catch (IOException ie) {
- } finally {
- try {
- in.close();
- } catch (IOException ie) { }
- }
- }
- }
-
- /**
* Run a given command in a subprocess, including threads to copy its stdout
* and stderr to our stdout and stderr.
* @param command the command and its arguments
@@ -187,11 +160,6 @@
builder.environment().putAll(env);
}
Process result = builder.start();
- result.getOutputStream().close();
- new OutputCopier("pipes-stdout", result.getInputStream(),
- System.out).start();
- new OutputCopier("pipes-stderr", result.getErrorStream(),
- System.err).start();
return result;
}
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Mon Aug 6 15:52:23 2007
@@ -21,7 +21,6 @@
import java.io.*;
import java.net.Socket;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
Modified: lucene/hadoop/branches/branch-0.14/src/webapps/job/jobfailures.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/webapps/job/jobfailures.jsp?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/webapps/job/jobfailures.jsp (original)
+++ lucene/hadoop/branches/branch-0.14/src/webapps/job/jobfailures.jsp Mon Aug 6 15:52:23 2007
@@ -56,11 +56,11 @@
out.print("<td>");
if (taskTracker != null) {
String taskLogUrl = "http://" + taskTracker.getHost() + ":" +
- taskTracker.getHttpPort() + "/tasklog.jsp?taskid=" +
+ taskTracker.getHttpPort() + "/tasklog?taskid=" +
statuses[i].getTaskId();
- String tailFourKBUrl = taskLogUrl + "&tail=true&tailsize=4096";
- String tailEightKBUrl = taskLogUrl + "&tail=true&tailsize=8192";
- String entireLogUrl = taskLogUrl + "&all=true";
+ String tailFourKBUrl = taskLogUrl + "&start=-4097";
+ String tailEightKBUrl = taskLogUrl + "&start=-8193";
+ String entireLogUrl = taskLogUrl;
out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
Modified: lucene/hadoop/branches/branch-0.14/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/webapps/job/taskdetails.jsp?view=diff&rev=563324&r1=563323&r2=563324
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/webapps/job/taskdetails.jsp (original)
+++ lucene/hadoop/branches/branch-0.14/src/webapps/job/taskdetails.jsp Mon Aug 6 15:52:23 2007
@@ -92,10 +92,10 @@
if (taskAttemptTracker == null) {
out.print("n/a");
} else {
- String taskLogUrl = taskAttemptTracker + "/tasklog.jsp?taskid=" +
+ String taskLogUrl = taskAttemptTracker + "/tasklog?taskid=" +
status.getTaskId();
- String tailFourKBUrl = taskLogUrl + "&tail=true&tailsize=4096";
- String tailEightKBUrl = taskLogUrl + "&tail=true&tailsize=8192";
+ String tailFourKBUrl = taskLogUrl + "&start=-4097";
+ String tailEightKBUrl = taskLogUrl + "&start=-8193";
String entireLogUrl = taskLogUrl + "&all=true";
out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");