You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/26 00:29:23 UTC
svn commit: r449840 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/util/ src/webapps/job/
Author: cutting
Date: Mon Sep 25 15:29:21 2006
New Revision: 449840
URL: http://svn.apache.org/viewvc?view=rev&rev=449840
Log:
HADOOP-263. Include timestamps for job transitions. Contributed by Sanjay.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp
lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Sep 25 15:29:21 2006
@@ -55,6 +55,11 @@
new command, 'dfs -rmr' which operates recursively.
(Sameer Paranjpye via cutting)
+15. HADOOP-263. Include timestamps for job transitions. The web
+ interface now displays the start and end times of tasks and the
+ start times of sorting and reducing for reduce tasks. Also,
+ extend ObjectWritable to handle enums, so that they can be passed
+ as RPC parameters. (Sanjay Dahiya via cutting)
Release 0.6.2 (unreleased)
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java Mon Sep 25 15:29:21 2006
@@ -359,5 +359,27 @@
return len+1;
}
-
+ /**
+ * Read an Enum value from DataInput. Enums are read and written
+ * using their String names.
+ * @param <T> Enum type
+ * @param in DataInput to read from
+ * @param enumType Class type of Enum
+ * @return Enum represented by String read from DataInput
+ * @throws IOException
+ */
+ public static <T extends Enum<T>> T readEnum(DataInput in, Class<T> enumType)
+ throws IOException{
+ return T.valueOf(enumType, Text.readString(in));
+ }
+ /**
+ * Writes the name of an enum value to DataOutput as a String.
+ * @param out DataOutput stream
+ * @param enumVal enum value
+ * @throws IOException
+ */
+ public static void writeEnum(DataOutput out, Enum enumVal)
+ throws IOException{
+ Text.writeString(out, enumVal.name());
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Mon Sep 25 15:29:21 2006
@@ -52,8 +52,8 @@
return true;
}
- public void progress(String taskid, float progress, String state
- ) throws IOException {
+ public void progress(String taskid, float progress, String state,
+ Phase phase) throws IOException {
StringBuffer buf = new StringBuffer("Task ");
buf.append(taskid);
buf.append(" making progress to ");
@@ -63,6 +63,7 @@
buf.append(state);
}
LOG.info(buf.toString());
+ // ignore phase
}
public void reportDiagnosticInfo(String taskid, String trace) throws IOException {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Sep 25 15:29:21 2006
@@ -591,8 +591,8 @@
* @param trackerName The task tracker the task failed on
*/
public void failedTask(TaskInProgress tip, String taskid,
- String reason, String hostname,
- String trackerName,
+ String reason, Phase phase,
+ String hostname, String trackerName,
JobTrackerMetrics metrics) {
TaskStatus status = new TaskStatus(taskid,
tip.isMapTask(),
@@ -600,7 +600,7 @@
TaskStatus.FAILED,
reason,
reason,
- trackerName);
+ trackerName, phase);
updateTaskStatus(tip, status, metrics);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Sep 25 15:29:21 2006
@@ -138,6 +138,7 @@
TaskTrackerStatus trackerStatus =
getTaskTracker(trackerName);
job.failedTask(tip, taskId, "Error launching task",
+ tip.isMapTask()?Phase.MAP:Phase.STARTING,
trackerStatus.getHost(), trackerName,
myMetrics);
}
@@ -1199,7 +1200,7 @@
// if the job is done, we don't want to change anything
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
job.failedTask(tip, taskId, "Lost task tracker",
- hostname, trackerName, myMetrics);
+ Phase.MAP, hostname, trackerName, myMetrics);
}
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Sep 25 15:29:21 2006
@@ -157,7 +157,7 @@
public Task getTask(String taskid) { return null; }
- public void progress(String taskId, float progress, String state) {
+ public void progress(String taskId, float progress, String state, Phase phase) {
LOG.info(state);
float taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
@@ -166,6 +166,8 @@
} else {
status.setReduceProgress(progress);
}
+
+ // ignore phase
}
public void reportDiagnosticInfo(String taskid, String trace) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Sep 25 15:29:21 2006
@@ -45,6 +45,9 @@
});
}
+ { // set phase for this task
+ setPhase(Phase.MAP);
+ }
private class MapTaskMetrics {
private MetricsRecord metricsRecord = null;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon Sep 25 15:29:21 2006
@@ -65,7 +65,10 @@
private int numMaps;
private boolean sortComplete;
- { getProgress().setStatus("reduce"); }
+ {
+ getProgress().setStatus("reduce");
+ setPhase(Phase.SHUFFLE); // phase to start with
+ }
private Progress copyPhase = getProgress().addPhase("copy");
private Progress sortPhase = getProgress().addPhase("sort");
@@ -200,6 +203,7 @@
FileSystem lfs = FileSystem.getNamed("local", job);
copyPhase.complete(); // copy is already complete
+
// open a file to collect map output
Path[] mapFiles = new Path[numMaps];
@@ -232,6 +236,7 @@
WritableComparator comparator = job.getOutputKeyComparator();
try {
+ setPhase(Phase.SORT) ;
sortProgress.start();
// sort the input file
@@ -244,6 +249,7 @@
}
sortPhase.complete(); // sort is complete
+ setPhase(Phase.REDUCE);
Reporter reporter = getReporter(umbilical, getProgress());
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Sep 25 15:29:21 2006
@@ -37,7 +37,8 @@
private String taskId; // unique, includes job id
private String jobId; // unique jobid
private int partition; // id within job
-
+ private Phase phase ; // current phase of the task
+
////////////////////////////////////////////
// Constructors
////////////////////////////////////////////
@@ -73,6 +74,20 @@
public int getPartition() {
return partition;
}
+ /**
+ * Return current phase of the task.
+ * @return current phase of the task.
+ */
+ public Phase getPhase(){
+ return this.phase ;
+ }
+ /**
+ * Set current phase of the task.
+ * @param p new phase of the task.
+ */
+ protected void setPhase(Phase p){
+ this.phase = p ;
+ }
////////////////////////////////////////////
// Writable methods
@@ -153,7 +168,7 @@
float progress = taskProgress.get();
String status = taskProgress.toString();
try {
- umbilical.progress(getTaskId(), progress, status);
+ umbilical.progress(getTaskId(), progress, status, phase);
} catch (IOException ie) {
LOG.warn(StringUtils.stringifyException(ie));
}
@@ -164,7 +179,7 @@
public void done(TaskUmbilicalProtocol umbilical)
throws IOException {
umbilical.progress(getTaskId(), // send a final status report
- taskProgress.get(), taskProgress.toString());
+ taskProgress.get(), taskProgress.toString(), phase);
umbilical.done(getTaskId());
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon Sep 25 15:29:21 2006
@@ -65,6 +65,8 @@
private double progress = 0;
private String state = "";
private long startTime = 0;
+ private long execStartTime = 0 ;
+ private long execFinishTime = 0 ;
private int completes = 0;
private boolean failed = false;
private TreeSet usableTaskIds = new TreeSet();
@@ -237,9 +239,12 @@
for (Iterator i = taskDiagnosticData.values().iterator(); i.hasNext();) {
diagnostics.addAll((List)i.next());
}
- return new TaskReport
- (getTIPId(), (float)progress, state,
- (String[])diagnostics.toArray(new String[diagnostics.size()]));
+ TaskReport report = new TaskReport
+ (getTIPId(), (float)progress, state,
+ (String[])diagnostics.toArray(new String[diagnostics.size()]),
+ execStartTime, execFinishTime);
+
+ return report ;
}
/**
@@ -311,6 +316,10 @@
TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
if (status != null) {
status.setRunState(TaskStatus.FAILED);
+ // tasktracker went down and failed time was not reported.
+ if( 0 == status.getFinishTime() ){
+ status.setFinishTime(System.currentTimeMillis());
+ }
}
this.recentTasks.remove(taskid);
if (this.completes > 0) {
@@ -372,8 +381,10 @@
void recomputeProgress() {
if (isComplete()) {
this.progress = 1;
+ this.execFinishTime = System.currentTimeMillis();
} else if (failed) {
this.progress = 0;
+ this.execFinishTime = System.currentTimeMillis();
} else {
double bestProgress = 0;
String bestState = "";
@@ -434,6 +445,10 @@
*/
public Task getTaskToRun(String taskTracker) {
Task t = null;
+ if( 0 == execStartTime ){
+ // assume task starts running now
+ execStartTime = System.currentTimeMillis();
+ }
String taskid = (String) usableTaskIds.first();
usableTaskIds.remove(taskid);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java Mon Sep 25 15:29:21 2006
@@ -25,15 +25,19 @@
private float progress;
private String state;
private String[] diagnostics;
+ private long startTime ;
+ private long finishTime;
public TaskReport() {}
TaskReport(String taskid, float progress, String state,
- String[] diagnostics) {
+ String[] diagnostics, long startTime, long finishTime) {
this.taskid = taskid;
this.progress = progress;
this.state = state;
this.diagnostics = diagnostics;
+ this.startTime = startTime ;
+ this.finishTime = finishTime ;
}
/** The id of the task. */
@@ -44,7 +48,36 @@
public String getState() { return state; }
/** A list of error messages. */
public String[] getDiagnostics() { return diagnostics; }
+ /**
+ * Get finish time of task.
+ * @return 0, if finish time was not set else returns finish time.
+ */
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ /**
+ * set finish time of task.
+ * @param finishTime finish time of task.
+ */
+ void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ /**
+ * Get start time of task.
+ * @return 0 if start time was not set, else start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /**
+ * set start time of the task.
+ */
+ void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
//////////////////////////////////////////////
// Writable
//////////////////////////////////////////////
@@ -52,6 +85,8 @@
UTF8.writeString(out, taskid);
out.writeFloat(progress);
UTF8.writeString(out, state);
+ out.writeLong(startTime);
+ out.writeLong(finishTime);
new ObjectWritable(diagnostics).write(out);
}
@@ -59,7 +94,9 @@
this.taskid = UTF8.readString(in);
this.progress = in.readFloat();
this.state = UTF8.readString(in);
-
+ this.startTime = in.readLong();
+ this.finishTime = in.readLong() ;
+
ObjectWritable wrapper = new ObjectWritable();
wrapper.readFields(in);
diagnostics = (String[])wrapper.get();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Mon Sep 25 15:29:21 2006
@@ -18,6 +18,8 @@
import org.apache.hadoop.io.*;
import java.io.*;
+// enumeration for reporting current phase of a task.
+enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE};
/**************************************************
* Describes the current status of a task. This is
@@ -38,12 +40,22 @@
private String diagnosticInfo;
private String stateString;
private String taskTracker;
+
+ private long startTime ;
+ private long finishTime ;
+
+ // only for reduce tasks
+ private long shuffleFinishTime ;
+ private long sortFinishTime ;
+
+ private Phase phase = Phase.STARTING;
public TaskStatus() {}
public TaskStatus(String taskid, boolean isMap, float progress,
int runState, String diagnosticInfo,
- String stateString, String taskTracker) {
+ String stateString, String taskTracker,
+ Phase phase) {
this.taskid = taskid;
this.isMap = isMap;
this.progress = progress;
@@ -51,6 +63,7 @@
this.diagnosticInfo = diagnosticInfo;
this.stateString = stateString;
this.taskTracker = taskTracker;
+ this.phase = phase ;
}
public String getTaskId() { return taskid; }
@@ -65,7 +78,104 @@
public void setDiagnosticInfo(String info) { this.diagnosticInfo = info; }
public String getStateString() { return stateString; }
public void setStateString(String stateString) { this.stateString = stateString; }
+ /**
+ * Get task finish time. When the finish time is set, any shuffleFinishTime
+ * or sortFinishTime not set before is also set to it. This takes care of
+ * the case when shuffle, sort and finish are completed within the same
+ * heartbeat interval and are not reported separately. If task state is
+ * TaskStatus.FAILED then finish time represents when the task failed.
+ * @return finish time of the task.
+ */
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ /**
+ * Sets finishTime.
+ * @param finishTime finish time of task.
+ */
+ void setFinishTime(long finishTime) {
+ if( shuffleFinishTime == 0 ) {
+ this.shuffleFinishTime = finishTime ;
+ }
+ if( sortFinishTime == 0 ){
+ this.sortFinishTime = finishTime ;
+ }
+ this.finishTime = finishTime;
+ }
+ /**
+ * Get shuffle finish time for the task. If shuffle finish time was
+ * not set due to shuffle/sort/finish phases ending within same
+ * heartbeat interval, it is set to finish time of next phase i.e. sort
+ * or task finish when these are set.
+ * @return 0 if shuffleFinishTime, sortFinishTime and finish time are not set. else
+ * it returns approximate shuffle finish time.
+ */
+ public long getShuffleFinishTime() {
+ return shuffleFinishTime;
+ }
+
+ /**
+ * Set shuffle finish time.
+ * @param shuffleFinishTime
+ */
+ void setShuffleFinishTime(long shuffleFinishTime) {
+ this.shuffleFinishTime = shuffleFinishTime;
+ }
+
+ /**
+ * Get sort finish time for the task. If sort finish time was not set
+ * due to sort and reduce phases finishing in the same heartbeat interval, it is
+ * set to finish time, when finish time is set.
+ * @return 0 if sort finish time and finish time are not set, else returns sort
+ * finish time if that is set, else it returns finish time.
+ */
+ public long getSortFinishTime() {
+ return sortFinishTime;
+ }
+
+ /**
+ * Sets sortFinishTime. If shuffleFinishTime was not set before,
+ * then it is also set to sortFinishTime.
+ * @param sortFinishTime
+ */
+ void setSortFinishTime(long sortFinishTime) {
+ this.sortFinishTime = sortFinishTime;
+ if( 0 == this.shuffleFinishTime){
+ this.shuffleFinishTime = sortFinishTime ;
+ }
+ }
+
+ /**
+ * Get start time of the task.
+ * @return 0 if start time is not set, else returns start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+ /**
+ * Set startTime of the task.
+ * @param startTime start time
+ */
+ void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+ /**
+ * Get current phase of this task. Phase.MAP in case of map tasks,
+ * for reduce tasks one of Phase.SHUFFLE, Phase.SORT or Phase.REDUCE.
+ * @return current phase of this task.
+ */
+ public Phase getPhase(){
+ return this.phase;
+ }
+ /**
+ * Set current phase of this task.
+ * @param p new phase of this task.
+ */
+ void setPhase(Phase p){
+ this.phase = p ;
+ }
//////////////////////////////////////////////
// Writable
//////////////////////////////////////////////
@@ -76,6 +186,13 @@
out.writeInt(runState);
UTF8.writeString(out, diagnosticInfo);
UTF8.writeString(out, stateString);
+ WritableUtils.writeEnum(out, phase);
+ out.writeLong(startTime);
+ out.writeLong(finishTime);
+ if(! isMap){
+ out.writeLong(shuffleFinishTime);
+ out.writeLong(sortFinishTime);
+ }
}
public void readFields(DataInput in) throws IOException {
@@ -85,5 +202,13 @@
this.runState = in.readInt();
this.diagnosticInfo = UTF8.readString(in);
this.stateString = UTF8.readString(in);
- }
+ this.phase = WritableUtils.readEnum(in, Phase.class);
+ this.startTime = in.readLong();
+ this.finishTime = in.readLong() ;
+ if( ! this.isMap ){
+ shuffleFinishTime = in.readLong();
+ sortFinishTime = in.readLong();
+ }
+ }
}
+
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Sep 25 15:29:21 2006
@@ -771,7 +771,6 @@
Task task;
float progress;
int runstate;
- String stateString = "";
long lastProgressReport;
StringBuffer diagnosticInfo = new StringBuffer();
TaskRunner runner;
@@ -781,6 +780,7 @@
private JobConf localJobConf;
private boolean keepFailedTaskFiles;
private boolean alwaysKeepTaskFiles;
+ private TaskStatus taskStatus ;
private boolean keepJobFiles;
/**
@@ -789,10 +789,15 @@
this.task = task;
this.progress = 0.0f;
this.runstate = TaskStatus.UNASSIGNED;
- stateString = "initializing";
this.lastProgressReport = System.currentTimeMillis();
this.defaultJobConf = conf;
localJobConf = null;
+ taskStatus = new TaskStatus(task.getTaskId(),
+ task.isMapTask(),
+ progress, runstate,
+ diagnosticInfo.toString(),
+ "initializing",
+ getName(), task.isMapTask()?Phase.MAP:Phase.SHUFFLE);
keepJobFiles = false;
}
@@ -842,17 +847,14 @@
/**
*/
public synchronized TaskStatus createStatus() {
- TaskStatus status =
- new TaskStatus(task.getTaskId(),
- task.isMapTask(),
- progress, runstate,
- diagnosticInfo.toString(),
- (stateString == null) ? "" : stateString,
- getName());
- if (diagnosticInfo.length() > 0) {
- diagnosticInfo = new StringBuffer();
- }
- return status;
+ taskStatus.setProgress(progress);
+ taskStatus.setRunState(runstate);
+ taskStatus.setDiagnosticInfo(diagnosticInfo.toString());
+
+ if (diagnosticInfo.length() > 0) {
+ diagnosticInfo = new StringBuffer();
+ }
+ return taskStatus;
}
/**
@@ -863,17 +865,27 @@
this.runstate = TaskStatus.RUNNING;
this.runner = task.createRunner(TaskTracker.this);
this.runner.start();
+ this.taskStatus.setStartTime(System.currentTimeMillis());
}
/**
* The task is reporting its progress
*/
- public synchronized void reportProgress(float p, String state) {
+ public synchronized void reportProgress(float p, String state, Phase newPhase) {
LOG.info(task.getTaskId()+" "+p+"% "+state);
this.progress = p;
this.runstate = TaskStatus.RUNNING;
this.lastProgressReport = System.currentTimeMillis();
- this.stateString = state;
+ Phase oldPhase = taskStatus.getPhase() ;
+ if( oldPhase != newPhase ){
+ // sort phase started
+ if( newPhase == Phase.SORT ){
+ this.taskStatus.setShuffleFinishTime(System.currentTimeMillis());
+ }else if( newPhase == Phase.REDUCE){
+ this.taskStatus.setSortFinishTime(System.currentTimeMillis());
+ }
+ }
+ this.taskStatus.setStateString(state);
}
/**
@@ -901,6 +913,7 @@
public synchronized void reportDone() {
LOG.info("Task " + task.getTaskId() + " is done.");
this.progress = 1.0f;
+ this.taskStatus.setFinishTime(System.currentTimeMillis());
this.done = true;
}
@@ -936,7 +949,7 @@
runstate = TaskStatus.FAILED;
progress = 0.0f;
}
-
+ this.taskStatus.setFinishTime(System.currentTimeMillis());
needCleanup = runstate == TaskStatus.FAILED;
}
@@ -1052,10 +1065,10 @@
/**
* Called periodically to report Task progress, from 0.0 to 1.0.
*/
- public synchronized void progress(String taskid, float progress, String state) throws IOException {
+ public synchronized void progress(String taskid, float progress, String state, Phase phase) throws IOException {
TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
if (tip != null) {
- tip.reportProgress(progress, state);
+ tip.reportProgress(progress, state, phase);
} else {
LOG.warn("Progress from unknown child task: "+taskid+". Ignored.");
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Mon Sep 25 15:29:21 2006
@@ -35,8 +35,9 @@
* @param taskid the id of the task
* @param progress value between zero and one
* @param state description of task's current state
+ * @param phase current phase of the task.
*/
- void progress(String taskid, float progress, String state)
+ void progress(String taskid, float progress, String state, Phase phase)
throws IOException;
/** Report error messages back to parent. Calls should be sparing, since all
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java Mon Sep 25 15:29:21 2006
@@ -20,7 +20,10 @@
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
+import java.text.DateFormat;
import java.text.DecimalFormat;
+import java.util.Date;
+
import org.apache.hadoop.fs.*;
/**
@@ -193,4 +196,59 @@
}
return p;
}
+ /**
+ *
+ * Given a finish and start time in long milliseconds, returns a
+ * String in the format "Xhrs, Ymins, Zsec" for the time difference between two times.
+ * If finish time comes before start time then negative values of X, Y and Z will be returned.
+ * The hours and minutes parts are omitted when zero; seconds are always included.
+ * @param finishTime finish time
+ * @param startTime start time
+ * @return formatted time difference.
+ */
+ public static String formatTimeDiff(long finishTime, long startTime){
+ StringBuffer buf = new StringBuffer() ;
+
+ long timeDiff = finishTime - startTime ;
+ long hours = timeDiff / (60*60*1000) ;
+ long rem = (timeDiff % (60*60*1000)) ;
+ long minutes = rem / (60*1000);
+ rem = rem % (60*1000) ;
+ long seconds = rem / 1000 ;
+
+ if( hours != 0 ){
+ buf.append(hours);
+ buf.append("hrs, ");
+ }
+ if( minutes != 0 ){
+ buf.append(minutes);
+ buf.append("mins, ");
+ }
+ // always append seconds, so "0sec" is returned if there is no difference
+ buf.append(seconds);
+ buf.append("sec");
+ return buf.toString();
+ }
+ /**
+ * Formats time in ms and appends difference (finishTime - startTime)
+ * as returned by formatTimeDiff().
+ * If finish time is 0, empty string is returned, if start time is 0
+ * then difference is not appended to return value.
+ * @param dateFormat date format to use
+ * @param finishTime fnish time
+ * @param startTime start time
+ * @return formatted value.
+ */
+ public static String getFormattedTimeWithDiff(DateFormat dateFormat,
+ long finishTime, long startTime){
+ StringBuffer buf = new StringBuffer();
+ if( 0 != finishTime ) {
+ buf.append(dateFormat.format(new Date(finishTime)));
+ if( 0 != startTime ){
+ buf.append(" (" + formatTimeDiff(finishTime , startTime) + ")");
+ }
+ }
+ return buf.toString();
+ }
+
}
Modified: lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp Mon Sep 25 15:29:21 2006
@@ -7,8 +7,9 @@
import="org.apache.hadoop.mapred.*"
import="org.apache.hadoop.util.*"
import="java.lang.Integer"
+ import="java.text.SimpleDateFormat"
%>
-
+<%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ; %>
<%
String jobid = request.getParameter("jobid");
String type = request.getParameter("type");
@@ -54,7 +55,7 @@
out.print("<h2>Tasks</h2>");
out.print("<center>");
out.print("<table border=2 cellpadding=\"5\" cellspacing=\"2\">");
- out.print("<tr><td align=\"center\">Task</td><td>Complete</td><td>Status</td><td>Errors</td></tr>");
+ out.print("<tr><td align=\"center\">Task</td><td>Complete</td><td>Status</td><td>Start Time</td><td>Finish Time</td><td>Errors</td></tr>");
if (end_index > report_len){
end_index = report_len;
}
@@ -66,6 +67,9 @@
out.print("<td>" + StringUtils.formatPercent(report.getProgress(),2) +
"</td>");
out.print("<td>" + report.getState() + "</td>");
+ out.println("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, report.getStartTime(),0) + "</td>");
+ out.println("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat,
+ report.getFinishTime(), report.getStartTime()) + "</td>");
String[] diagnostics = report.getDiagnostics();
for (int j = 0; j < diagnostics.length ; j++) {
out.print("<td><pre>" + diagnostics[j] + "</pre></td>");
Modified: lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp?view=diff&rev=449840&r1=449839&r2=449840
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Mon Sep 25 15:29:21 2006
@@ -7,7 +7,10 @@
import="java.util.*"
import="org.apache.hadoop.mapred.*"
import="org.apache.hadoop.util.*"
+ import="java.text.SimpleDateFormat"
+ import="org.apache.hadoop.util.*"
%>
+<%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ; %>
<%
String jobid = request.getParameter("jobid");
JobTracker tracker = JobTracker.getTracker();
@@ -47,9 +50,23 @@
<h2>All Task Attempts</h2>
<center>
+<%
+ if( ts.length == 0 ) {
+%>
+ <h3>No Task Attempts found</h3>
+<%
+ }else{
+%>
<table border=2 cellpadding="5" cellspacing="2">
-<tr><td align="center">Task Attempts</td><td>Machine</td><td>Status</td><td>Progress</td><td>Errors</td></tr>
-
+<tr><td align="center">Task Attempts</td><td>Machine</td><td>Status</td><td>Progress</td><td>Start Time</td>
+ <%
+ if( ! ts[0].getIsMap() ) {
+ %>
+<td>Shuffle Finished</td><td>Sort Finished</td>
+ <%
+ }
+ %>
+<td>Finish Time</td><td>Errors</td></tr>
<%
for (int i = 0; i < ts.length; i++) {
TaskStatus status = ts[i];
@@ -68,6 +85,17 @@
out.print("</td>");
out.print("<td>"+ StringUtils.formatPercent(status.getProgress(),2) +
"</td>");
+ out.print("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat,
+ status.getStartTime(), 0) + "</td>");
+ if( ! ts[i].getIsMap() ) {
+ out.print("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat,
+ status.getShuffleFinishTime(), status.getStartTime()) + "</td>");
+ out.println("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat,
+ status.getSortFinishTime(), status.getShuffleFinishTime()) + "</td>");
+ }
+ out.println("<td>"+ StringUtils.getFormattedTimeWithDiff(dateFormat,
+ status.getFinishTime(), status.getStartTime()) + "</td>");
+
out.print("<td><pre>");
List<String> failures = tracker.getTaskDiagnostics(jobid, tipid,
status.getTaskId());
@@ -84,6 +112,7 @@
out.print("</pre></td>");
out.print("</tr>\n");
}
+ }
%>
</table>
</center>