You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/01/25 03:31:42 UTC

svn commit: r1063107 - in /incubator/hama/trunk: CHANGES.txt conf/log4j.properties src/java/org/apache/hama/bsp/TaskLog.java src/java/org/apache/hama/bsp/TaskLogAppender.java

Author: edwardyoon
Date: Tue Jan 25 02:31:41 2011
New Revision: 1063107

URL: http://svn.apache.org/viewvc?rev=1063107&view=rev
Log:
Add task log appender and Fix log4j rootLogger

Added:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/conf/log4j.properties

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1063107&r1=1063106&r2=1063107&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Jan 25 02:31:41 2011
@@ -195,6 +195,7 @@ Trunk (unreleased changes)
 
   BUG FIXES
   
+    HAMA-350: Add task log appender and Fix log4j rootLogger (edwardyoon)
     HAMA-345: Add execution time calculator to Pi job (edwardyoon)
     HAMA-344: Task successfully finished but system re-attempt (edwardyoon)
     HAMA-343: Fix Maven test fails (Tommaso Teofili via edwardyoon)

Modified: incubator/hama/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/log4j.properties?rev=1063107&r1=1063106&r2=1063107&view=diff
==============================================================================
--- incubator/hama/trunk/conf/log4j.properties (original)
+++ incubator/hama/trunk/conf/log4j.properties Tue Jan 25 02:31:41 2011
@@ -1,5 +1,5 @@
 # Define some default values that can be overridden by system properties
-hama.root.logger=INFO,console,DEBUG
+hama.root.logger=INFO,console
 hama.log.dir=.
 hama.log.file=hama.log
 
@@ -28,6 +28,23 @@ log4j.appender.DRFA.layout.ConversionPat
 # Debugging Pattern format
 #log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
 
