Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/16 22:28:51 UTC

svn commit: r496864 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ src/webapps/task/

Author: cutting
Date: Tue Jan 16 13:28:50 2007
New Revision: 496864

URL: http://svn.apache.org/viewvc?view=rev&rev=496864
Log:
HADOOP-801.  Add to jobtracker a log of task completion events.  Contributed by Sanjay.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
    lucene/hadoop/trunk/src/webapps/task/tasklog.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jan 16 13:28:50 2007
@@ -25,6 +25,9 @@
     for better compatibility with some monitoring systems.
     (Nigel Daley via cutting)
 
+ 7. HADOOP-801.  Add to jobtracker a log of task completion events.
+    (Sanjay Dahiya via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Tue Jan 16 13:28:50 2007
@@ -91,6 +91,17 @@
   public void reportTaskTrackerError(String taskTracker,
                                      String errorClass,
                                      String errorMessage) throws IOException;
+  /**
+   * Get task completion events for the jobid, starting from fromEventId. 
+   * Returns an empty array if no events are available. 
+   * @param jobid job id 
+   * @param fromEventId event id to start from. 
+   * @return array of task completion events. 
+   * @throws IOException
+   */
+  TaskCompletionEvent[] getTaskCompletionEvents(
+      String jobid, int fromEventId) throws IOException; 
+  
 }
 
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Jan 16 13:28:50 2007
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.commons.cli2.validation.InvalidArgumentException;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
@@ -39,6 +38,8 @@
  *******************************************************/
 public class JobClient extends ToolBase implements MRConstants  {
     private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
+    public static enum TaskStatusFilter { NONE, FAILED, SUCCEEDED, ALL };
+    private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
 
     static long MAX_JOBPROFILE_AGE = 1000 * 2;
 
@@ -150,6 +151,14 @@
         public synchronized void killJob() throws IOException {
             jobSubmitClient.killJob(getJobID());
         }
+        /**
+         * Fetch task completion events from jobtracker for this job. 
+         */
+        public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
+            int startFrom) throws IOException{
+          return jobSubmitClient.getTaskCompletionEvents(
+              getJobID(), startFrom); 
+        }
 
         /**
          * Dump stats to screen
@@ -367,10 +376,22 @@
       String lastReport = null;
       final int MAX_RETRIES = 5;
       int retries = MAX_RETRIES;
+      String outputFilterName = job.get("jobclient.output.filter");
+
+      if (null != outputFilterName) {
+        try {
+          jc.setTaskOutputFilter(TaskStatusFilter.valueOf(outputFilterName));
+        } catch(IllegalArgumentException e) {
+          LOG.warn("Invalid Output filter : " + outputFilterName + 
+              " Valid values are : NONE, FAILED, SUCCEEDED, ALL"); 
+        }
+      }
       try {
         running = jc.submitJob(job);
         String jobId = running.getJobID();
         LOG.info("Running job: " + jobId);
+        int eventCounter = 0 ; 
+        
         while (true) {
           try {
             Thread.sleep(1000);
@@ -388,6 +409,34 @@
               LOG.info(report);
               lastReport = report;
             }
+            
+            if( jc.getTaskOutputFilter()  != TaskStatusFilter.NONE){
+              TaskCompletionEvent[] events = 
+                running.getTaskCompletionEvents(eventCounter); 
+              eventCounter += events.length ;
+              for(TaskCompletionEvent event : events ){
+                switch( jc.getTaskOutputFilter() ){
+                case SUCCEEDED:
+                  if( event.getTaskStatus() == 
+                    TaskCompletionEvent.Status.SUCCEEDED){
+                    LOG.info(event.toString());
+                    printHttpFile(event.getTaskTrackerHttp());
+                  }
+                  break; 
+                case FAILED:
+                  if( event.getTaskStatus() == 
+                    TaskCompletionEvent.Status.FAILED){
+                    LOG.info(event.toString());
+                    printHttpFile(event.getTaskTrackerHttp());
+                  }
+                  break ; 
+                case ALL:
+                  LOG.info(event.toString());
+                  printHttpFile(event.getTaskTrackerHttp());
+                  break;
+                }
+              }
+            }
             retries = MAX_RETRIES;
           } catch (IOException ie) {
             if (--retries == 0) {
@@ -410,6 +459,35 @@
         jc.close();
       }
     }
+    
+    static void printHttpFile(String httpURL ) throws IOException {
+      boolean good = false;
+      long totalBytes = 0;
+      URL path = new URL(httpURL); 
+      try {
+        URLConnection connection = path.openConnection();
+        InputStream input = connection.getInputStream();
+        try {
+          byte[] buffer = new byte[64 * 1024];
+          int len = input.read(buffer);
+          while (len > 0) {
+            totalBytes += len;
+            LOG.info(new String(buffer).trim());
+            len = input.read(buffer);
+          }
+          good = ((int) totalBytes) == connection.getContentLength();
+          if (!good) {
+          LOG.warn("Incomplete task output received for " + path +
+                          " (" + totalBytes + " instead of " + 
+                          connection.getContentLength() + ")");
+          }
+        }finally {
+            input.close();
+        }
+      }catch(IOException ioe){
+        LOG.warn("Error reading task output: " + ioe.getMessage()); 
+      }
+    }    
 
     static Configuration getConfiguration(String jobTrackerSpec)
     {
@@ -428,8 +506,23 @@
       }
       return conf;
     }
-        
 
+    /**
+     * Sets the output filter for tasks. Only those tasks whose output
+     * matches the filter are printed.
+     * @param newValue task filter.
+     */
+    public void setTaskOutputFilter(TaskStatusFilter newValue){
+      this.taskOutputFilter = newValue ;
+    }
+    /**
+     * Returns task output filter.
+     * @return task filter. 
+     */
+    public TaskStatusFilter getTaskOutputFilter(){
+      return this.taskOutputFilter; 
+    }
+    
     public int run(String[] argv) throws Exception {
         if (argv.length < 2) {
             System.out.println("JobClient -submit <job> | -status <id> | -kill <id> [-jt <jobtracker:port>|<config>]");
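
For reference, the loop patched above reads a new "jobclient.output.filter" property (NONE, FAILED, SUCCEEDED or ALL; FAILED is the default), so a submitting client can choose which task logs get echoed. A minimal sketch of opting in, assuming the rest of the job (input, output, mapper, reducer) is configured elsewhere:

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class OutputFilterExample {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();   // job setup (paths, mapper, reducer) assumed elsewhere
        // Echo the logs of every completed task, not just failed ones.
        conf.set("jobclient.output.filter", "ALL");
        JobClient.runJob(conf);         // the patched loop prints matching task logs
      }
    }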

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Jan 16 13:28:50 2007
@@ -53,6 +53,8 @@
     int failedReduceTasks = 0 ; 
     JobTracker jobtracker = null;
     HashMap hostToMaps = new HashMap();
+    private int taskCompletionEventTracker = 0 ; 
+    List<TaskCompletionEvent> taskCompletionEvents ; 
 
     long startTime;
     long finishTime;
@@ -95,6 +97,8 @@
 
         this.numMapTasks = conf.getNumMapTasks();
         this.numReduceTasks = conf.getNumReduceTasks();
+        this.taskCompletionEvents = new ArrayList(
+            numMapTasks + numReduceTasks + 10);
         JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), 
             System.currentTimeMillis(), jobFile); 
         
@@ -276,10 +280,29 @@
         boolean change = tip.updateStatus(status);
         if (change) {
           TaskStatus.State state = status.getRunState();
+          TaskTrackerStatus ttStatus = 
+            this.jobtracker.getTaskTracker(status.getTaskTracker());
+          String httpTaskLogLocation = null; 
+          if( null != ttStatus ){
+            httpTaskLogLocation = "http://" + ttStatus.getHost() + ":" + 
+              ttStatus.getHttpPort() + "/tasklog.jsp?plaintext=true&taskid=" +
+              status.getTaskId() + "&all=true";
+          }
+          
           if (state == TaskStatus.State.SUCCEEDED) {
+            this.taskCompletionEvents.add( new TaskCompletionEvent(
+                taskCompletionEventTracker++, 
+                status.getTaskId(), 
+                TaskCompletionEvent.Status.SUCCEEDED,
+                httpTaskLogLocation ));
             completedTask(tip, status, metrics);
           } else if (state == TaskStatus.State.FAILED ||
                      state == TaskStatus.State.KILLED) {
+            this.taskCompletionEvents.add( new TaskCompletionEvent(
+                taskCompletionEventTracker++, 
+                status.getTaskId(), 
+                TaskCompletionEvent.Status.FAILED, 
+                httpTaskLogLocation ));
             // Tell the job to fail the relevant task
             failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
                        wasRunning, wasComplete);
@@ -753,5 +776,15 @@
          }
        }
        return null;
+    }
+    
+    public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId) {
+      TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
+      if( taskCompletionEvents.size() > fromEventId) {
+        events = (TaskCompletionEvent[])taskCompletionEvents.subList(
+            fromEventId, taskCompletionEvents.size()).
+            toArray(events);        
+      }
+      return events; 
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Tue Jan 16 13:28:50 2007
@@ -80,4 +80,16 @@
      * jobs.
      */
     public JobStatus[] jobsToComplete() throws IOException;
+    
+    /**
+     * Get task completion events for the jobid, starting from fromEventId. 
+     * Returns an empty array if no events are available. 
+     * @param jobid job id 
+     * @param fromEventId event id to start from. 
+     * @return array of task completion events. 
+     * @throws IOException
+     */
+    public TaskCompletionEvent[] getTaskCompletionEvents(
+                String jobid, int fromEventId) throws IOException;
+    
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Jan 16 13:28:50 2007
@@ -1431,6 +1431,21 @@
             return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]);
         }
     }
+    
+    /* 
+     * Returns a list of TaskCompletionEvent for the given job, 
+     * starting from fromEventId.
+     * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int)
+     */
+    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
+        String jobid, int fromEventId) throws IOException{
+      TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
+      JobInProgress job = (JobInProgress)this.jobs.get(jobid);
+      if (null != job) {
+        events = job.getTaskCompletionEvents(fromEventId);
+      }
+      return events;
+    }
 
     /**
      * Get the diagnostics for a given task

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jan 16 13:28:50 2007
@@ -243,4 +243,9 @@
   }
 
   public JobStatus[] jobsToComplete() {return null;}
+  public TaskCompletionEvent[] getTaskCompletionEvents(
+      String jobid, int fromEventId) throws IOException{
+    return TaskCompletionEvent.EMPTY_ARRAY;
+  }
+  
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Tue Jan 16 13:28:50 2007
@@ -76,4 +76,8 @@
      * killed as well.  If the job is no longer running, it simply returns.
      */
     public void killJob() throws IOException;
+    
+    public TaskCompletionEvent[] getTaskCompletionEvents(
+        int startFrom) throws IOException;
+    
 }
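
The new RunningJob method can also be polled directly. A sketch of incremental polling in the same style as the JobClient loop above; the class name and the plain JobClient(conf) construction are illustrative, not part of this change:

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapred.TaskCompletionEvent;

    public class EventPoller {
      static void submitAndPoll(JobConf conf) throws Exception {
        JobClient jc = new JobClient(conf);
        RunningJob running = jc.submitJob(conf);
        int fromEventId = 0;                                   // resume point, starts at 0
        while (!running.isComplete()) {
          TaskCompletionEvent[] events =
              running.getTaskCompletionEvents(fromEventId);    // empty array if nothing new
          fromEventId += events.length;
          for (TaskCompletionEvent event : events) {
            System.out.println(event);                         // "Task Id : ..., Status : ..."
            System.out.println(event.getTaskTrackerHttp());    // plain-text tasklog.jsp URL
          }
          Thread.sleep(1000);
        }
      }
    }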

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?view=auto&rev=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Tue Jan 16 13:28:50 2007
@@ -0,0 +1,131 @@
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * This is used to track task completion events on 
+ * job tracker. 
+ *
+ */
+public class TaskCompletionEvent implements Writable{
+    static public enum Status {FAILED, SUCCEEDED};
+    
+    private int eventId ; 
+    private String taskTrackerHttp ;
+    private String taskId ;
+    Status status ; 
+    public static final TaskCompletionEvent[] EMPTY_ARRAY = 
+        new TaskCompletionEvent[0];
+    /**
+     * Default constructor for Writable.
+     *
+     */
+    public TaskCompletionEvent(){}
+    /**
+     * Constructor. eventId should be created externally and incremented
+     * per event for each job. 
+     * @param eventId event id; event ids should be unique and assigned
+     *  incrementally, starting from 0. 
+     * @param taskId task id
+     * @param status task's status 
+     * @param taskTrackerHttp task tracker's host:port for http. 
+     */
+    public TaskCompletionEvent(int eventId, 
+        String taskId, 
+        Status status, 
+        String taskTrackerHttp){
+      
+        this.taskId = taskId ; 
+        this.eventId = eventId ; 
+        this.status =status ; 
+        this.taskTrackerHttp = taskTrackerHttp ;
+    }
+    /**
+     * Returns event Id. 
+     * @return event id
+     */
+    public int getEventId() {
+        return eventId;
+    }
+    /**
+     * Returns task id. 
+     * @return task id
+     */
+    public String getTaskId() {
+        return taskId;
+    }
+    /**
+     * Returns enum Status.SUCCEEDED or Status.FAILED.
+     * @return task completion status
+     */
+    public Status getTaskStatus() {
+        return status;
+    }
+    /**
+     * http location of the tasktracker where this task ran. 
+     * @return http location of tasktracker user logs
+     */
+    public String getTaskTrackerHttp() {
+        return taskTrackerHttp;
+    }
+    /**
+     * Set event id. Event ids should be assigned incrementally, starting from 0. 
+     * @param eventId
+     */
+    public void setEventId(
+        int eventId) {
+        this.eventId = eventId;
+    }
+    /**
+     * Sets task id. 
+     * @param taskId
+     */
+    public void setTaskId(
+        String taskId) {
+        this.taskId = taskId;
+    }
+    /**
+     * Set task status. 
+     * @param status
+     */
+    public void setTaskStatus(
+        Status status) {
+        this.status = status;
+    }
+    /**
+     * Set task tracker http location. 
+     * @param taskTrackerHttp
+     */
+    public void setTaskTrackerHttp(
+        String taskTrackerHttp) {
+        this.taskTrackerHttp = taskTrackerHttp;
+    }
+    
+    public String toString(){
+        StringBuffer buf = new StringBuffer(); 
+        buf.append("Task Id : "); 
+        buf.append( taskId ) ; 
+        buf.append(", Status : ");  
+        buf.append( status.name() ) ;
+        return buf.toString();
+    }
+    
+    //////////////////////////////////////////////
+    // Writable
+    //////////////////////////////////////////////
+    public void write(DataOutput out) throws IOException {
+        WritableUtils.writeString(out, taskId); 
+        WritableUtils.writeEnum(out, status); 
+        WritableUtils.writeString(out, taskTrackerHttp);
+    }
+  
+    public void readFields(DataInput in) throws IOException {
+        this.taskId = WritableUtils.readString(in) ; 
+        this.status = WritableUtils.readEnum(in, Status.class);
+        this.taskTrackerHttp = WritableUtils.readString(in);
+    }
+}
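
A minimal round-trip sketch of the new Writable using plain java.io streams; the task id and tracker URL below are made-up values:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.mapred.TaskCompletionEvent;

    public class TaskCompletionEventRoundTrip {
      public static void main(String[] args) throws Exception {
        TaskCompletionEvent event = new TaskCompletionEvent(
            0, "task_0001_m_000000_0", TaskCompletionEvent.Status.SUCCEEDED,
            "http://tt.example.com:50060/tasklog.jsp?plaintext=true"
                + "&taskid=task_0001_m_000000_0&all=true");

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        event.write(new DataOutputStream(buffer));              // taskId, status, http location

        TaskCompletionEvent copy = new TaskCompletionEvent();   // Writable no-arg constructor
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(buffer.toByteArray())));

        // Note: write() does not serialize eventId in this revision, so only the
        // other three fields survive the round trip.
        System.out.println(copy);   // Task Id : task_0001_m_000000_0, Status : SUCCEEDED
      }
    }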

Modified: lucene/hadoop/trunk/src/webapps/task/tasklog.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/tasklog.jsp?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/webapps/task/tasklog.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/task/tasklog.jsp Tue Jan 16 13:28:50 2007
@@ -10,6 +10,7 @@
   long tailSize = 1024;
   int tailWindow = 1;
   boolean entireLog = false;
+  boolean plainText = false; 
   
   String taskId = request.getParameter("taskid");
   if (taskId == null) {
@@ -46,6 +47,11 @@
   if (sTailWindow != null) {
   	tailWindow = Integer.valueOf(sTailWindow).intValue();
   }
+  
+  String sPlainText = request.getParameter("plaintext");
+  if (sPlainText != null) {
+    plainText = Boolean.valueOf(sPlainText);
+  }
 
   if (logOffset == -1 || logLength == -1) {
   	tailLog = true;
@@ -55,17 +61,18 @@
   if (entireLog) {
     tailLog = false;
   }
-%>
-
-<html>
-
-<title><%= taskId %> Task Logs</title>
+  
+  if( !plainText ) {
+    out.println("<html>");
+    out.println("<title>" + taskId + "Task Logs </title>"); 
+    out.println("<body>");
+    out.println("<h1>" +  taskId + "Task Logs</h1><br>"); 
+    out.println("<h2>Task Logs</h2>");
+    out.println("<pre>");
 
-<body>
-<h1><%= taskId %> Task Logs</h1><br>
+  }
+%>
 
-<h2>Task Logs</h2>
-<pre>
 <%
   boolean gotRequiredData = true;
   try {
@@ -91,20 +98,30 @@
   	  
   	if (bytesRead != targetLength && 
   	  targetLength <= taskLogReader.getTotalLogSize()) {
-  	  out.println("<b>Warning: Could not fetch " + targetLength + 
-  		  " bytes from the task-logs; probably purged!</b><br/>");
+  	  if( !plainText) {
+	  	  out.println("<b>Warning: Could not fetch " + targetLength + 
+	  		  " bytes from the task-logs; probably purged!</b><br/>");
+  	  }else{
+	  	  out.println("Warning: Could not fetch " + targetLength + 
+  		  " bytes from the task-logs; probably purged!");
+  	  }
   	  gotRequiredData = false;
   	}
+  	if( plainText ) {
+  	  response.setContentLength(bytesRead); 
+  	}
 	String logData = new String(b, 0, bytesRead);
 	out.println(logData);
   } catch (IOException ioe) {
   	out.println("Failed to retrieve logs for task: " + taskId);
   }
+  
+  if( !plainText ) {
+    out.println("</pre>");
+  }
 %>
-</pre>
-
 <%
-  if (!entireLog) {
+  if (!entireLog && !plainText) {
     if (tailLog) {
       if (gotRequiredData) {
   	  	out.println("<a href='/tasklog.jsp?taskid=" + taskId + 
@@ -127,9 +144,10 @@
   		  "&len=" + logLength + "'>Later</a>");
     }
   }
+  if( !plainText ) {
+    out.println("<hr>");
+    out.println("<a href='http://lucene.apache.org/hadoop'>Hadoop</a>, 2006.<br>");
+    out.println("</body>");
+    out.println("</html>");
+  }
 %>
-
-<hr>
-<a href="http://lucene.apache.org/hadoop">Hadoop</a>, 2006.<br>
-</body>
-</html>
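
For completeness, a small sketch of fetching a task log through the new plaintext mode, along the lines of JobClient.printHttpFile above; the tracker host, port and task id are placeholders:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;

    public class PlainTextTaskLog {
      public static void main(String[] args) throws Exception {
        // Same URL shape that JobInProgress now attaches to each completion event.
        URL url = new URL("http://tt.example.com:50060/tasklog.jsp"
            + "?plaintext=true&taskid=task_0001_m_000000_0&all=true");
        BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()));
        try {
          String line;
          while ((line = in.readLine()) != null) {
            System.out.println(line);   // raw log text, no HTML wrapping
          }
        } finally {
          in.close();
        }
      }
    }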