You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/01/25 03:31:42 UTC
svn commit: r1063107 - in /incubator/hama/trunk: CHANGES.txt
conf/log4j.properties src/java/org/apache/hama/bsp/TaskLog.java
src/java/org/apache/hama/bsp/TaskLogAppender.java
Author: edwardyoon
Date: Tue Jan 25 02:31:41 2011
New Revision: 1063107
URL: http://svn.apache.org/viewvc?rev=1063107&view=rev
Log:
Add task log appender and Fix log4j rootLogger
Added:
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/conf/log4j.properties
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1063107&r1=1063106&r2=1063107&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Jan 25 02:31:41 2011
@@ -195,6 +195,7 @@ Trunk (unreleased changes)
BUG FIXES
+ HAMA-350: Add task log appender and Fix log4j rootLogger (edwardyoon)
HAMA-345: Add execution time calculator to Pi job (edwardyoon)
HAMA-344: Task successfully finished but system re-attempt (edwardyoon)
HAMA-343: Fix Maven test fails (Tommaso Teofili via edwardyoon)
Modified: incubator/hama/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/log4j.properties?rev=1063107&r1=1063106&r2=1063107&view=diff
==============================================================================
--- incubator/hama/trunk/conf/log4j.properties (original)
+++ incubator/hama/trunk/conf/log4j.properties Tue Jan 25 02:31:41 2011
@@ -1,5 +1,5 @@
# Define some default values that can be overridden by system properties
-hama.root.logger=INFO,console,DEBUG
+hama.root.logger=INFO,console
hama.log.dir=.
hama.log.file=hama.log
@@ -28,6 +28,23 @@ log4j.appender.DRFA.layout.ConversionPat
# Debugging Pattern format
#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+#
+# TaskLog Appender
+#
+# Defines an appender named TLA backed by org.apache.hama.bsp.TaskLogAppender.
+# NOTE(review): hama.root.logger above is "INFO,console", so TLA is declared
+# here but is presumably only used when a logger is explicitly pointed at it
+# - confirm against the rootLogger line and the task launcher.
+
+#Default values
+# These are fallbacks for the ${...} references below.
+# Only hama.tasklog.taskid and hama.tasklog.totalLogFileSize are referenced
+# by the TLA settings in this section; noKeepSplits, purgeLogSplits and
+# logsRetainHours are declared but not used by anything visible here.
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+# taskId and totalLogFileSize correspond to TaskLogAppender.setTaskId(String)
+# and TaskLogAppender.setTotalLogFileSize(long). The appender divides
+# totalLogFileSize by 100 to get the number of events it buffers, so the
+# default of 100 keeps only a single event.
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+# Output format: ISO8601 timestamp, level, logger name, message.
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
#
# console
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java?rev=1063107&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java Tue Jan 25 02:31:41 2011
@@ -0,0 +1,323 @@
+package org.apache.hama.bsp;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * Static helpers for the per-task log files kept in the "userlogs" directory
+ * below the directory named by the <code>hama.log.dir</code> system property:
+ * locating a task attempt's log file, purging old logs, reading a byte range
+ * of a log and building bash command lines that redirect a command's output
+ * into log files.
+ */
+public class TaskLog {
+  private static final Log LOG = LogFactory.getLog(TaskLog.class.getName());
+
+  /** Absolute path of the directory that holds one sub-directory per task. */
+  private static final File LOG_DIR = new File(
+      System.getProperty("hama.log.dir"), "userlogs").getAbsoluteFile();
+
+  static {
+    // mkdirs() only signals failure through its return value, so report it
+    // instead of silently continuing without a log directory. The extra
+    // isDirectory() check tolerates another process creating it concurrently.
+    if (!LOG_DIR.exists() && !LOG_DIR.mkdirs() && !LOG_DIR.isDirectory()) {
+      LOG.warn("Unable to create task log directory: " + LOG_DIR);
+    }
+  }
+
+  /**
+   * Returns the location of one log file of a task attempt.
+   *
+   * @param taskid the task attempt; its string form names the sub-directory
+   * @param filter the kind of log; its string form names the file
+   * @return the file <code>LOG_DIR/&lt;taskid&gt;/&lt;filter&gt;</code>
+   */
+  public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
+    return new File(new File(LOG_DIR, taskid.toString()), filter.toString());
+  }
+
+  /**
+   * The filter for userlogs. {@link #toString()} yields the file name used
+   * for that kind of log.
+   */
+  public static enum LogName {
+    /** Log on the stdout of the task. */
+    STDOUT("stdout"),
+
+    /** Log on the stderr of the task. */
+    STDERR("stderr"),
+
+    /** Log on the system logs of the task. */
+    SYSLOG("syslog"),
+
+    /** The java profiler information. */
+    PROFILE("profile.out"),
+
+    /** Log the debug script's stdout */
+    DEBUGOUT("debugout");
+
+    private final String prefix;
+
+    private LogName(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @Override
+    public String toString() {
+      return prefix;
+    }
+  }
+
+  /** Accepts files whose modification time is older than a cut-off. */
+  private static class TaskLogsPurgeFilter implements FileFilter {
+    private final long purgeTimeStamp;
+
+    TaskLogsPurgeFilter(long purgeTimeStamp) {
+      this.purgeTimeStamp = purgeTimeStamp;
+    }
+
+    public boolean accept(File file) {
+      long mtime = file.lastModified();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("PurgeFilter - file: " + file + ", mtime: "
+            + mtime + ", purge: " + purgeTimeStamp);
+      }
+      return mtime < purgeTimeStamp;
+    }
+  }
+
+  /**
+   * Purge old user logs.
+   *
+   * @param logsRetainHours entries of the log directory last modified more
+   *          than this many hours ago are deleted
+   * @throws IOException
+   */
+  public static synchronized void cleanup(int logsRetainHours)
+      throws IOException {
+    // 60L forces the whole product into long arithmetic.
+    long purgeTimeStamp = System.currentTimeMillis()
+        - (logsRetainHours * 60L * 60 * 1000);
+    File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter(
+        purgeTimeStamp));
+    // listFiles() returns null when LOG_DIR cannot be listed.
+    if (oldTaskLogs != null) {
+      for (File oldTaskLog : oldTaskLogs) {
+        FileUtil.fullyDelete(oldTaskLog);
+      }
+    }
+  }
+
+  /**
+   * Input stream over a byte range of one task log file. Once the range is
+   * exhausted the stream reports end-of-stream (-1), even if the underlying
+   * file is longer.
+   */
+  static class Reader extends InputStream {
+    private long bytesRemaining;
+    private final FileInputStream file;
+
+    /**
+     * Read a log file from start to end positions. The offsets may be negative,
+     * in which case they are relative to the end of the file. For example,
+     * Reader(taskid, kind, 0, -1) is the entire file and Reader(taskid, kind,
+     * -4197, -1) is the last 4196 bytes.
+     *
+     * @param taskid the id of the task to read the log file for
+     * @param kind the kind of log to read
+     * @param start the offset to read from (negative is relative to tail)
+     * @param end the offset to read upto (negative is relative to tail)
+     * @throws IOException if the file cannot be opened or positioned
+     */
+    public Reader(TaskAttemptID taskid, LogName kind, long start, long end)
+        throws IOException {
+      // find the right log file
+      File filename = getTaskLogFile(taskid, kind);
+      // calculate the start and stop
+      long size = filename.length();
+      if (start < 0) {
+        start += size + 1;
+      }
+      if (end < 0) {
+        end += size + 1;
+      }
+      start = Math.max(0, Math.min(start, size));
+      end = Math.max(0, Math.min(end, size));
+      // An inverted range (end before start) is an empty range, never a
+      // negative byte budget.
+      bytesRemaining = Math.max(0, end - start);
+      file = new FileInputStream(filename);
+      try {
+        // skip upto start
+        long pos = 0;
+        while (pos < start) {
+          long result = file.skip(start - pos);
+          if (result < 0) {
+            bytesRemaining = 0;
+            break;
+          }
+          pos += result;
+        }
+      } catch (IOException e) {
+        // The caller never receives this object when construction fails, so
+        // the descriptor has to be released here.
+        try {
+          file.close();
+        } catch (IOException ignored) {
+          // keep reporting the original failure
+        }
+        throw e;
+      }
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (bytesRemaining <= 0) {
+        return -1;
+      }
+      int result = file.read();
+      if (result >= 0) {
+        // only consume budget for a byte that was actually delivered
+        bytesRemaining -= 1;
+      }
+      return result;
+    }
+
+    @Override
+    public int read(byte[] buffer, int offset, int length) throws IOException {
+      // InputStream contract: a request for at least one byte at the end of
+      // the stream answers -1. Returning 0 here would make callers that loop
+      // until -1 spin forever.
+      if (length > 0 && bytesRemaining <= 0) {
+        return -1;
+      }
+      length = (int) Math.min(length, bytesRemaining);
+      int bytes = file.read(buffer, offset, length);
+      if (bytes > 0) {
+        bytesRemaining -= bytes;
+      }
+      return bytes;
+    }
+
+    @Override
+    public int available() throws IOException {
+      return (int) Math.min(bytesRemaining, file.available());
+    }
+
+    @Override
+    public void close() throws IOException {
+      file.close();
+    }
+  }
+
+  private static final String bashCommand = "bash";
+  private static final String tailCommand = "tail";
+
+  /**
+   * Get the desired maximum length of task's logs.
+   *
+   * @param conf the job to look in
+   * @return the number of bytes to cap the log files at: the value of
+   *         "mapred.userlog.limit.kb" (default 100) multiplied by 1024
+   */
+  public static long getTaskLogLength(HamaConfiguration conf) {
+    return conf.getLong("mapred.userlog.limit.kb", 100) * 1024;
+  }
+
+  /**
+   * Wrap a command in a shell to capture stdout and stderr to files. If the
+   * tailLength is 0, the entire output will be saved.
+   *
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @return the modified command that should be run
+   */
+  public static List<String> captureOutAndError(List<String> cmd,
+      File stdoutFilename, File stderrFilename, long tailLength)
+      throws IOException {
+    return captureOutAndError(null, cmd, stdoutFilename, stderrFilename,
+        tailLength);
+  }
+
+  /**
+   * Wrap a command in a shell to capture stdout and stderr to files. Setup
+   * commands such as setting memory limit can be passed which will be executed
+   * before exec. If the tailLength is 0, the entire output will be saved.
+   *
+   * @param setup The setup commands for the execed process.
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @return the modified command that should be run: "bash", "-c" and the
+   *         assembled shell line
+   */
+  public static List<String> captureOutAndError(List<String> setup,
+      List<String> cmd, File stdoutFilename, File stderrFilename,
+      long tailLength) throws IOException {
+    String stdout = FileUtil.makeShellPath(stdoutFilename);
+    String stderr = FileUtil.makeShellPath(stderrFilename);
+    List<String> result = new ArrayList<String>(3);
+    result.add(bashCommand);
+    result.add("-c");
+    // Local to this call, so the unsynchronized builder is sufficient.
+    StringBuilder mergedCmd = new StringBuilder();
+    if (setup != null && setup.size() > 0) {
+      mergedCmd.append(addCommand(setup, false));
+      mergedCmd.append(";");
+    }
+    if (tailLength > 0) {
+      // The command runs in a subshell so that its output can be piped
+      // through tail.
+      mergedCmd.append("(");
+    } else {
+      mergedCmd.append("exec ");
+    }
+    mergedCmd.append(addCommand(cmd, true));
+    mergedCmd.append(" < /dev/null ");
+    if (tailLength > 0) {
+      mergedCmd.append(" | ");
+      mergedCmd.append(tailCommand);
+      mergedCmd.append(" -c ");
+      mergedCmd.append(tailLength);
+      mergedCmd.append(" >> ");
+      mergedCmd.append(stdout);
+      mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
+      mergedCmd.append(tailCommand);
+      mergedCmd.append(" -c ");
+      mergedCmd.append(tailLength);
+      mergedCmd.append(" >> ");
+      mergedCmd.append(stderr);
+      mergedCmd.append(" ; exit $PIPESTATUS");
+    } else {
+      mergedCmd.append(" 1>> ");
+      mergedCmd.append(stdout);
+      mergedCmd.append(" 2>> ");
+      mergedCmd.append(stderr);
+    }
+    result.add(mergedCmd.toString());
+    return result;
+  }
+
+  /**
+   * Add quotes to each of the command strings and return as a single string
+   *
+   * @param cmd The command to be quoted
+   * @param isExecutable makes shell path if the first argument is executable
+   * @return returns The quoted string.
+   * @throws IOException
+   */
+  public static String addCommand(List<String> cmd, boolean isExecutable)
+      throws IOException {
+    // NOTE(review): each element is wrapped in single quotes but a single
+    // quote inside an element is not escaped, so such an element ends the
+    // quoting early. Callers must not pass untrusted strings.
+    StringBuilder command = new StringBuilder();
+    for (String s : cmd) {
+      command.append('\'');
+      if (isExecutable) {
+        // the executable name needs to be expressed as a shell path for the
+        // shell to find it.
+        command.append(FileUtil.makeShellPath(new File(s)));
+        isExecutable = false;
+      } else {
+        command.append(s);
+      }
+      command.append('\'');
+      command.append(" ");
+    }
+    return command.toString();
+  }
+
+  /**
+   * Wrap a command in a shell to capture debug script's stdout and stderr to
+   * debugout.
+   *
+   * @param cmd The command and the arguments that should be run
+   * @param debugoutFilename The filename that stdout and stderr should be saved
+   *          to.
+   * @return the modified command that should be run
+   * @throws IOException
+   */
+  public static List<String> captureDebugOut(List<String> cmd,
+      File debugoutFilename) throws IOException {
+    String debugout = FileUtil.makeShellPath(debugoutFilename);
+    List<String> result = new ArrayList<String>(3);
+    result.add(bashCommand);
+    result.add("-c");
+    // NOTE(review): unlike addCommand(), the arguments are appended without
+    // any quoting, so the shell splits and expands them. Callers must not
+    // pass untrusted strings.
+    StringBuilder mergedCmd = new StringBuilder();
+    mergedCmd.append("exec ");
+    boolean isExecutable = true;
+    for (String s : cmd) {
+      if (isExecutable) {
+        // the executable name needs to be expressed as a shell path for the
+        // shell to find it.
+        mergedCmd.append(FileUtil.makeShellPath(new File(s)));
+        isExecutable = false;
+      } else {
+        mergedCmd.append(s);
+      }
+      mergedCmd.append(" ");
+    }
+    mergedCmd.append(" < /dev/null ");
+    mergedCmd.append(" >");
+    mergedCmd.append(debugout);
+    mergedCmd.append(" 2>&1 ");
+    result.add(mergedCmd.toString());
+    return result;
+  }
+
+} // TaskLog
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java?rev=1063107&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java Tue Jan 25 02:31:41 2011
@@ -0,0 +1,75 @@
+package org.apache.hama.bsp;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * log4j file appender that writes to the syslog file of one task attempt, as
+ * located by {@link TaskLog#getTaskLogFile}.
+ * <p>
+ * When a positive event limit has been configured through
+ * {@link #setTotalLogFileSize(long)}, events are not written as they arrive.
+ * Only the most recent ones are kept in memory and they are written out when
+ * the appender is closed. Without a limit every event is written immediately.
+ */
+public class TaskLogAppender extends FileAppender {
+  /** Size attributed to one event when converting a size into a count. */
+  private static final int EVENT_SIZE = 100;
+
+  private String taskId; // taskId should be managed as String rather than
+  // TaskID object
+  // so that log4j can configure it from the configuration(log4j.properties).
+  private int maxEvents;
+  /** Most recent events, oldest first; null when events are not buffered. */
+  private Queue<LoggingEvent> tail = null;
+
+  /**
+   * Points the appender at the task's syslog file in append mode and, if an
+   * event limit is set, switches to buffering the tail of the log.
+   */
+  @Override
+  public void activateOptions() {
+    synchronized (this) {
+      if (maxEvents > 0) {
+        tail = new LinkedList<LoggingEvent>();
+      }
+      setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId),
+          TaskLog.LogName.SYSLOG).toString());
+      setAppend(true);
+      super.activateOptions();
+    }
+  }
+
+  /**
+   * Writes the event immediately when buffering is off; otherwise stores it,
+   * dropping the oldest stored event once the limit has been reached.
+   */
+  @Override
+  public void append(LoggingEvent event) {
+    synchronized (this) {
+      if (tail == null) {
+        super.append(event);
+      } else {
+        if (tail.size() >= maxEvents) {
+          tail.remove();
+        }
+        tail.add(event);
+      }
+    }
+  }
+
+  /** Writes out any buffered events and then closes the file. */
+  @Override
+  public synchronized void close() {
+    if (tail != null) {
+      for (LoggingEvent event : tail) {
+        super.append(event);
+      }
+      // Forget what has been written so that a repeated close() does not try
+      // to write the same events to the already closed file.
+      tail.clear();
+    }
+    super.close();
+  }
+
+  /**
+   * Getter/Setter methods for log4j.
+   */
+
+  public String getTaskId() {
+    return taskId;
+  }
+
+  public void setTaskId(String taskId) {
+    this.taskId = taskId;
+  }
+
+  /**
+   * @return the event limit expressed as a size (limit times EVENT_SIZE)
+   */
+  public long getTotalLogFileSize() {
+    // Widen before multiplying; an int product overflows for large limits.
+    return (long) maxEvents * EVENT_SIZE;
+  }
+
+  /**
+   * Sets the event limit from a size.
+   *
+   * @param logSize the size; divided by EVENT_SIZE to get the number of events
+   *          to keep. A result that is not positive disables buffering.
+   */
+  public void setTotalLogFileSize(long logSize) {
+    // Divide as long first and narrow afterwards. A cast binds tighter than
+    // '/', so "(int) logSize / EVENT_SIZE" truncated logSize before dividing.
+    maxEvents = (int) Math.min(Integer.MAX_VALUE, logSize / EVENT_SIZE);
+  }
+
+}