You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/16 22:28:51 UTC
svn commit: r496864 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/ src/webapps/task/
Author: cutting
Date: Tue Jan 16 13:28:50 2007
New Revision: 496864
URL: http://svn.apache.org/viewvc?view=rev&rev=496864
Log:
HADOOP-801. Add to jobtracker a log of task completion events. Contributed by Sanjay.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
lucene/hadoop/trunk/src/webapps/task/tasklog.jsp
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jan 16 13:28:50 2007
@@ -25,6 +25,9 @@
for better compatibility with some monitoring systems.
(Nigel Daley via cutting)
+ 7. HADOOP-801. Add to jobtracker a log of task completion events.
+ (Sanjay Dahiya via cutting)
+
Release 0.10.1 - 2007-01-10
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Tue Jan 16 13:28:50 2007
@@ -91,6 +91,17 @@
public void reportTaskTrackerError(String taskTracker,
String errorClass,
String errorMessage) throws IOException;
+ /**
+ * Get task completion events for the jobid, starting from fromEventId.
+ * Returns empty array if no events are available.
+ * @param jobid job id
+ * @param fromEventId event id to start from.
+ * @return array of task completion events.
+ * @throws IOException
+ */
+ TaskCompletionEvent[] getTaskCompletionEvents(
+ String jobid, int fromEventId) throws IOException;
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Jan 16 13:28:50 2007
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.mapred;
-import org.apache.commons.cli2.validation.InvalidArgumentException;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.*;
@@ -39,6 +38,8 @@
*******************************************************/
public class JobClient extends ToolBase implements MRConstants {
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
+ public static enum TaskStatusFilter { NONE, FAILED, SUCCEEDED, ALL };
+ private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
static long MAX_JOBPROFILE_AGE = 1000 * 2;
@@ -150,6 +151,14 @@
public synchronized void killJob() throws IOException {
jobSubmitClient.killJob(getJobID());
}
+ /**
+ * Fetch task completion events from jobtracker for this job.
+ */
+ public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
+ int startFrom) throws IOException{
+ return jobSubmitClient.getTaskCompletionEvents(
+ getJobID(), startFrom);
+ }
/**
* Dump stats to screen
@@ -367,10 +376,22 @@
String lastReport = null;
final int MAX_RETRIES = 5;
int retries = MAX_RETRIES;
+ String outputFilterName = job.get("jobclient.output.filter");
+
+ if (null != outputFilterName) {
+ try {
+ jc.setTaskOutputFilter(TaskStatusFilter.valueOf(outputFilterName));
+ } catch(IllegalArgumentException e) {
+ LOG.warn("Invalid Output filter : " + outputFilterName +
+ " Valid values are : NONE, FAILED, SUCCEEDED, ALL");
+ }
+ }
try {
running = jc.submitJob(job);
String jobId = running.getJobID();
LOG.info("Running job: " + jobId);
+ int eventCounter = 0 ;
+
while (true) {
try {
Thread.sleep(1000);
@@ -388,6 +409,34 @@
LOG.info(report);
lastReport = report;
}
+
+ if( jc.getTaskOutputFilter() != TaskStatusFilter.NONE){
+ TaskCompletionEvent[] events =
+ running.getTaskCompletionEvents(eventCounter);
+ eventCounter += events.length ;
+ for(TaskCompletionEvent event : events ){
+ switch( jc.getTaskOutputFilter() ){
+ case SUCCEEDED:
+ if( event.getTaskStatus() ==
+ TaskCompletionEvent.Status.SUCCEEDED){
+ LOG.info(event.toString());
+ printHttpFile(event.getTaskTrackerHttp());
+ }
+ break;
+ case FAILED:
+ if( event.getTaskStatus() ==
+ TaskCompletionEvent.Status.FAILED){
+ LOG.info(event.toString());
+ printHttpFile(event.getTaskTrackerHttp());
+ }
+ break ;
+ case ALL:
+ LOG.info(event.toString());
+ printHttpFile(event.getTaskTrackerHttp());
+ break;
+ }
+ }
+ }
retries = MAX_RETRIES;
} catch (IOException ie) {
if (--retries == 0) {
@@ -410,6 +459,35 @@
jc.close();
}
}
+
+ static void printHttpFile(String httpURL ) throws IOException {
+ boolean good = false;
+ long totalBytes = 0;
+ URL path = new URL(httpURL);
+ try {
+ URLConnection connection = path.openConnection();
+ InputStream input = connection.getInputStream();
+ try {
+ byte[] buffer = new byte[64 * 1024];
+ int len = input.read(buffer);
+ while (len > 0) {
+ totalBytes += len;
+ LOG.info(new String(buffer).trim());
+ len = input.read(buffer);
+ }
+ good = ((int) totalBytes) == connection.getContentLength();
+ if (!good) {
+ LOG.warn("Incomplete task output received for " + path +
+ " (" + totalBytes + " instead of " +
+ connection.getContentLength() + ")");
+ }
+ }finally {
+ input.close();
+ }
+ }catch(IOException ioe){
+ LOG.warn("Error reading task output" + ioe.getMessage());
+ }
+ }
static Configuration getConfiguration(String jobTrackerSpec)
{
@@ -428,8 +506,23 @@
}
return conf;
}
-
+ /**
+ * Sets the output filter for tasks. Only those tasks are printed whose
+ * output matches the filter.
+ * @param newValue task filter.
+ */
+ public void setTaskOutputFilter(TaskStatusFilter newValue){
+ this.taskOutputFilter = newValue ;
+ }
+ /**
+ * Returns task output filter.
+ * @return task filter.
+ */
+ public TaskStatusFilter getTaskOutputFilter(){
+ return this.taskOutputFilter;
+ }
+
public int run(String[] argv) throws Exception {
if (argv.length < 2) {
System.out.println("JobClient -submit <job> | -status <id> | -kill <id> [-jt <jobtracker:port>|<config>]");
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Jan 16 13:28:50 2007
@@ -53,6 +53,8 @@
int failedReduceTasks = 0 ;
JobTracker jobtracker = null;
HashMap hostToMaps = new HashMap();
+ private int taskCompletionEventTracker = 0 ;
+ List<TaskCompletionEvent> taskCompletionEvents ;
long startTime;
long finishTime;
@@ -95,6 +97,8 @@
this.numMapTasks = conf.getNumMapTasks();
this.numReduceTasks = conf.getNumReduceTasks();
+ this.taskCompletionEvents = new ArrayList(
+ numMapTasks + numReduceTasks + 10);
JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(),
System.currentTimeMillis(), jobFile);
@@ -276,10 +280,29 @@
boolean change = tip.updateStatus(status);
if (change) {
TaskStatus.State state = status.getRunState();
+ TaskTrackerStatus ttStatus =
+ this.jobtracker.getTaskTracker(status.getTaskTracker());
+ String httpTaskLogLocation = null;
+ if( null != ttStatus ){
+ httpTaskLogLocation = "http://" + ttStatus.getHost() + ":" +
+ ttStatus.getHttpPort() + "/tasklog.jsp?plaintext=true&taskid=" +
+ status.getTaskId() + "&all=true";
+ }
+
if (state == TaskStatus.State.SUCCEEDED) {
+ this.taskCompletionEvents.add( new TaskCompletionEvent(
+ taskCompletionEventTracker++,
+ status.getTaskId(),
+ TaskCompletionEvent.Status.SUCCEEDED,
+ httpTaskLogLocation ));
completedTask(tip, status, metrics);
} else if (state == TaskStatus.State.FAILED ||
state == TaskStatus.State.KILLED) {
+ this.taskCompletionEvents.add( new TaskCompletionEvent(
+ taskCompletionEventTracker++,
+ status.getTaskId(),
+ TaskCompletionEvent.Status.FAILED,
+ httpTaskLogLocation ));
// Tell the job to fail the relevant task
failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
wasRunning, wasComplete);
@@ -753,5 +776,15 @@
}
}
return null;
+ }
+
+ public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId) {
+ TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
+ if( taskCompletionEvents.size() > fromEventId) {
+ events = (TaskCompletionEvent[])taskCompletionEvents.subList(
+ fromEventId, taskCompletionEvents.size()).
+ toArray(events);
+ }
+ return events;
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Tue Jan 16 13:28:50 2007
@@ -80,4 +80,16 @@
* jobs.
*/
public JobStatus[] jobsToComplete() throws IOException;
+
+ /**
+ * Get task completion events for the jobid, starting from fromEventId.
+ * Returns empty array if no events are available.
+ * @param jobid job id
+ * @param fromEventId event id to start from.
+ * @return array of task completion events.
+ * @throws IOException
+ */
+ public TaskCompletionEvent[] getTaskCompletionEvents(
+ String jobid, int fromEventId) throws IOException;
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Jan 16 13:28:50 2007
@@ -1431,6 +1431,21 @@
return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]);
}
}
+
+ /*
+ * Returns a list of TaskCompletionEvent for the given job,
+ * starting from fromEventId.
+ * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int)
+ */
+ public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
+ String jobid, int fromEventId) throws IOException{
+ TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
+ JobInProgress job = (JobInProgress)this.jobs.get(jobid);
+ if (null != job) {
+ events = job.getTaskCompletionEvents(fromEventId);
+ }
+ return events;
+ }
/**
* Get the diagnostics for a given task
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jan 16 13:28:50 2007
@@ -243,4 +243,9 @@
}
public JobStatus[] jobsToComplete() {return null;}
+ public TaskCompletionEvent[] getTaskCompletionEvents(
+ String jobid, int fromEventId) throws IOException{
+ return TaskCompletionEvent.EMPTY_ARRAY;
+ }
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Tue Jan 16 13:28:50 2007
@@ -76,4 +76,8 @@
* killed as well. If the job is no longer running, it simply returns.
*/
public void killJob() throws IOException;
+
+ public TaskCompletionEvent[] getTaskCompletionEvents(
+ int startFrom) throws IOException;
+
}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?view=auto&rev=496864
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Tue Jan 16 13:28:50 2007
@@ -0,0 +1,131 @@
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * This is used to track task completion events on
+ * job tracker.
+ *
+ */
+public class TaskCompletionEvent implements Writable{
+ static public enum Status {FAILED, SUCCEEDED};
+
+ private int eventId ;
+ private String taskTrackerHttp ;
+ private String taskId ;
+ Status status ;
+ public static final TaskCompletionEvent[] EMPTY_ARRAY =
+ new TaskCompletionEvent[0];
+ /**
+ * Default constructor for Writable.
+ *
+ */
+ public TaskCompletionEvent(){}
+ /**
+ * Constructor. eventId should be created externally and incremented
+ * per event for each job.
+ * @param eventId event id, event id should be unique and assigned
+ * incrementally, starting from 0.
+ * @param taskId task id
+ * @param status task's status
+ * @param taskTrackerHttp task tracker's host:port for http.
+ */
+ public TaskCompletionEvent(int eventId,
+ String taskId,
+ Status status,
+ String taskTrackerHttp){
+
+ this.taskId = taskId ;
+ this.eventId = eventId ;
+ this.status =status ;
+ this.taskTrackerHttp = taskTrackerHttp ;
+ }
+ /**
+ * Returns event Id.
+ * @return event id
+ */
+ public int getEventId() {
+ return eventId;
+ }
+ /**
+ * Returns task id.
+ * @return task id
+ */
+ public String getTaskId() {
+ return taskId;
+ }
+ /**
+ * Returns enum Status.SUCCEEDED or Status.FAILED.
+ * @return task tracker status
+ */
+ public Status getTaskStatus() {
+ return status;
+ }
+ /**
+ * http location of the tasktracker where this task ran.
+ * @return http location of tasktracker user logs
+ */
+ public String getTaskTrackerHttp() {
+ return taskTrackerHttp;
+ }
+ /**
+ * Sets event id. Should be assigned incrementally, starting from 0.
+ * @param eventId
+ */
+ public void setEventId(
+ int eventId) {
+ this.eventId = eventId;
+ }
+ /**
+ * Sets task id.
+ * @param taskId
+ */
+ public void setTaskId(
+ String taskId) {
+ this.taskId = taskId;
+ }
+ /**
+ * Set task status.
+ * @param status
+ */
+ public void setTaskStatus(
+ Status status) {
+ this.status = status;
+ }
+ /**
+ * Set task tracker http location.
+ * @param taskTrackerHttp
+ */
+ public void setTaskTrackerHttp(
+ String taskTrackerHttp) {
+ this.taskTrackerHttp = taskTrackerHttp;
+ }
+
+ public String toString(){
+ StringBuffer buf = new StringBuffer();
+ buf.append("Task Id : ");
+ buf.append( taskId ) ;
+ buf.append(", Status : ");
+ buf.append( status.name() ) ;
+ return buf.toString();
+ }
+
+ //////////////////////////////////////////////
+ // Writable
+ //////////////////////////////////////////////
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeString(out, taskId);
+ WritableUtils.writeEnum(out, status);
+ WritableUtils.writeString(out, taskTrackerHttp);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ this.taskId = WritableUtils.readString(in) ;
+ this.status = WritableUtils.readEnum(in, Status.class);
+ this.taskTrackerHttp = WritableUtils.readString(in);
+ }
+}
Modified: lucene/hadoop/trunk/src/webapps/task/tasklog.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/tasklog.jsp?view=diff&rev=496864&r1=496863&r2=496864
==============================================================================
--- lucene/hadoop/trunk/src/webapps/task/tasklog.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/task/tasklog.jsp Tue Jan 16 13:28:50 2007
@@ -10,6 +10,7 @@
long tailSize = 1024;
int tailWindow = 1;
boolean entireLog = false;
+ boolean plainText = false;
String taskId = request.getParameter("taskid");
if (taskId == null) {
@@ -46,6 +47,11 @@
if (sTailWindow != null) {
tailWindow = Integer.valueOf(sTailWindow).intValue();
}
+
+ String sPlainText = request.getParameter("plaintext");
+ if (sPlainText != null) {
+ plainText = Boolean.valueOf(sPlainText);
+ }
if (logOffset == -1 || logLength == -1) {
tailLog = true;
@@ -55,17 +61,18 @@
if (entireLog) {
tailLog = false;
}
-%>
-
-<html>
-
-<title><%= taskId %> Task Logs</title>
+
+ if( !plainText ) {
+ out.println("<html>");
+ out.println("<title>" + taskId + "Task Logs </title>");
+ out.println("<body>");
+ out.println("<h1>" + taskId + "Task Logs</h1><br>");
+ out.println("<h2>Task Logs</h2>");
+ out.println("<pre>");
-<body>
-<h1><%= taskId %> Task Logs</h1><br>
+ }
+%>
-<h2>Task Logs</h2>
-<pre>
<%
boolean gotRequiredData = true;
try {
@@ -91,20 +98,30 @@
if (bytesRead != targetLength &&
targetLength <= taskLogReader.getTotalLogSize()) {
- out.println("<b>Warning: Could not fetch " + targetLength +
- " bytes from the task-logs; probably purged!</b><br/>");
+ if( !plainText) {
+ out.println("<b>Warning: Could not fetch " + targetLength +
+ " bytes from the task-logs; probably purged!</b><br/>");
+ }else{
+ out.println("Warning: Could not fetch " + targetLength +
+ " bytes from the task-logs; probably purged!");
+ }
gotRequiredData = false;
}
+ if( plainText ) {
+ response.setContentLength(bytesRead);
+ }
String logData = new String(b, 0, bytesRead);
out.println(logData);
} catch (IOException ioe) {
out.println("Failed to retrieve logs for task: " + taskId);
}
+
+ if( !plainText ) {
+ out.println("</pre>");
+ }
%>
-</pre>
-
<%
- if (!entireLog) {
+ if (!entireLog && !plainText) {
if (tailLog) {
if (gotRequiredData) {
out.println("<a href='/tasklog.jsp?taskid=" + taskId +
@@ -127,9 +144,10 @@
"&len=" + logLength + "'>Later</a>");
}
}
+ if( !plainText ) {
+ out.println("<hr>");
+ out.println("<a href='http://lucene.apache.org/hadoop'>Hadoop</a>, 2006.<br>");
+ out.println("</body>");
+ out.println("</html>");
+ }
%>
-
-<hr>
-<a href="http://lucene.apache.org/hadoop">Hadoop</a>, 2006.<br>
-</body>
-</html>