Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/03/01 21:14:22 UTC

svn commit: r513478 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/mapred/ src/webapps/task/

Author: cutting
Date: Thu Mar  1 12:14:21 2007
New Revision: 513478

URL: http://svn.apache.org/viewvc?view=rev&rev=513478
Log:
HADOOP-1000.  Fix so that log messages in task subprocesses are not written to a task's standard error.  Contributed by Arun.
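
For context: user code in a task typically logs through commons-logging, and before this change those records reached the console appender and hence the task's stderr; with the new TaskLogAppender wired in as the child JVM's root appender they land in a per-task 'syslog' file instead. A minimal sketch of such user code (illustrative only, not part of this patch; the class name and record handling are hypothetical):

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class LoggingMapper extends MapReduceBase implements Mapper {
      private static final Log LOG = LogFactory.getLog(LoggingMapper.class);

      public void map(WritableComparable key, Writable value,
                      OutputCollector output, Reporter reporter) throws IOException {
        // With HADOOP-1000 this message ends up in the task's 'syslog' log,
        // not on its stderr.
        LOG.info("processing record with key " + key);
        output.collect(key, value);
      }
    }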

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/log4j.properties
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/webapps/task/tasklog.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=513478&r1=513477&r2=513478
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Mar  1 12:14:21 2007
@@ -171,6 +171,9 @@
 51. HADOOP-941.  Enhance record facility.
     (Milind Bhandarkar via cutting)
 
+52. HADOOP-1000.  Fix so that log messages in task subprocesses are
+    not written to a task's standard error.  (Arun C Murthy via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/log4j.properties?view=diff&rev=513478&r1=513477&r2=513478
==============================================================================
--- lucene/hadoop/trunk/conf/log4j.properties (original)
+++ lucene/hadoop/trunk/conf/log4j.properties Thu Mar  1 12:14:21 2007
@@ -40,6 +40,27 @@
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
 
 #
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.noKeepSplits=${hadoop.tasklog.noKeepSplits}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+log4j.appender.TLA.purgeLogSplits=${hadoop.tasklog.purgeLogSplits}
+log4j.appender.TLA.logsRetainHours=${hadoop.tasklog.logsRetainHours}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
 # Rolling File Appender
 #
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?view=diff&rev=513478&r1=513477&r2=513478
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Thu Mar  1 12:14:21 2007
@@ -45,6 +45,10 @@
     }
   }
 
+  private static File getTaskLogDir(String taskid, LogFilter filter) {
+    return new File(new File(LOG_DIR, taskid), filter.getPrefix());
+  }
+  
   /**
    * The filter for userlogs.
    */
@@ -53,7 +57,10 @@
     STDOUT ("stdout"),
 
     /** Log on the stderr of the task. */
-    STDERR ("stderr");
+    STDERR ("stderr"),
+    
+    /** Log on the map-reduce system logs of the task. */
+    SYSLOG ("syslog");
     
     private String prefix;
     
@@ -102,20 +109,17 @@
      * @param taskId taskid of the task
      * @param filter the {@link LogFilter} to apply on userlogs.
      */
-    Writer(JobConf conf, String taskId, LogFilter filter) {
-      this.conf = conf;
+    Writer(String taskId, LogFilter filter, 
+            int noKeepSplits, long totalLogSize, boolean purgeLogSplits, int logsRetainHours) {
       this.taskId = taskId;
       this.filter = filter;
       
-      this.taskLogDir = new File(new File(LOG_DIR, this.taskId), 
-                                this.filter.getPrefix());
+      this.taskLogDir = getTaskLogDir(this.taskId, this.filter);
       
-      noKeepSplits = this.conf.getInt("mapred.userlog.num.splits", 4);
-      splitFileSize = 
-        (this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024) / noKeepSplits; 
-      purgeLogSplits = this.conf.getBoolean("mapred.userlog.purgesplits", 
-                                      true);
-      logsRetainHours = this.conf.getInt("mapred.userlog.retain.hours", 12);
+      this.noKeepSplits = noKeepSplits;
+      this.splitFileSize = (totalLogSize / noKeepSplits);
+      this.purgeLogSplits = purgeLogSplits;
+      this.logsRetainHours = logsRetainHours;
     }
     
     private static class TaskLogsPurgeFilter implements FileFilter {
@@ -220,7 +224,7 @@
      * 
      * @throws IOException
      */
-    public void close() throws IOException {
+    public synchronized void close() throws IOException {
       // Close the final split
       if (out != null) {
         out.close();
@@ -306,8 +310,7 @@
       this.taskId = taskId;
       this.filter = filter;
       
-      this.taskLogDir = new File(new File(LOG_DIR, this.taskId), 
-                                this.filter.getPrefix());
+      this.taskLogDir = getTaskLogDir(this.taskId, this.filter);
     }
 
     private static class IndexRecord {

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java?view=auto&rev=513478
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java Thu Mar  1 12:14:21 2007
@@ -0,0 +1,120 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.ErrorCode;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * A simple log4j-appender for the task child's 
+ * map-reduce system logs.
+ * 
+ * @author Arun C Murthy
+ */
+public class TaskLogAppender extends AppenderSkeleton {
+  private TaskLog.Writer taskLogWriter = null;
+  private String taskId;
+  private int noKeepSplits;
+  private long totalLogFileSize;
+  private boolean purgeLogSplits;
+  private int logsRetainHours;
+
+  public void activateOptions() {
+    taskLogWriter = 
+      new TaskLog.Writer(taskId, TaskLog.LogFilter.SYSLOG, 
+              noKeepSplits, totalLogFileSize, purgeLogSplits, logsRetainHours);
+    try {
+      taskLogWriter.init();
+    } catch (IOException ioe) {
+      taskLogWriter = null;
+      errorHandler.error("Failed to initialize the task's logging " +
+              "infrastructure: " + StringUtils.stringifyException(ioe));
+    }
+  }
+  
+  protected synchronized void append(LoggingEvent event) {
+    if (taskLogWriter == null) {
+      errorHandler.error("Calling 'append' on uninitialized/closed logger");
+      return;
+    }
+
+    if (this.layout == null) {
+      errorHandler.error("No layout for appender " + name , 
+              null, ErrorCode.MISSING_LAYOUT );
+    }
+    
+    // Log the message to the task's log
+    String logMessage = this.layout.format(event);
+    try {
+      taskLogWriter.write(logMessage.getBytes(), 0, logMessage.length());
+    } catch (IOException ioe) {
+      errorHandler.error("Failed to log: '" + logMessage + 
+              "' to the task's logging infrastructure with the exception: " + 
+              StringUtils.stringifyException(ioe));
+    }
+  }
+
+  public boolean requiresLayout() {
+    return true;
+  }
+
+  public synchronized void close() {
+    if (taskLogWriter != null) {
+      try {
+        taskLogWriter.close();
+      } catch (IOException ioe) {
+        errorHandler.error("Failed to close the task's log with the exception: " 
+                + StringUtils.stringifyException(ioe));
+      }
+    } else {
+      errorHandler.error("Calling 'close' on uninitialized/closed logger");
+    }
+  }
+
+  /**
+   * Getter/Setter methods for log4j.
+   */
+  
+  public String getTaskId() {
+    return taskId;
+  }
+
+  public void setTaskId(String taskId) {
+    this.taskId = taskId;
+  }
+
+  public int getNoKeepSplits() {
+    return noKeepSplits;
+  }
+
+  public void setNoKeepSplits(int noKeepSplits) {
+    this.noKeepSplits = noKeepSplits;
+  }
+
+  public int getLogsRetainHours() {
+    return logsRetainHours;
+  }
+
+  public void setLogsRetainHours(int logsRetainHours) {
+    this.logsRetainHours = logsRetainHours;
+  }
+
+  public boolean isPurgeLogSplits() {
+    return purgeLogSplits;
+  }
+
+  public void setPurgeLogSplits(boolean purgeLogSplits) {
+    this.purgeLogSplits = purgeLogSplits;
+  }
+
+  public long getTotalLogFileSize() {
+    return totalLogFileSize;
+  }
+
+  public void setTotalLogFileSize(long splitFileSize) {
+    this.totalLogFileSize = splitFileSize;
+  }
+
+}
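
The appender above is driven entirely by its bean setters, which log4j populates from the ${hadoop.tasklog.*} placeholders in conf/log4j.properties. As a rough sketch, the programmatic equivalent would look like the following (illustrative only; the task id, log directory, and class name are hypothetical, and it assumes TaskLog resolves its directory from the hadoop.log.dir system property, which TaskRunner sets for the child JVM):

    import org.apache.hadoop.mapred.TaskLogAppender;
    import org.apache.log4j.LogManager;
    import org.apache.log4j.Logger;
    import org.apache.log4j.PatternLayout;

    public class TaskLogAppenderSketch {
      public static void main(String[] args) {
        // Assumption: TaskLog derives its per-task log directory from hadoop.log.dir.
        System.setProperty("hadoop.log.dir", "/tmp/hadoop-logs");

        TaskLogAppender tla = new TaskLogAppender();
        tla.setTaskId("task_0001_m_000000_0");   // hypothetical task id
        tla.setNoKeepSplits(4);                  // hadoop.tasklog.noKeepSplits
        tla.setTotalLogFileSize(100 * 1024);     // bytes; TaskRunner passes mapred.userlog.limit.kb * 1024
        tla.setPurgeLogSplits(true);             // hadoop.tasklog.purgeLogSplits
        tla.setLogsRetainHours(12);              // hadoop.tasklog.logsRetainHours
        tla.setLayout(new PatternLayout("%d{ISO8601} %p %c: %m%n"));
        tla.activateOptions();                   // opens the SYSLOG TaskLog.Writer

        Logger.getRootLogger().addAppender(tla);
        Logger.getLogger(TaskLogAppenderSketch.class).info("written to the task's syslog");

        LogManager.shutdown();                   // closes the writer, as the child VM's finally block does
      }
    }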

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=513478&r1=513477&r2=513478
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Mar  1 12:14:21 2007
@@ -49,9 +49,17 @@
     this.tracker = tracker;
     this.conf = conf;
     this.taskStdOutLogWriter = 
-      new TaskLog.Writer(conf, t.getTaskId(), TaskLog.LogFilter.STDOUT);
+      new TaskLog.Writer(t.getTaskId(), TaskLog.LogFilter.STDOUT, 
+              this.conf.getInt("mapred.userlog.num.splits", 4), 
+              this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024, 
+              this.conf.getBoolean("mapred.userlog.purgesplits", true),
+              this.conf.getInt("mapred.userlog.retain.hours", 12));
     this.taskStdErrLogWriter = 
-      new TaskLog.Writer(conf, t.getTaskId(), TaskLog.LogFilter.STDERR);
+      new TaskLog.Writer(t.getTaskId(), TaskLog.LogFilter.STDERR, 
+              this.conf.getInt("mapred.userlog.num.splits", 4), 
+              this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024, 
+              this.conf.getBoolean("mapred.userlog.purgesplits", true),
+              this.conf.getInt("mapred.userlog.retain.hours", 12));
   }
 
   public Task getTask() { return t; }
@@ -166,7 +174,7 @@
       classPath.append(sep);
       classPath.append(workDir);
       //  Build exec child jmv args.
-      Vector vargs = new Vector(8);
+      Vector<String> vargs = new Vector<String>(8);
       File jvm =                                  // use same jvm as parent
         new File(new File(System.getProperty("java.home"), "bin"), "java");
 
@@ -209,6 +217,15 @@
       // Add classpath.
       vargs.add("-classpath");
       vargs.add(classPath.toString());
+
+      // Setup the log4j prop
+      vargs.add("-Dhadoop.log.dir=" + System.getProperty("hadoop.log.dir"));
+      vargs.add("-Dhadoop.root.logger=INFO,TLA");
+      vargs.add("-Dhadoop.tasklog.taskid=" + t.getTaskId());
+      vargs.add("-Dhadoop.tasklog.noKeepSplits=" + conf.getInt("mapred.userlog.num.splits", 4)); 
+      vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + (conf.getInt("mapred.userlog.limit.kb", 100) * 1024));
+      vargs.add("-Dhadoop.tasklog.purgeLogSplits=" + conf.getBoolean("mapred.userlog.purgesplits", true));
+      vargs.add("-Dhadoop.tasklog.logsRetainHours=" + conf.getInt("mapred.userlog.retain.hours", 12)); 
 
       // Add java.library.path; necessary for native-hadoop libraries
       String libraryPath = System.getProperty("java.library.path");
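
Inside the child JVM the -D flags above are ordinary system properties; when log4j loads conf/log4j.properties it substitutes them into the ${hadoop.tasklog.*} placeholders, so the root logger (INFO,TLA) writes through TaskLogAppender rather than the console. A small sketch of that view from the child's side (class name hypothetical; the fallback values shown are the patch's defaults):

    public class ChildLogSettings {
      public static void main(String[] args) {
        // These come from the -D arguments TaskRunner adds to the child's command line.
        System.out.println("rootLogger       = " + System.getProperty("hadoop.root.logger", "INFO,TLA"));
        System.out.println("taskid           = " + System.getProperty("hadoop.tasklog.taskid"));
        System.out.println("noKeepSplits     = " + System.getProperty("hadoop.tasklog.noKeepSplits", "4"));
        System.out.println("totalLogFileSize = " + System.getProperty("hadoop.tasklog.totalLogFileSize", "102400"));
        System.out.println("purgeLogSplits   = " + System.getProperty("hadoop.tasklog.purgeLogSplits", "true"));
        System.out.println("logsRetainHours  = " + System.getProperty("hadoop.tasklog.logsRetainHours", "12"));
      }
    }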

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=513478&r1=513477&r2=513478
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Mar  1 12:14:21 2007
@@ -63,6 +63,7 @@
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.log4j.LogManager;
 
 /*******************************************************
  * TaskTracker is a process that starts and tracks MR Tasks
@@ -1451,6 +1452,11 @@
               ByteArrayOutputStream baos = new ByteArrayOutputStream();
               throwable.printStackTrace(new PrintStream(baos));
               umbilical.reportDiagnosticInfo(taskid, baos.toString());
+          } finally {
+            // Shutting down log4j of the child-vm... 
+            // This assumes that on return from Task.run() 
+            // there is no more logging done.
+            LogManager.shutdown();
           }
         }
 

Modified: lucene/hadoop/trunk/src/webapps/task/tasklog.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/tasklog.jsp?view=diff&rev=513478&r1=513477&r2=513478
==============================================================================
--- lucene/hadoop/trunk/src/webapps/task/tasklog.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/task/tasklog.jsp Thu Mar  1 12:14:21 2007
@@ -4,7 +4,8 @@
   import="javax.servlet.http.*"
   import="java.io.*"
 %>
-<%
+<%!
+  String taskId = null;
   long logOffset = -1, logLength = -1;
   boolean tailLog = false;
   long tailSize = 1024;
@@ -12,8 +13,93 @@
   boolean entireLog = false;
   boolean plainText = false;
   TaskLog.LogFilter filter = null;
+
+  private void printTaskLog(JspWriter out, TaskLog.LogFilter filter) 
+  throws IOException {
+    if (!plainText) {
+      out.println("<br><b><u>" + filter + " logs</u></b><br>");
+      out.println("<table border=2 cellpadding=\"2\">");
+    }
+    
+    boolean gotRequiredData = true;
+    try {
+  	  TaskLog.Reader taskLogReader = new TaskLog.Reader(taskId, filter);
+      byte[] b = null;
+  	  int bytesRead = 0;
+  	  int targetLength = 0;
+
+  	  if (entireLog) {
+  	    b = taskLogReader.fetchAll();
+  	    targetLength = bytesRead = b.length;
+  	  } else {
+  	    if (tailLog) {
+  		  b = new byte[(int)tailSize];
+  		  targetLength = (int)tailSize;
+  		  bytesRead = taskLogReader.tail(b, 0, b.length, tailSize, tailWindow);
+  	    } else {
+  		  b = new byte[(int)logLength];
+  		  targetLength = (int)logLength;
+  		  bytesRead = taskLogReader.read(b, 0, b.length, logOffset, logLength);
+    	}
+  	  }
+  	  
+  	  if (bytesRead != targetLength && 
+  	    targetLength <= taskLogReader.getTotalLogSize()) {
+  	    if( !plainText) {
+	  	  out.println("<b>Warning: Could not fetch " + targetLength + 
+	  		  " bytes from the task-logs; probably purged!</b><br/>");
+  	    }else{
+	  	  out.println("Warning: Could not fetch " + targetLength + 
+  		  " bytes from the task-logs; probably purged!");
+  	    }
+  	    gotRequiredData = false;
+  	  }
+	  String logData = new String(b, 0, bytesRead);
+	  if (!plainText) {
+	    out.print("<tr><td><pre>" + logData + "</pre></td></tr>");
+	  } else {
+	    out.print(logData);
+	  }
+    } catch (IOException ioe) {
+  	  out.println("Failed to retrieve '" + filter + "' logs for task: " + taskId);
+    }
   
-  String taskId = request.getParameter("taskid");
+    if( !plainText ) {
+     out.println("</table>\n");
+    } 
+
+    if (!entireLog && !plainText) {
+      if (tailLog) {
+        if (gotRequiredData) {
+  	  	  out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
+  		    "&tail=true&tailsize=" + tailSize + "&tailwindow=" + (tailWindow+1) + 
+  		    "&filter=" + filter + "'>Earlier</a>");
+  	    }
+  	    if (tailWindow > 1) {
+          out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
+  	  	    "&tail=true&tailsize=" + tailSize + "&tailwindow=" + (tailWindow-1) 
+  	  	    + "&filter=" + filter + "'>Later</a>");
+  	    }
+      } else {
+        if (gotRequiredData) {
+      	  out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
+    		"&tail=false&off=" + Math.max(0, (logOffset-logLength)) +
+  		  	"&len=" + logLength + "&filter=" + filter + "'>Earlier</a>");
+  	    }
+  	    out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
+  		  "&tail=false&off=" + (logOffset+logLength) +
+  		  "&len=" + logLength + "&filter=" + filter + "'>Later</a>");
+      }
+    }
+    
+    if (!plainText) {
+      out.println("<hr><br>");
+    }
+  }
+%>
+
+<%  
+  taskId = request.getParameter("taskid");
   if (taskId == null) {
   	out.println("<h2>Missing 'taskid' for fetching logs!</h2>");
   	return;
@@ -78,91 +164,18 @@
   
   if( !plainText ) {
     out.println("<html>");
-    out.println("<title>Task Logs: '" + taskId + "' (" + logFilter + ")</title>"); 
+    out.println("<title>Task Logs: '" + taskId + "'</title>"); 
     out.println("<body>");
-    out.println("<h1>Task Logs from " +  taskId + "'s " + logFilter + "</h1><br>"); 
-    out.println("<h2>'" + logFilter + "':</h2>");
-    out.println("<pre>");
-
-  }
-%>
-
-<%
-  boolean gotRequiredData = true;
-  try {
-  	TaskLog.Reader taskLogReader = new TaskLog.Reader(taskId, filter);
-    byte[] b = null;
-  	int bytesRead = 0;
-  	int targetLength = 0;
-
-  	if (entireLog) {
-  	  b = taskLogReader.fetchAll();
-  	  targetLength = bytesRead = b.length;
-  	} else {
-  	  if (tailLog) {
-  		b = new byte[(int)tailSize];
-  		targetLength = (int)tailSize;
-  		bytesRead = taskLogReader.tail(b, 0, b.length, tailSize, tailWindow);
-  	  } else {
-  		b = new byte[(int)logLength];
-  		targetLength = (int)logLength;
-  		bytesRead = taskLogReader.read(b, 0, b.length, logOffset, logLength);
-   	  }
-  	}
-  	  
-  	if (bytesRead != targetLength && 
-  	  targetLength <= taskLogReader.getTotalLogSize()) {
-  	  if( !plainText) {
-	  	  out.println("<b>Warning: Could not fetch " + targetLength + 
-	  		  " bytes from the task-logs; probably purged!</b><br/>");
-  	  }else{
-	  	  out.println("Warning: Could not fetch " + targetLength + 
-  		  " bytes from the task-logs; probably purged!");
-  	  }
-  	  gotRequiredData = false;
-  	}
-	String logData = new String(b, 0, bytesRead);
-	out.println(logData);
-  } catch (IOException ioe) {
-  	out.println("Failed to retrieve '" + logFilter + "' logs for task: " + taskId);
-  }
-  
-  if( !plainText ) {
-    out.println("</pre>");
-  }
-%>
-<%
-  if (!entireLog && !plainText) {
-    if (tailLog) {
-      if (gotRequiredData) {
-  	  	out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
-  		    "&tail=true&tailsize=" + tailSize + "&tailwindow=" + (tailWindow+1) + 
-  		    "&filter=" + logFilter + "'>Earlier</a>");
-  	  }
-  	  if (tailWindow > 1) {
-        out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
-  	  	    "&tail=true&tailsize=" + tailSize + "&tailwindow=" + (tailWindow-1) 
-  	  	    + "&filter=" + logFilter + "'>Later</a>");
-  	  }
-    } else {
-      if (gotRequiredData) {
-      	out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
-    		"&tail=false&off=" + Math.max(0, (logOffset-logLength)) +
-  		  	"&len=" + logLength + "&filter=" + logFilter + "'>Earlier</a>");
-  	  }
-  	  out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
-  		  "&tail=false&off=" + (logOffset+logLength) +
-  		  "&len=" + logLength + "&filter=" + logFilter + "'>Later</a>");
-    }
-  }
-  if( !plainText ) {
-    String otherFilter = (logFilter.equals("stdout") ? "stderr" : "stdout");
-    out.println("<br><br>See <a href='/tasklog.jsp?taskid=" + taskId + 
-        "&all=true&filter=" + otherFilter + "'>" + otherFilter + "</a>" +
-        " logs of this task");
-    out.println("<hr>");
+    out.println("<h1>Task Logs: '" +  taskId +  "'</h1><br>"); 
+    
+    printTaskLog(out, TaskLog.LogFilter.STDOUT);
+    printTaskLog(out, TaskLog.LogFilter.STDERR);
+    printTaskLog(out, TaskLog.LogFilter.SYSLOG);
+    
     out.println("<a href='http://lucene.apache.org/hadoop'>Hadoop</a>, 2006.<br>");
     out.println("</body>");
     out.println("</html>");
-  }
+  } else {
+    printTaskLog(out, filter);
+  } 
 %>