+#
+# TaskLog Appender
+#
+
+#Default values
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
 
 #
 # console

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java?rev=1063107&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java Tue Jan 25 02:31:41 2011
@@ -0,0 +1,323 @@
+package org.apache.hama.bsp;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hama.HamaConfiguration;
+
+public class TaskLog {
+  private static final Log LOG = LogFactory.getLog(TaskLog.class.getName());
+
+  /**
+   * Root directory for per-task logs: the "userlogs" directory beneath the
+   * location named by the "hama.log.dir" system property.
+   */
+  private static final File LOG_DIR = new File(
+      System.getProperty("hama.log.dir"), "userlogs").getAbsoluteFile();
+
+  static {
+    // mkdirs() returns false both on failure and when something else created
+    // the directory in the meantime, so re-check before reporting a problem.
+    if (!LOG_DIR.exists() && !LOG_DIR.mkdirs() && !LOG_DIR.isDirectory()) {
+      LOG.warn("Unable to create task log directory " + LOG_DIR);
+    }
+  }
+
+  public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
+    return new File(new File(LOG_DIR, taskid.toString()), filter.toString());
+  }
+
+  /**
+   * The kinds of per-task log files. The string form of each constant is the
+   * text passed to its constructor.
+   */
+  public static enum LogName {
+    /** Standard output of the task. */
+    STDOUT("stdout"),
+
+    /** Standard error of the task. */
+    STDERR("stderr"),
+
+    /** System log of the task. */
+    SYSLOG("syslog"),
+
+    /** Java profiler output. */
+    PROFILE("profile.out"),
+
+    /** Standard output of the debug script. */
+    DEBUGOUT("debugout");
+
+    private final String fileName;
+
+    private LogName(String fileName) {
+      this.fileName = fileName;
+    }
+
+    /** @return the name this kind of log was declared with */
+    @Override
+    public String toString() {
+      return fileName;
+    }
+  }
+
+  private static class TaskLogsPurgeFilter implements FileFilter {
+    long purgeTimeStamp;
+
+    TaskLogsPurgeFilter(long purgeTimeStamp) {
+      this.purgeTimeStamp = purgeTimeStamp;
+    }
+
+    public boolean accept(File file) {
+      LOG.debug("PurgeFilter - file: " + file + ", mtime: "
+          + file.lastModified() + ", purge: " + purgeTimeStamp);
+      return file.lastModified() < purgeTimeStamp;
+    }
+  }
+
+  /**
+   * Deletes the entries directly under the user log directory whose
+   * last-modified time is older than the retention period.
+   *
+   * @param logsRetainHours how many hours of logs to keep
+   * @throws IOException if raised while deleting an entry
+   */
+  public static synchronized void cleanup(int logsRetainHours)
+      throws IOException {
+    long retainMillis = logsRetainHours * 60L * 60 * 1000;
+    long cutoff = System.currentTimeMillis() - retainMillis;
+    File[] expired = LOG_DIR.listFiles(new TaskLogsPurgeFilter(cutoff));
+    if (expired == null) {
+      // listFiles() gives null when the directory cannot be listed
+      return;
+    }
+    for (File entry : expired) {
+      FileUtil.fullyDelete(entry);
+    }
+  }
+
+  /**
+   * An input stream over a byte range of a single task log file. Once the
+   * requested range has been consumed the stream reports end of stream, even
+   * if the underlying file has more bytes.
+   */
+  static class Reader extends InputStream {
+    /** Bytes of the requested range not yet handed to the caller; never negative. */
+    private long bytesRemaining;
+    private final FileInputStream file;
+
+    /**
+     * Read a log file from start to end positions. The offsets may be negative,
+     * in which case they are relative to the end of the file. For example,
+     * Reader(taskid, kind, 0, -1) is the entire file and Reader(taskid, kind,
+     * -4197, -1) is the last 4196 bytes. Both offsets are clamped to the file
+     * size; if the end lies before the start the stream is empty.
+     * 
+     * @param taskid the id of the task to read the log file for
+     * @param kind the kind of log to read
+     * @param start the offset to read from (negative is relative to tail)
+     * @param end the offset to read upto (negative is relative to tail)
+     * @throws IOException if the file cannot be opened or positioned
+     */
+    public Reader(TaskAttemptID taskid, LogName kind, long start, long end)
+        throws IOException {
+      // find the right log file
+      File filename = getTaskLogFile(taskid, kind);
+      // calculate the start and stop
+      long size = filename.length();
+      if (start < 0) {
+        start += size + 1;
+      }
+      if (end < 0) {
+        end += size + 1;
+      }
+      start = Math.max(0, Math.min(start, size));
+      end = Math.max(0, Math.min(end, size));
+      // an inverted range must not leave a negative count behind
+      bytesRemaining = Math.max(0, end - start);
+      file = new FileInputStream(filename);
+      try {
+        // skip upto start
+        long pos = 0;
+        while (pos < start) {
+          long result = file.skip(start - pos);
+          if (result < 0) {
+            bytesRemaining = 0;
+            break;
+          }
+          pos += result;
+        }
+      } catch (IOException e) {
+        // do not leak the descriptor when positioning fails
+        try {
+          file.close();
+        } catch (IOException ignored) {
+          // the original failure is the one worth reporting
+        }
+        throw e;
+      }
+    }
+
+    /**
+     * Reads one byte of the range.
+     *
+     * @return the byte read, or -1 once the range or the file is exhausted
+     */
+    @Override
+    public int read() throws IOException {
+      if (bytesRemaining <= 0) {
+        return -1;
+      }
+      int result = file.read();
+      if (result >= 0) {
+        // only count bytes that were actually delivered
+        bytesRemaining -= 1;
+      }
+      return result;
+    }
+
+    /**
+     * Reads up to length bytes of the range into the buffer.
+     *
+     * @return the number of bytes read, or -1 once the range or the file is
+     *         exhausted
+     */
+    @Override
+    public int read(byte[] buffer, int offset, int length) throws IOException {
+      if (length > 0 && bytesRemaining <= 0) {
+        // range exhausted: report end of stream rather than a zero-byte read,
+        // otherwise callers looping until -1 would never terminate
+        return -1;
+      }
+      length = (int) Math.min(length, bytesRemaining);
+      int bytes = file.read(buffer, offset, length);
+      if (bytes > 0) {
+        bytesRemaining -= bytes;
+      }
+      return bytes;
+    }
+
+    /** @return the smaller of the bytes left in the range and in the file */
+    @Override
+    public int available() throws IOException {
+      return (int) Math.min(bytesRemaining, file.available());
+    }
+
+    /** Closes the underlying file. */
+    @Override
+    public void close() throws IOException {
+      file.close();
+    }
+  }
+
+  // Program names used when composing the wrapper shell command lines below.
+  private static final String bashCommand = "bash";
+  private static final String tailCommand = "tail";
+
+  /**
+   * Get the desired maximum length of task's logs.
+   * 
+   * @param conf the job to look in
+   * @return the number of bytes to cap the log files at
+   */
+  public static long getTaskLogLength(HamaConfiguration conf) {
+    return conf.getLong("mapred.userlog.limit.kb", 100) * 1024;
+  }
+
+  /**
+   * Builds a shell invocation that runs a command with its stdout and stderr
+   * captured to files, without any setup commands. When tailLength is not
+   * positive the entire output is saved.
+   * 
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The file that stdout should be saved to
+   * @param stderrFilename The file that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @return the wrapped command that should be run instead
+   * @throws IOException if building the wrapped command fails
+   */
+  public static List<String> captureOutAndError(List<String> cmd,
+      File stdoutFilename, File stderrFilename, long tailLength)
+      throws IOException {
+    final List<String> noSetup = null;
+    return captureOutAndError(noSetup, cmd, stdoutFilename, stderrFilename,
+        tailLength);
+  }
+
+  /**
+   * Builds a "bash -c" invocation that runs a command with stdin taken from
+   * /dev/null and with stdout and stderr appended to the given files. Optional
+   * setup commands are placed before the command, separated by ";". With a
+   * positive tailLength each stream is piped through "tail -c" so only that
+   * many trailing bytes are kept; otherwise the command is exec'ed and the
+   * entire output is saved.
+   * 
+   * @param setup The setup commands for the execed process; may be null or
+   *          empty.
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The file that stdout should be saved to
+   * @param stderrFilename The file that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @return a three element list: the shell, "-c", and the composed script
+   * @throws IOException if a path or command cannot be converted
+   */
+  public static List<String> captureOutAndError(List<String> setup,
+      List<String> cmd, File stdoutFilename, File stderrFilename,
+      long tailLength) throws IOException {
+    final String outPath = FileUtil.makeShellPath(stdoutFilename);
+    final String errPath = FileUtil.makeShellPath(stderrFilename);
+    final boolean keepTailOnly = tailLength > 0;
+
+    StringBuilder script = new StringBuilder();
+    if (setup != null && !setup.isEmpty()) {
+      script.append(addCommand(setup, false)).append(";");
+    }
+    // a subshell is needed for the tail pipelines; otherwise replace the shell
+    script.append(keepTailOnly ? "(" : "exec ");
+    script.append(addCommand(cmd, true)).append(" < /dev/null ");
+    if (keepTailOnly) {
+      script.append(" | ").append(tailCommand).append(" -c ")
+          .append(tailLength).append(" >> ").append(outPath)
+          .append(" ; exit $PIPESTATUS ) 2>&1 | ").append(tailCommand)
+          .append(" -c ").append(tailLength).append(" >> ").append(errPath)
+          .append(" ; exit $PIPESTATUS");
+    } else {
+      script.append(" 1>> ").append(outPath).append(" 2>> ").append(errPath);
+    }
+
+    List<String> wrapped = new ArrayList<String>(3);
+    wrapped.add(bashCommand);
+    wrapped.add("-c");
+    wrapped.add(script.toString());
+    return wrapped;
+  }
+
+  /**
+   * Add quotes to each of the command strings and return as a single string
+   * 
+   * @param cmd The command to be quoted
+   * @param isExecutable makes shell path if the first argument is executable
+   * @return returns The quoted string.
+   * @throws IOException
+   */
+  public static String addCommand(List<String> cmd, boolean isExecutable)
+      throws IOException {
+    StringBuffer command = new StringBuffer();
+    for (String s : cmd) {
+      command.append('\'');
+      if (isExecutable) {
+        // the executable name needs to be expressed as a shell path for the
+        // shell to find it.
+        command.append(FileUtil.makeShellPath(new File(s)));
+        isExecutable = false;
+      } else {
+        command.append(s);
+      }
+      command.append('\'');
+      command.append(" ");
+    }
+    return command.toString();
+  }
+
+  /**
+   * Builds a "bash -c" invocation that exec's the debug script with stdin
+   * taken from /dev/null and with both stdout and stderr written to one file.
+   * The command words are joined with spaces and are not quoted.
+   * 
+   * @param cmd The command and the arguments that should be run
+   * @param debugoutFilename The file that stdout and stderr should be saved
+   *          to.
+   * @return a three element list: the shell, "-c", and the composed script
+   * @throws IOException if a path cannot be converted
+   */
+  public static List<String> captureDebugOut(List<String> cmd,
+      File debugoutFilename) throws IOException {
+    final String target = FileUtil.makeShellPath(debugoutFilename);
+
+    StringBuilder script = new StringBuilder("exec ");
+    boolean first = true;
+    for (String word : cmd) {
+      if (first) {
+        // the executable name needs to be expressed as a shell path for the
+        // shell to find it.
+        script.append(FileUtil.makeShellPath(new File(word)));
+        first = false;
+      } else {
+        script.append(word);
+      }
+      script.append(" ");
+    }
+    script.append(" < /dev/null ").append(" >").append(target)
+        .append(" 2>&1 ");
+
+    List<String> wrapped = new ArrayList<String>(3);
+    wrapped.add(bashCommand);
+    wrapped.add("-c");
+    wrapped.add(script.toString());
+    return wrapped;
+  }
+
+} // TaskLog

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java?rev=1063107&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java Tue Jan 25 02:31:41 2011
@@ -0,0 +1,75 @@
+package org.apache.hama.bsp;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class TaskLogAppender extends FileAppender {
+  private String taskId; // taskId should be managed as String rather than
+                         // TaskID object
+  // so that log4j can configure it from the configuration(log4j.properties).
+  private int maxEvents;
+  private Queue<LoggingEvent> tail = null;
+
+  @Override
+  public void activateOptions() {
+    synchronized (this) {
+      if (maxEvents > 0) {
+        tail = new LinkedList<LoggingEvent>();
+      }
+      setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId),
+          TaskLog.LogName.SYSLOG).toString());
+      setAppend(true);
+      super.activateOptions();
+    }
+  }
+
+  @Override
+  public void append(LoggingEvent event) {
+    synchronized (this) {
+      if (tail == null) {
+        super.append(event);
+      } else {
+        if (tail.size() >= maxEvents) {
+          tail.remove();
+        }
+        tail.add(event);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void close() {
+    if (tail != null) {
+      for (LoggingEvent event : tail) {
+        super.append(event);
+      }
+    }
+    super.close();
+  }
+
+  /**
+   * Getter/Setter methods for log4j.
+   */
+
+  public String getTaskId() {
+    return taskId;
+  }
+
+  public void setTaskId(String taskId) {
+    this.taskId = taskId;
+  }
+
+  private static final int EVENT_SIZE = 100;
+
+  public long getTotalLogFileSize() {
+    return maxEvents * EVENT_SIZE;
+  }
+
+  public void setTotalLogFileSize(long logSize) {
+    maxEvents = (int) logSize / EVENT_SIZE;
+  }
+
+}