You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/10/07 08:59:45 UTC
svn commit: r702361 [2/2] - in /hadoop/core/branches/branch-0.19: ./ docs/
src/docs/src/documentation/content/xdocs/
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
src/webapps/job/
Modified: hadoop/core/branches/branch-0.19/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/branches/branch-0.19/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Mon Oct 6 23:59:44 2008
@@ -1594,13 +1594,11 @@
<li>
Setup the job during initialization. For example, create
the temporary output directory for the job during the
- initialization of the job. The job client does the setup
- for the job.
+ initialization of the job.
</li>
<li>
Cleanup the job after the job completion. For example, remove the
- temporary output directory after the job completion. A separate
- task does the cleanupJob at the end of the job.
+ temporary output directory after the job completion.
</li>
<li>
Setup the task temporary output.
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Mon Oct 6 23:59:44 2008
@@ -58,7 +58,7 @@
LOG.error("Job initialization failed:\n" +
StringUtils.stringifyException(t));
if (job != null) {
- job.fail();
+ job.terminateJob(JobStatus.FAILED);
}
}
}
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java Mon Oct 6 23:59:44 2008
@@ -93,6 +93,8 @@
printJobDetails();
printTaskSummary();
printJobAnalysis();
+ printTasks("SETUP", "FAILED");
+ printTasks("SETUP", "KILLED");
printTasks("MAP", "FAILED");
printTasks("MAP", "KILLED");
printTasks("REDUCE", "FAILED");
@@ -100,9 +102,11 @@
printTasks("CLEANUP", "FAILED");
printTasks("CLEANUP", "KILLED");
if (printAll) {
+ printTasks("SETUP", "SUCCESS");
printTasks("MAP", "SUCCESS");
printTasks("REDUCE", "SUCCESS");
printTasks("CLEANUP", "SUCCESS");
+ printAllTaskAttempts("SETUP");
printAllTaskAttempts("MAP");
printAllTaskAttempts("REDUCE");
printAllTaskAttempts("CLEANUP");
@@ -219,6 +223,7 @@
int totalMaps = 0;
int totalReduces = 0;
int totalCleanups = 0;
+ int totalSetups = 0;
int numFailedMaps = 0;
int numKilledMaps = 0;
int numFailedReduces = 0;
@@ -226,12 +231,17 @@
int numFinishedCleanups = 0;
int numFailedCleanups = 0;
int numKilledCleanups = 0;
+ int numFinishedSetups = 0;
+ int numFailedSetups = 0;
+ int numKilledSetups = 0;
long mapStarted = 0;
long mapFinished = 0;
long reduceStarted = 0;
long reduceFinished = 0;
long cleanupStarted = 0;
long cleanupFinished = 0;
+ long setupStarted = 0;
+ long setupFinished = 0;
Map <String, String> allHosts = new TreeMap<String, String>();
@@ -286,6 +296,23 @@
attempt.get(Keys.TASK_STATUS))) {
numKilledCleanups++;
}
+ } else if (Values.SETUP.name().equals(task.get(Keys.TASK_TYPE))){
+ if (setupStarted==0||setupStarted > startTime) {
+ setupStarted = startTime;
+ }
+ if (setupFinished < finishTime) {
+ setupFinished = finishTime;
+ }
+ totalSetups++;
+ if (Values.SUCCESS.name().equals(attempt.get(Keys.TASK_STATUS))) {
+ numFinishedSetups++;
+ } else if (Values.FAILED.name().equals(
+ attempt.get(Keys.TASK_STATUS))) {
+ numFailedSetups++;
+ } else if (Values.KILLED.name().equals(
+ attempt.get(Keys.TASK_STATUS))) {
+ numKilledSetups++;
+ }
}
}
}
@@ -296,6 +323,14 @@
taskSummary.append("\nKind\tTotal\t");
taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime");
taskSummary.append("\n");
+ taskSummary.append("\nSetup\t").append(totalSetups);
+ taskSummary.append("\t").append(numFinishedSetups);
+ taskSummary.append("\t\t").append(numFailedSetups);
+ taskSummary.append("\t").append(numKilledSetups);
+ taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+ dateFormat, setupStarted, 0));
+ taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+ dateFormat, setupFinished, setupStarted));
taskSummary.append("\nMap\t").append(totalMaps);
taskSummary.append("\t").append(job.getInt(Keys.FINISHED_MAPS));
taskSummary.append("\t\t").append(numFailedMaps);
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobClient.java Mon Oct 6 23:59:44 2008
@@ -262,6 +262,15 @@
}
/**
+ * A float between 0.0 and 1.0, indicating the % of setup work
+ * completed.
+ */
+ public float setupProgress() throws IOException {
+ ensureFreshStatus();
+ return status.setupProgress();
+ }
+
+ /**
* Returns immediately whether the whole job is done yet or not.
*/
public synchronized boolean isComplete() throws IOException {
@@ -813,13 +822,6 @@
out.close();
}
- // skip doing setup if there are no maps for the job.
- // because if there are no maps, job is considered completed and successful
- if (splits.length != 0) {
- // do setupJob
- job.getOutputCommitter().setupJob(new JobContext(job));
- }
-
//
// Now, actually submit the job (using the submit name)
//
@@ -1039,7 +1041,18 @@
public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
return jobSubmitClient.getCleanupTaskReports(jobId);
}
-
+
+ /**
+ * Get the information of the current state of the setup tasks of a job.
+ *
+ * @param jobId the job to query.
+ * @return the list of all of the setup tips.
+ * @throws IOException
+ */
+ public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
+ return jobSubmitClient.getSetupTaskReports(jobId);
+ }
+
/**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
@Deprecated
public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobHistory.java Mon Oct 6 23:59:44 2008
@@ -123,7 +123,7 @@
* most places in history file.
*/
public static enum Values {
- SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING
+ SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
}
// temp buffer for parsed dataa
@@ -923,12 +923,14 @@
}
/**
* Logs launch time of job.
+ *
* @param jobId job id, assigned by jobtracker.
* @param startTime start time of job.
* @param totalMaps total maps assigned by jobtracker.
* @param totalReduces total reduces.
*/
- public static void logStarted(JobID jobId, long startTime, int totalMaps, int totalReduces){
+ public static void logInited(JobID jobId, long startTime,
+ int totalMaps, int totalReduces) {
if (!disableHistory){
String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId;
ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
@@ -940,10 +942,45 @@
new String[] {jobId.toString(), String.valueOf(startTime),
String.valueOf(totalMaps),
String.valueOf(totalReduces),
+ Values.PREP.name()});
+ }
+ }
+ }
+
+ /**
+ * Logs the job as RUNNING.
+ *
+ * @param jobId job id, assigned by jobtracker.
+ * @param startTime start time of job.
+ * @param totalMaps total maps assigned by jobtracker.
+ * @param totalReduces total reduces.
+ * @deprecated Use {@link #logInited(JobID, long, int, int)} and
+ * {@link #logStarted(JobID)}
+ */
+ @Deprecated
+ public static void logStarted(JobID jobId, long startTime,
+ int totalMaps, int totalReduces) {
+ logStarted(jobId);
+ }
+
+ /**
+ * Logs job as running
+ * @param jobId job id, assigned by jobtracker.
+ */
+ public static void logStarted(JobID jobId){
+ if (!disableHistory){
+ String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId;
+ ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
+
+ if (null != writer){
+ JobHistory.log(writer, RecordTypes.Job,
+ new Keys[] {Keys.JOBID, Keys.JOB_STATUS},
+ new String[] {jobId.toString(),
Values.RUNNING.name()});
}
}
}
+
/**
* Log job finished. closes the job file in history.
* @param jobId job id, assigned by jobtracker.
@@ -1197,12 +1234,11 @@
* @param startTime start time of task attempt as reported by task tracker.
* @param hostName host name of the task attempt.
* @deprecated Use
- * {@link #logStarted(TaskAttemptID, long, String, int,
- * boolean)}
+ * {@link #logStarted(TaskAttemptID, long, String, int, String)}
*/
@Deprecated
public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
- logStarted(taskAttemptId, startTime, hostName, -1, false);
+ logStarted(taskAttemptId, startTime, hostName, -1, Values.MAP.name());
}
/**
@@ -1212,11 +1248,11 @@
* @param startTime start time of task attempt as reported by task tracker.
* @param trackerName name of the tracker executing the task attempt.
* @param httpPort http port of the task tracker executing the task attempt
- * @param isCleanup Whether the attempt is cleanup or not
+ * @param taskType Whether the attempt is cleanup or setup or map
*/
public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
String trackerName, int httpPort,
- boolean isCleanup){
+ String taskType) {
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
@@ -1226,8 +1262,7 @@
new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
Keys.TRACKER_NAME, Keys.HTTP_PORT},
- new String[]{isCleanup ? Values.CLEANUP.name() :
- Values.MAP.name(),
+ new String[]{taskType,
taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(),
String.valueOf(startTime), trackerName,
@@ -1242,13 +1277,12 @@
* @param finishTime finish time
* @param hostName host name
* @deprecated Use
- * {@link #logFinished(TaskAttemptID, long, String, boolean, String,
- * Counters)}
+ * {@link #logFinished(TaskAttemptID, long, String, String, String, Counters)}
*/
@Deprecated
public static void logFinished(TaskAttemptID taskAttemptId, long finishTime,
String hostName){
- logFinished(taskAttemptId, finishTime, hostName, false, "",
+ logFinished(taskAttemptId, finishTime, hostName, Values.MAP.name(), "",
new Counters());
}
@@ -1258,14 +1292,15 @@
* @param taskAttemptId task attempt id
* @param finishTime finish time
* @param hostName host name
- * @param isCleanup Whether the attempt is cleanup or not
+ * @param taskType Whether the attempt is cleanup or setup or map
* @param stateString state string of the task attempt
* @param counter counters of the task attempt
*/
public static void logFinished(TaskAttemptID taskAttemptId,
long finishTime,
String hostName,
- boolean isCleanup, String stateString,
+ String taskType,
+ String stateString,
Counters counter) {
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
@@ -1277,8 +1312,7 @@
Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
Keys.FINISH_TIME, Keys.HOSTNAME,
Keys.STATE_STRING, Keys.COUNTERS},
- new String[]{isCleanup ? Values.CLEANUP.name() :
- Values.MAP.name(),
+ new String[]{taskType,
taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(),
Values.SUCCESS.name(),
@@ -1296,13 +1330,13 @@
* @param hostName hostname of this task attempt.
* @param error error message if any for this task attempt.
* @deprecated Use
- * {@link #logFailed(TaskAttemptID, long, String, String, boolean)}
+ * {@link #logFailed(TaskAttemptID, long, String, String, String)}
*/
@Deprecated
public static void logFailed(TaskAttemptID taskAttemptId,
long timestamp, String hostName,
String error) {
- logFailed(taskAttemptId, timestamp, hostName, error, false);
+ logFailed(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
}
/**
@@ -1312,11 +1346,11 @@
* @param timestamp timestamp
* @param hostName hostname of this task attempt.
* @param error error message if any for this task attempt.
- * @param isCleanup Whether the attempt is cleanup or not
+ * @param taskType Whether the attempt is cleanup or setup or map
*/
public static void logFailed(TaskAttemptID taskAttemptId,
long timestamp, String hostName,
- String error, boolean isCleanup) {
+ String error, String taskType) {
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
@@ -1326,8 +1360,7 @@
new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
- new String[]{ isCleanup ? Values.CLEANUP.name() :
- Values.MAP.name(),
+ new String[]{ taskType,
taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(),
Values.FAILED.name(),
@@ -1344,12 +1377,12 @@
* @param hostName hostname of this task attempt.
* @param error error message if any for this task attempt.
* @deprecated Use
- * {@link #logKilled(TaskAttemptID, long, String, String, boolean)}
+ * {@link #logKilled(TaskAttemptID, long, String, String, String)}
*/
@Deprecated
public static void logKilled(TaskAttemptID taskAttemptId,
long timestamp, String hostName, String error){
- logKilled(taskAttemptId, timestamp, hostName, error, false);
+ logKilled(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
}
/**
@@ -1359,11 +1392,11 @@
* @param timestamp timestamp
* @param hostName hostname of this task attempt.
* @param error error message if any for this task attempt.
- * @param isCleanup Whether the attempt is cleanup or not
+ * @param taskType Whether the attempt is cleanup or setup or map
*/
public static void logKilled(TaskAttemptID taskAttemptId,
long timestamp, String hostName,
- String error, boolean isCleanup){
+ String error, String taskType) {
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
@@ -1374,8 +1407,7 @@
Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
Keys.FINISH_TIME, Keys.HOSTNAME,
Keys.ERROR},
- new String[]{ isCleanup ? Values.CLEANUP.name() :
- Values.MAP.name(),
+ new String[]{ taskType,
taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(),
Values.KILLED.name(),
@@ -1396,12 +1428,12 @@
* @param startTime start time
* @param hostName host name
* @deprecated Use
- * {@link #logStarted(TaskAttemptID, long, String, int, boolean)}
+ * {@link #logStarted(TaskAttemptID, long, String, int, String)}
*/
@Deprecated
public static void logStarted(TaskAttemptID taskAttemptId,
long startTime, String hostName){
- logStarted(taskAttemptId, startTime, hostName, -1, false);
+ logStarted(taskAttemptId, startTime, hostName, -1, Values.REDUCE.name());
}
/**
@@ -1411,11 +1443,12 @@
* @param startTime start time
* @param trackerName tracker name
* @param httpPort the http port of the tracker executing the task attempt
- * @param isCleanup Whether the attempt is cleanup or not
+ * @param taskType Whether the attempt is cleanup or setup or reduce
*/
public static void logStarted(TaskAttemptID taskAttemptId,
long startTime, String trackerName,
- int httpPort, boolean isCleanup) {
+ int httpPort,
+ String taskType) {
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
@@ -1425,8 +1458,7 @@
new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
Keys.TRACKER_NAME, Keys.HTTP_PORT},
- new String[]{isCleanup ? Values.CLEANUP.name() :
- Values.REDUCE.name(),
+ new String[]{taskType,
taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(),
String.valueOf(startTime), trackerName,
@@ -1443,15 +1475,15 @@
* @param finishTime finish time of task
* @param hostName host name where task attempt executed
* @deprecated Use
- * {@link #logFinished(TaskAttemptID, long, long, long, String, boolean,
- * String, Counters)}
+ * {@link #logFinished(TaskAttemptID, long, long, long, String, String, String, Counters)}
*/
@Deprecated
public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished,
long sortFinished, long finishTime,
String hostName){
logFinished(taskAttemptId, shuffleFinished, sortFinished,
- finishTime, hostName, false, "", new Counters());
+ finishTime, hostName, Values.REDUCE.name(),
+ "", new Counters());
}
/**
@@ -1462,14 +1494,14 @@
* @param sortFinished sort finish time
* @param finishTime finish time of task
* @param hostName host name where task attempt executed
- * @param isCleanup Whether the attempt is cleanup or not
+ * @param taskType Whether the attempt is cleanup or setup or reduce
* @param stateString the state string of the attempt
* @param counter counters of the attempt
*/
public static void logFinished(TaskAttemptID taskAttemptId,
long shuffleFinished,
long sortFinished, long finishTime,
- String hostName, boolean isCleanup,
+ String hostName, String taskType,
String stateString, Counters counter) {
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
@@ -1482,8 +1514,7 @@
Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
Keys.FINISH_TIME, Keys.HOSTNAME,
Keys.STATE_STRING, Keys.COUNTERS},
- new String[]{isCleanup ? Values.CLEANUP.name() :
- Values.REDUCE.name(),
+ new String[]{taskType,
taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(),
Values.SUCCESS.name(),
@@ -1503,12 +1534,12 @@
* @param hostName host name of the task attempt.
* @param error error message of the task.
* @deprecated Use
- * {@link #logFailed(TaskAttemptID, long, String, String, boolean)}
+ * {@link #logFailed(TaskAttemptID, long, String, String, String)}
*/
@Deprecated
public static void logFailed(TaskAttemptID taskAttemptId, long timestamp,
String hostName, String error){
- logFailed(taskAttemptId, timestamp, hostName, error, false);
+ logFailed(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
}
/**
@@ -1518,11 +1549,11 @@
* @param timestamp time stamp when task failed
* @param hostName host name of the task attempt.
* @param error error message of the task.
- * @param isCleanup Whether the attempt is cleanup or not
+ * @param taskType Whether the attempt is cleanup or setup or reduce
*/
public static void logFailed(TaskAttemptID taskAttemptId, long timestamp,
String hostName, String error,
- boolean isCleanup) {
+ String taskType) {
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
@@ -1533,8 +1564,7 @@
Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
Keys.FINISH_TIME, Keys.HOSTNAME,
Keys.ERROR },
- new String[]{ isCleanup ? Values.CLEANUP.name() :
- Values.REDUCE.name(),
+ new String[]{ taskType,
taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(),
Values.FAILED.name(),
@@ -1550,12 +1580,12 @@
* @param hostName host name of the task attempt.
* @param error error message of the task.
* @deprecated Use
- * {@link #logKilled(TaskAttemptID, long, String, String, boolean)}
+ * {@link #logKilled(TaskAttemptID, long, String, String, String)}
*/
@Deprecated
public static void logKilled(TaskAttemptID taskAttemptId, long timestamp,
String hostName, String error) {
- logKilled(taskAttemptId, timestamp, hostName, error, false);
+ logKilled(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
}
/**
@@ -1565,11 +1595,11 @@
* @param timestamp time stamp when task failed
* @param hostName host name of the task attempt.
* @param error error message of the task.
- * @param isCleanup Whether the attempt is cleanup or not
- */
+ * @param taskType Whether the attempt is cleanup or setup or reduce
+ */
public static void logKilled(TaskAttemptID taskAttemptId, long timestamp,
String hostName, String error,
- boolean isCleanup) {
+ String taskType) {
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
@@ -1580,8 +1610,7 @@
Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
Keys.FINISH_TIME, Keys.HOSTNAME,
Keys.ERROR },
- new String[]{ isCleanup ? Values.CLEANUP.name() :
- Values.REDUCE.name(),
+ new String[]{ taskType,
taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(),
Values.KILLED.name(),
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Oct 6 23:59:44 2008
@@ -63,6 +63,7 @@
TaskInProgress maps[] = new TaskInProgress[0];
TaskInProgress reduces[] = new TaskInProgress[0];
TaskInProgress cleanup[] = new TaskInProgress[0];
+ TaskInProgress setup[] = new TaskInProgress[0];
int numMapTasks = 0;
int numReduceTasks = 0;
@@ -83,6 +84,7 @@
int failedMapTIPs = 0;
int failedReduceTIPs = 0;
private volatile boolean launchedCleanup = false;
+ private volatile boolean launchedSetup = false;
private volatile boolean jobKilled = false;
private volatile boolean jobFailed = false;
@@ -382,12 +384,13 @@
// Finished time need to be setted here to prevent this job to be retired
// from the job tracker jobs at the next retire iteration.
this.finishTime = this.launchTime;
+ status.setSetupProgress(1.0f);
status.setMapProgress(1.0f);
status.setReduceProgress(1.0f);
status.setCleanupProgress(1.0f);
status.setRunState(JobStatus.SUCCEEDED);
tasksInited.set(true);
- JobHistory.JobInfo.logStarted(profile.getJobID(),
+ JobHistory.JobInfo.logInited(profile.getJobID(),
this.launchTime, 0, 0);
JobHistory.JobInfo.logFinished(profile.getJobID(),
this.finishTime, 0, 0, 0, 0,
@@ -422,12 +425,22 @@
numReduceTasks, jobtracker, conf, this);
cleanup[1].setCleanupTask();
- this.status = new JobStatus(status.getJobID(), 0.0f, 0.0f, 0.0f,
- JobStatus.RUNNING, status.getJobPriority());
+ // create two setup tips, one map and one reduce.
+ setup = new TaskInProgress[2];
+ // setup map tip. This map is doesn't use split.
+ // Just assign splits[0]
+ setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
+ jobtracker, conf, this, numMapTasks + 1 );
+ setup[0].setSetupTask();
+
+ // setup reduce tip.
+ setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
+ numReduceTasks + 1, jobtracker, conf, this);
+ setup[1].setSetupTask();
+
tasksInited.set(true);
-
- JobHistory.JobInfo.logStarted(profile.getJobID(), this.launchTime,
- numMapTasks, numReduceTasks);
+ JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
+ numMapTasks, numReduceTasks);
}
/////////////////////////////////////////////////////
@@ -533,6 +546,14 @@
}
/**
+ * Get the list of setup tasks
+ * @return the array of setup tasks for the job
+ */
+ TaskInProgress[] getSetupTasks() {
+ return setup;
+ }
+
+ /**
* Get the list of reduce tasks
* @return the raw array of reduce tasks for this job
*/
@@ -611,6 +632,21 @@
return results;
}
+ /**
+ * Return a vector of setup TaskInProgress objects
+ */
+ public synchronized Vector<TaskInProgress> reportSetupTIPs(
+ boolean shouldBeComplete) {
+
+ Vector<TaskInProgress> results = new Vector<TaskInProgress>();
+ for (int i = 0; i < setup.length; i++) {
+ if (setup[i].isComplete() == shouldBeComplete) {
+ results.add(setup[i]);
+ }
+ }
+ return results;
+ }
+
////////////////////////////////////////////////////
// Status update methods
////////////////////////////////////////////////////
@@ -655,7 +691,9 @@
taskCompletionEventTracker,
taskid,
tip.idWithinJob(),
- status.getIsMap(),
+ status.getIsMap() &&
+ !tip.isCleanupTask() &&
+ !tip.isSetupTask(),
TaskCompletionEvent.Status.SUCCEEDED,
httpTaskLogLocation
);
@@ -698,7 +736,9 @@
taskEvent = new TaskCompletionEvent(taskCompletionEventTracker,
taskid,
tip.idWithinJob(),
- status.getIsMap(),
+ status.getIsMap() &&
+ !tip.isCleanupTask() &&
+ !tip.isSetupTask(),
taskCompletionStatus,
httpTaskLogLocation
);
@@ -727,7 +767,7 @@
oldProgress + " to " + tip.getProgress());
}
- if (!tip.isCleanupTask()) {
+ if (!tip.isCleanupTask() && !tip.isSetupTask()) {
double progressDelta = tip.getProgress() - oldProgress;
if (tip.isMapTask()) {
if (maps.length == 0) {
@@ -860,11 +900,7 @@
// Now launch the cleanupTask
Task result = tip.getTaskToRun(tts.getTrackerName());
if (result != null) {
- launchedCleanup = true;
- if (tip.isFirstAttempt(result.getTaskID())) {
- JobHistory.Task.logStarted(tip.getTIPId(),
- Values.CLEANUP.name(), System.currentTimeMillis(), "");
- }
+ addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
}
return result;
}
@@ -878,8 +914,12 @@
* @return true/false
*/
private synchronized boolean canLaunchCleanupTask() {
+ if (!tasksInited.get()) {
+ return false;
+ }
// check if the job is running
- if (status.getRunState() != JobStatus.RUNNING) {
+ if (status.getRunState() != JobStatus.RUNNING &&
+ status.getRunState() != JobStatus.PREP) {
return false;
}
// check if cleanup task has been launched already.
@@ -899,8 +939,63 @@
}
return launchCleanupTask;
}
+
+ /**
+ * Return a SetupTask, if appropriate, to run on the given tasktracker
+ *
+ */
+ public synchronized Task obtainSetupTask(TaskTrackerStatus tts,
+ int clusterSize,
+ int numUniqueHosts,
+ boolean isMapSlot
+ ) throws IOException {
+ if (!canLaunchSetupTask()) {
+ return null;
+ }
+
+ String taskTracker = tts.getTrackerName();
+ // Update the last-known clusterSize
+ this.clusterSize = clusterSize;
+ if (!shouldRunOnTaskTracker(taskTracker)) {
+ return null;
+ }
+
+ List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
+ if (isMapSlot) {
+ setupTaskList.add(setup[0]);
+ } else {
+ setupTaskList.add(setup[1]);
+ }
+ TaskInProgress tip = findTaskFromList(setupTaskList,
+ tts, numUniqueHosts, false);
+ if (tip == null) {
+ return null;
+ }
+
+ // Now launch the setupTask
+ Task result = tip.getTaskToRun(tts.getTrackerName());
+ if (result != null) {
+ addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+ }
+ return result;
+ }
/**
+ * Check whether setup task can be launched for the job.
+ *
+ * Setup task can be launched after the tasks are inited
+ * and Job is in PREP state
+ * and if it is not already launched
+ * or job is not Killed/Failed
+ * @return true/false
+ */
+ private synchronized boolean canLaunchSetupTask() {
+ return (tasksInited.get() && status.getRunState() == JobStatus.PREP &&
+ !launchedSetup && !jobKilled && !jobFailed);
+ }
+
+
+ /**
* Return a ReduceTask, if appropriate, to run on the given tasktracker.
* We don't have cache-sensitivity for reduce tasks, as they
* work on temporary MapRed files.
@@ -955,8 +1050,14 @@
boolean isScheduled) {
// keeping the earlier ordering intact
String name;
- Enum counter;
- if (tip.isMapTask()) {
+ Enum counter = null;
+ if (tip.isSetupTask()) {
+ launchedSetup = true;
+ name = Values.SETUP.name();
+ } else if (tip.isCleanupTask()) {
+ launchedCleanup = true;
+ name = Values.CLEANUP.name();
+ } else if (tip.isMapTask()) {
++runningMapTasks;
name = Values.MAP.name();
counter = Counter.TOTAL_LAUNCHED_MAPS;
@@ -975,7 +1076,9 @@
JobHistory.Task.logStarted(tip.getTIPId(), name,
tip.getExecStartTime(), "");
}
- jobCounters.incrCounter(counter, 1);
+ if (!tip.isSetupTask() && !tip.isCleanupTask()) {
+ jobCounters.incrCounter(counter, 1);
+ }
// Make an entry in the tip if the attempt is not scheduled i.e externally
// added
@@ -994,7 +1097,7 @@
//
// So to simplify, increment the data locality counter whenever there is
// data locality.
- if (tip.isMapTask()) {
+ if (tip.isMapTask() && !tip.isSetupTask() && !tip.isCleanupTask()) {
// increment the data locality counter for maps
Node tracker = jobtracker.getNode(tts.getHost());
int level = this.maxLevel;
@@ -1648,65 +1751,45 @@
TaskTrackerStatus ttStatus =
this.jobtracker.getTaskTracker(status.getTaskTracker());
String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
+ String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
+ tip.isSetupTask() ? Values.SETUP.name() :
+ tip.isMapTask() ? Values.MAP.name() :
+ Values.REDUCE.name();
if (status.getIsMap()){
JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(),
status.getTaskTracker(),
ttStatus.getHttpPort(),
- tip.isCleanupTask());
+ taskType);
JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(),
- trackerHostname, tip.isCleanupTask(),
+ trackerHostname, taskType,
status.getStateString(),
status.getCounters());
- JobHistory.Task.logFinished(tip.getTIPId(),
- tip.isCleanupTask() ? Values.CLEANUP.name() :
- Values.MAP.name(),
- tip.getExecFinishTime(),
- status.getCounters());
}else{
JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(),
status.getTaskTracker(),
ttStatus.getHttpPort(),
- tip.isCleanupTask());
+ taskType);
JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
status.getSortFinishTime(), status.getFinishTime(),
- trackerHostname, tip.isCleanupTask(),
+ trackerHostname,
+ taskType,
status.getStateString(),
status.getCounters());
- JobHistory.Task.logFinished(tip.getTIPId(),
- tip.isCleanupTask() ? Values.CLEANUP.name() :
- Values.REDUCE.name(), tip.getExecFinishTime(),
- status.getCounters());
}
+ JobHistory.Task.logFinished(tip.getTIPId(),
+ taskType,
+ tip.getExecFinishTime(),
+ status.getCounters());
int newNumAttempts = tip.getActiveTasks().size();
- if (!tip.isCleanupTask()) {
- if (tip.isMapTask()) {
- runningMapTasks -= 1;
- // check if this was a sepculative task
- if (oldNumAttempts > 1) {
- speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
- }
- finishedMapTasks += 1;
- metrics.completeMap(taskid);
- // remove the completed map from the resp running caches
- retireMap(tip);
- if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
- this.status.setMapProgress(1.0f);
- }
- } else {
- runningReduceTasks -= 1;
- if (oldNumAttempts > 1) {
- speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
- }
- finishedReduceTasks += 1;
- metrics.completeReduce(taskid);
- // remove the completed reduces from the running reducers set
- retireReduce(tip);
- if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
- this.status.setReduceProgress(1.0f);
- }
- }
- } else {
+ if (tip.isSetupTask()) {
+ // setup task has finished. kill the extra setup tip
+ killSetupTip(!tip.isMapTask());
+ // Job can start running now.
+ this.status.setSetupProgress(1.0f);
+ this.status.setRunState(JobStatus.RUNNING);
+ JobHistory.JobInfo.logStarted(profile.getJobID());
+ } else if (tip.isCleanupTask()) {
// cleanup task has finished. Kill the extra cleanup tip
if (tip.isMapTask()) {
// kill the reduce tip
@@ -1730,7 +1813,31 @@
// The job has been killed/failed/successful
// JobTracker should cleanup this task
jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
- return false;
+ } else if (tip.isMapTask()) {
+ runningMapTasks -= 1;
+ // check if this was a sepculative task
+ if (oldNumAttempts > 1) {
+ speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
+ }
+ finishedMapTasks += 1;
+ metrics.completeMap(taskid);
+ // remove the completed map from the resp running caches
+ retireMap(tip);
+ if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
+ this.status.setMapProgress(1.0f);
+ }
+ } else {
+ runningReduceTasks -= 1;
+ if (oldNumAttempts > 1) {
+ speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
+ }
+ finishedReduceTasks += 1;
+ metrics.completeReduce(taskid);
+ // remove the completed reduces from the running reducers set
+ retireReduce(tip);
+ if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
+ this.status.setReduceProgress(1.0f);
+ }
}
return true;
@@ -1764,7 +1871,7 @@
}
}
- private synchronized void terminateJob(int jobTerminationState) {
+ synchronized void terminateJob(int jobTerminationState) {
if ((status.getRunState() == JobStatus.RUNNING) ||
(status.getRunState() == JobStatus.PREP)) {
if (jobTerminationState == JobStatus.FAILED) {
@@ -1859,6 +1966,8 @@
if (wasRunning && !isRunning) {
if (tip.isCleanupTask()) {
launchedCleanup = false;
+ } else if (tip.isSetupTask()) {
+ launchedSetup = false;
} else if (tip.isMapTask()) {
runningMapTasks -= 1;
// remove from the running queue and put it in the non-running cache
@@ -1896,36 +2005,41 @@
// update job history
String taskTrackerName = taskTrackerStatus.getHost();
long finishTime = status.getFinishTime();
+ String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
+ tip.isSetupTask() ? Values.SETUP.name() :
+ tip.isMapTask() ? Values.MAP.name() :
+ Values.REDUCE.name();
if (status.getIsMap()) {
JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(),
status.getTaskTracker(), taskTrackerStatus.getHttpPort(),
- tip.isCleanupTask());
+ taskType);
if (status.getRunState() == TaskStatus.State.FAILED) {
JobHistory.MapAttempt.logFailed(status.getTaskID(), finishTime,
- taskTrackerName, status.getDiagnosticInfo(), tip.isCleanupTask());
+ taskTrackerName, status.getDiagnosticInfo(),
+ taskType);
} else {
JobHistory.MapAttempt.logKilled(status.getTaskID(), finishTime,
taskTrackerName, status.getDiagnosticInfo(),
- tip.isCleanupTask());
+ taskType);
}
} else {
JobHistory.ReduceAttempt.logStarted(status.getTaskID(), status.getStartTime(),
status.getTaskTracker(), taskTrackerStatus.getHttpPort(),
- tip.isCleanupTask());
+ taskType);
if (status.getRunState() == TaskStatus.State.FAILED) {
JobHistory.ReduceAttempt.logFailed(status.getTaskID(), finishTime,
taskTrackerName, status.getDiagnosticInfo(),
- tip.isCleanupTask());
+ taskType);
} else {
JobHistory.ReduceAttempt.logKilled(status.getTaskID(), finishTime,
taskTrackerName, status.getDiagnosticInfo(),
- tip.isCleanupTask());
+ taskType);
}
}
// After this, try to assign tasks with the one after this, so that
// the failed task goes to the end of the list.
- if (!tip.isCleanupTask()) {
+ if (!tip.isCleanupTask() && !tip.isSetupTask()) {
if (tip.isMapTask()) {
failedMapTasks++;
} else {
@@ -1955,7 +2069,7 @@
// Allow upto 'mapFailuresPercent' of map tasks to fail or
// 'reduceFailuresPercent' of reduce tasks to fail
//
- boolean killJob = tip.isCleanupTask() ? true :
+ boolean killJob = tip.isCleanupTask() || tip.isSetupTask() ? true :
tip.isMapTask() ?
((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
@@ -1963,12 +2077,8 @@
if (killJob) {
LOG.info("Aborting job " + profile.getJobID());
JobHistory.Task.logFailed(tip.getTIPId(),
- tip.isCleanupTask() ?
- Values.CLEANUP.name() :
- tip.isMapTask() ?
- Values.MAP.name() :
- Values.REDUCE.name(),
- status.getFinishTime(),
+ taskType,
+ status.getFinishTime(),
status.getDiagnosticInfo());
if (tip.isCleanupTask()) {
// kill the other tip
@@ -1979,6 +2089,10 @@
}
terminateJob(JobStatus.FAILED);
} else {
+ if (tip.isSetupTask()) {
+ // kill the other tip
+ killSetupTip(!tip.isMapTask());
+ }
fail();
}
}
@@ -1986,7 +2100,7 @@
//
// Update the counters
//
- if (!tip.isCleanupTask()) {
+ if (!tip.isCleanupTask() && !tip.isSetupTask()) {
if (tip.isMapTask()) {
jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
} else {
@@ -1996,6 +2110,14 @@
}
}
+ void killSetupTip(boolean isMap) {
+ if (isMap) {
+ setup[0].kill();
+ } else {
+ setup[1].kill();
+ }
+ }
+
/**
* Fail a task with a given reason, but without a status object.
* @param tip The task's tip
@@ -2018,6 +2140,7 @@
updateTaskStatus(tip, status, metrics);
JobHistory.Task.logFailed(tip.getTIPId(),
tip.isCleanupTask() ? Values.CLEANUP.name() :
+ tip.isSetupTask() ? Values.SETUP.name() :
tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(),
tip.getExecFinishTime(), reason, taskid);
}
@@ -2072,18 +2195,24 @@
*/
public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
if (tipid.isMap()) {
- if (tipid.getId() == maps.length) { // cleanup map tip
+ if (tipid.equals(cleanup[0].getTIPId())) { // cleanup map tip
return cleanup[0];
}
+ if (tipid.equals(setup[0].getTIPId())) { //setup map tip
+ return setup[0];
+ }
for (int i = 0; i < maps.length; i++) {
if (tipid.equals(maps[i].getTIPId())){
return maps[i];
}
}
} else {
- if (tipid.getId() == reduces.length) { // cleanup reduce tip
+ if (tipid.equals(cleanup[1].getTIPId())) { // cleanup reduce tip
return cleanup[1];
}
+ if (tipid.equals(setup[1].getTIPId())) { //setup reduce tip
+ return setup[1];
+ }
for (int i = 0; i < reduces.length; i++) {
if (tipid.equals(reduces[i].getTIPId())){
return reduces[i];
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java Mon Oct 6 23:59:44 2008
@@ -52,6 +52,7 @@
private float mapProgress;
private float reduceProgress;
private float cleanupProgress;
+ private float setupProgress;
private int runState;
private long startTime;
private String user;
@@ -99,7 +100,25 @@
*/
public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
float cleanupProgress, int runState, JobPriority jp) {
+ this(jobid, 0.0f, mapProgress, reduceProgress,
+ cleanupProgress, runState, jp);
+ }
+
+ /**
+ * Create a job status object for a given jobid.
+ * @param jobid The jobid of the job
+ * @param setupProgress The progress made on the setup
+ * @param mapProgress The progress made on the maps
+ * @param reduceProgress The progress made on the reduces
+ * @param cleanupProgress The progress made on the cleanup
+ * @param runState The current state of the job
+ * @param jp Priority of the job.
+ */
+ public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+ float reduceProgress, float cleanupProgress,
+ int runState, JobPriority jp) {
this.jobid = jobid;
+ this.setupProgress = setupProgress;
this.mapProgress = mapProgress;
this.reduceProgress = reduceProgress;
this.cleanupProgress = cleanupProgress;
@@ -110,6 +129,7 @@
}
priority = jp;
}
+
/**
* @deprecated use getJobID instead
*/
@@ -148,6 +168,19 @@
}
/**
+ * @return Percentage of progress in setup
+ */
+ public synchronized float setupProgress() { return setupProgress; }
+
+ /**
+ * Sets the setup progress of this job
+ * @param p The value of setup progress to set to
+ */
+ synchronized void setSetupProgress(float p) {
+ this.setupProgress = (float) Math.min(1.0, Math.max(0.0, p));
+ }
+
+ /**
* @return Percentage of progress in reduce
*/
public synchronized float reduceProgress() { return reduceProgress; }
@@ -232,6 +265,7 @@
///////////////////////////////////////
public synchronized void write(DataOutput out) throws IOException {
jobid.write(out);
+ out.writeFloat(setupProgress);
out.writeFloat(mapProgress);
out.writeFloat(reduceProgress);
out.writeFloat(cleanupProgress);
@@ -244,6 +278,7 @@
public synchronized void readFields(DataInput in) throws IOException {
this.jobid = JobID.read(in);
+ this.setupProgress = in.readFloat();
this.mapProgress = in.readFloat();
this.reduceProgress = in.readFloat();
this.cleanupProgress = in.readFloat();
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Mon Oct 6 23:59:44 2008
@@ -47,8 +47,10 @@
* and getAllJobs(queue) as a part of HADOOP-3930
* Version 14: Added setPriority for HADOOP-4124
* Version 15: Added KILLED status to JobStatus as part of HADOOP-3924
+ * Version 16: Added getSetupTaskReports and
+ * setupProgress to JobStatus as part of HADOOP-4261
*/
- public static final long versionID = 15L;
+ public static final long versionID = 16L;
/**
* Allocate a name for the job.
@@ -123,6 +125,11 @@
public TaskReport[] getCleanupTaskReports(JobID jobid) throws IOException;
/**
+ * Grab a bunch of info on the setup tasks that make up the job
+ */
+ public TaskReport[] getSetupTaskReports(JobID jobid) throws IOException;
+
+ /**
* A MapReduce system always operates on a single filesystem. This
* function returns the fs name. ('local' if the localfs; 'addr:port'
* if dfs). The client can then copy files into the right locations
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon Oct 6 23:59:44 2008
@@ -531,7 +531,7 @@
// is updated
private void checkAndInit() throws IOException {
String jobStatus = this.job.get(Keys.JOB_STATUS);
- if (Values.RUNNING.name().equals(jobStatus)) {
+ if (Values.PREP.name().equals(jobStatus)) {
hasUpdates = true;
LOG.info("Calling init from RM for job " + jip.getJobID().toString());
jip.initTasks();
@@ -1860,7 +1860,7 @@
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
} else {
- List<Task> tasks = getCleanupTask(taskTrackerStatus);
+ List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null ) {
tasks = taskScheduler.assignTasks(taskTrackerStatus);
}
@@ -2099,8 +2099,9 @@
return null;
}
- private synchronized List<Task> getCleanupTask(TaskTrackerStatus taskTracker)
- throws IOException {
+ // returns cleanup tasks first, then setup tasks.
+ private synchronized List<Task> getSetupAndCleanupTasks(
+ TaskTrackerStatus taskTracker) throws IOException {
int maxMapTasks = taskTracker.getMaxMapTasks();
int maxReduceTasks = taskTracker.getMaxReduceTasks();
int numMaps = taskTracker.countMapTasks();
@@ -2120,6 +2121,15 @@
return Collections.singletonList(t);
}
}
+ for (Iterator<JobInProgress> it = jobs.values().iterator();
+ it.hasNext();) {
+ JobInProgress job = it.next();
+ t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+ numUniqueHosts, true);
+ if (t != null) {
+ return Collections.singletonList(t);
+ }
+ }
}
if (numReduces < maxReduceTasks) {
for (Iterator<JobInProgress> it = jobs.values().iterator();
@@ -2131,6 +2141,15 @@
return Collections.singletonList(t);
}
}
+ for (Iterator<JobInProgress> it = jobs.values().iterator();
+ it.hasNext();) {
+ JobInProgress job = it.next();
+ t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+ numUniqueHosts, false);
+ if (t != null) {
+ return Collections.singletonList(t);
+ }
+ }
}
}
return null;
@@ -2353,6 +2372,28 @@
}
+ public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
+ JobInProgress job = jobs.get(jobid);
+ if (job == null) {
+ return new TaskReport[0];
+ } else {
+ Vector<TaskReport> reports = new Vector<TaskReport>();
+ Vector<TaskInProgress> completeTasks = job.reportSetupTIPs(true);
+ for (Iterator<TaskInProgress> it = completeTasks.iterator();
+ it.hasNext();) {
+ TaskInProgress tip = (TaskInProgress) it.next();
+ reports.add(tip.generateSingleReport());
+ }
+ Vector<TaskInProgress> incompleteTasks = job.reportSetupTIPs(false);
+ for (Iterator<TaskInProgress> it = incompleteTasks.iterator();
+ it.hasNext();) {
+ TaskInProgress tip = (TaskInProgress) it.next();
+ reports.add(tip.generateSingleReport());
+ }
+ return reports.toArray(new TaskReport[reports.size()]);
+ }
+ }
+
TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
/*
@@ -2576,7 +2617,8 @@
// And completed maps with zero reducers of the job
// never need to be failed.
if (!tip.isComplete() ||
- (tip.isMapTask() && job.desiredReduces() != 0)) {
+ (tip.isMapTask() && !tip.isSetupTask() &&
+ job.desiredReduces() != 0)) {
// if the job is done, we don't want to change anything
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
job.failedTask(tip, taskId, ("Lost task tracker: " + trackerName),
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Mon Oct 6 23:59:44 2008
@@ -114,6 +114,8 @@
}
JobContext jContext = new JobContext(conf);
OutputCommitter outputCommitter = job.getOutputCommitter();
+ outputCommitter.setupJob(jContext);
+ status.setSetupProgress(1.0f);
DataOutputBuffer buffer = new DataOutputBuffer();
for (int i = 0; i < splits.length; i++) {
@@ -336,6 +338,9 @@
public TaskReport[] getCleanupTaskReports(JobID id) {
return new TaskReport[0];
}
+ public TaskReport[] getSetupTaskReports(JobID id) {
+ return new TaskReport[0];
+ }
public JobStatus getJobStatus(JobID id) {
Job job = jobs.get(id);
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java Mon Oct 6 23:59:44 2008
@@ -283,6 +283,10 @@
runCleanup(umbilical);
return;
}
+ if (setupJob) {
+ runSetupJob(umbilical);
+ return;
+ }
int numReduceTasks = conf.getNumReduceTasks();
LOG.info("numReduceTasks: " + numReduceTasks);
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java Mon Oct 6 23:59:44 2008
@@ -30,12 +30,10 @@
* <li>
* Setup the job during initialization. For example, create the temporary
* output directory for the job during the initialization of the job.
- * The job client does the setup for the job.
* </li>
* <li>
* Cleanup the job after the job completion. For example, remove the
- * temporary output directory after the job completion. CleanupJob is done
- * by a separate task at the end of the job.
+ * temporary output directory after the job completion.
* </li>
* <li>
* Setup the task temporary output.
@@ -61,8 +59,6 @@
/**
* For the framework to setup the job output during initialization
*
- * The job client does the setup for the job.
- *
* @param jobContext Context of the job whose output is being written.
* @throws IOException if temporary output could not be created
*/
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Mon Oct 6 23:59:44 2008
@@ -336,7 +336,7 @@
job.setBoolean("mapred.skip.on", isSkipping());
Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
- if (!cleanupJob) {
+ if (!cleanupJob && !setupJob) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
@@ -351,6 +351,10 @@
runCleanup(umbilical);
return;
}
+ if (setupJob) {
+ runSetupJob(umbilical);
+ return;
+ }
// Initialize the codec
codec = initCodec();
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/RunningJob.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/RunningJob.java Mon Oct 6 23:59:44 2008
@@ -94,6 +94,15 @@
public float cleanupProgress() throws IOException;
/**
+ * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
+ * and 1.0. When all setup tasks have completed, the function returns 1.0.
+ *
+ * @return the progress of the job's setup-tasks.
+ * @throws IOException
+ */
+ public float setupProgress() throws IOException;
+
+ /**
* Check if the job is finished or not.
* This is a non-blocking call.
*
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Task.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Task.java Mon Oct 6 23:59:44 2008
@@ -108,6 +108,7 @@
private int partition; // id within job
TaskStatus taskStatus; // current status of the task
protected boolean cleanupJob = false;
+ protected boolean setupJob = false;
private Thread pingProgressThread;
//skip ranges based on failed ranges from previous attempts
@@ -241,6 +242,9 @@
cleanupJob = true;
}
+ public void setSetupTask() {
+ setupJob = true;
+ }
////////////////////////////////////////////
// Writable methods
////////////////////////////////////////////
@@ -253,6 +257,7 @@
skipRanges.write(out);
out.writeBoolean(skipping);
out.writeBoolean(cleanupJob);
+ out.writeBoolean(setupJob);
out.writeBoolean(writeSkipRecs);
}
public void readFields(DataInput in) throws IOException {
@@ -266,6 +271,7 @@
currentRecStartIndex = currentRecIndexIterator.next();
skipping = in.readBoolean();
cleanupJob = in.readBoolean();
+ setupJob = in.readBoolean();
writeSkipRecs = in.readBoolean();
}
@@ -718,6 +724,14 @@
conf.getOutputCommitter().cleanupJob(jobContext);
done(umbilical);
}
+
+ protected void runSetupJob(TaskUmbilicalProtocol umbilical)
+ throws IOException {
+ // do the setup
+ getProgress().setStatus("setup");
+ conf.getOutputCommitter().setupJob(jobContext);
+ done(umbilical);
+ }
public void setConf(Configuration conf) {
if (conf instanceof JobConf) {
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Mon Oct 6 23:59:44 2008
@@ -83,6 +83,7 @@
private FailedRanges failedRanges = new FailedRanges();
private volatile boolean skipping = false;
private boolean cleanup = false;
+ private boolean setup = false;
// The 'next' usable taskid of this tip
int nextTaskId = 0;
@@ -180,7 +181,15 @@
public void setCleanupTask() {
cleanup = true;
}
-
+
+ public boolean isSetupTask() {
+ return setup;
+ }
+
+ public void setSetupTask() {
+ setup = true;
+ }
+
public boolean isOnlyCommitPending() {
for (TaskStatus t : taskStatuses.values()) {
if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
@@ -380,7 +389,8 @@
(job.getStatus().getRunState() != JobStatus.RUNNING)) {
tasksReportedClosed.add(taskid);
close = true;
- } else if (isComplete() && !(isMapTask() && isComplete(taskid)) &&
+ } else if (isComplete() &&
+ !(isMapTask() && !setup && !cleanup && isComplete(taskid)) &&
!tasksReportedClosed.contains(taskid)) {
tasksReportedClosed.add(taskid);
close = true;
@@ -565,7 +575,7 @@
// should note this failure only for completed maps, only if this taskid;
// completed this map. however if the job is done, there is no need to
// manipulate completed maps
- if (this.isMapTask() && isComplete(taskid) &&
+ if (this.isMapTask() && !setup && !cleanup && isComplete(taskid) &&
jobStatus.getRunState() != JobStatus.SUCCEEDED) {
this.completes--;
@@ -850,6 +860,9 @@
if (cleanup) {
t.setCleanupTask();
}
+ if (setup) {
+ t.setSetupTask();
+ }
t.setConf(conf);
LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
t.setSkipRanges(failedRanges.getSkipRanges());
@@ -951,7 +964,7 @@
}
public long getMapInputSize() {
- if(isMapTask()) {
+ if(isMapTask() && !setup && !cleanup) {
return rawSplit.getDataLength();
} else {
return 0;
Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Mon Oct 6 23:59:44 2008
@@ -464,9 +464,9 @@
}
/**
- * Get the map task completion events
+ * Get the task completion events
*/
- public TaskCompletionEvent[] getMapTaskCompletionEvents(JobID id, int from,
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from,
int max)
throws IOException {
return jobTracker.getJobTracker().getTaskCompletionEvents(id, from, max);
Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Mon Oct 6 23:59:44 2008
@@ -471,8 +471,9 @@
}
TaskCompletionEvent[] prevEvents =
- mr.getMapTaskCompletionEvents(id, 0, numMaps);
- TaskReport[] prevReports = jobClient.getMapTaskReports(id);
+ mr.getTaskCompletionEvents(id, 0, numMaps);
+ TaskReport[] prevSetupReports = jobClient.getSetupTaskReports(id);
+ TaskReport[] prevMapReports = jobClient.getMapTaskReports(id);
ClusterStatus prevStatus = jobClient.getClusterStatus();
mr.stopJobTracker();
@@ -502,7 +503,7 @@
// Get the new jobtrackers events
TaskCompletionEvent[] jtEvents =
- mr.getMapTaskCompletionEvents(id, 0, 2 * numMaps);
+ mr.getTaskCompletionEvents(id, 0, 2 * numMaps);
// Test if all the events that were recovered match exactly
testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);
@@ -521,8 +522,10 @@
// Check the task reports
// The reports should match exactly if the attempts are same
- TaskReport[] afterReports = jobClient.getMapTaskReports(id);
- testTaskReports(prevReports, afterReports, numToMatch);
+ TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
+ TaskReport[] afterSetupReports = jobClient.getSetupTaskReports(id);
+ testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
+ testTaskReports(prevSetupReports, afterSetupReports, 1);
// Signal the reduce tasks
signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
@@ -829,4 +832,4 @@
public static void main(String[] args) throws IOException {
new TestJobTrackerRestart().testJobTrackerRestart();
}
-}
\ No newline at end of file
+}
Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Mon Oct 6 23:59:44 2008
@@ -70,7 +70,10 @@
// test the task report fetchers
JobClient client = new JobClient(job);
JobID jobid = ret.job.getID();
- TaskReport[] reports = client.getMapTaskReports(jobid);
+ TaskReport[] reports;
+ reports = client.getSetupTaskReports(jobid);
+ assertEquals("number of setups", 2, reports.length);
+ reports = client.getMapTaskReports(jobid);
assertEquals("number of maps", 1, reports.length);
reports = client.getReduceTaskReports(jobid);
assertEquals("number of reduces", 1, reports.length);
Modified: hadoop/core/branches/branch-0.19/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/webapps/job/jobdetails.jsp?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/core/branches/branch-0.19/src/webapps/job/jobdetails.jsp Mon Oct 6 23:59:44 2008
@@ -88,15 +88,15 @@
"</td></tr>\n");
}
- private void printCleanupTaskSummary(JspWriter out,
+ private void printJobLevelTaskSummary(JspWriter out,
String jobId,
+ String kind,
TaskInProgress[] tasks
) throws IOException {
int totalTasks = tasks.length;
int runningTasks = 0;
int finishedTasks = 0;
int killedTasks = 0;
- String kind = "cleanup";
for(int i=0; i < totalTasks; ++i) {
TaskInProgress task = tasks[i];
if (task.isComplete()) {
@@ -208,6 +208,9 @@
out.print("<b>Job Name:</b> " + profile.getJobName() + "<br>\n");
out.print("<b>Job File:</b> <a href=\"jobconf.jsp?jobid=" + jobId + "\">"
+ profile.getJobFile() + "</a><br>\n");
+ out.print("<b>Job Setup:</b>");
+ printJobLevelTaskSummary(out, jobId, "setup", job.getSetupTasks());
+ out.print("<br>\n");
if (runState == JobStatus.RUNNING) {
out.print("<b>Status:</b> Running<br>\n");
out.print("<b>Started at:</b> " + new Date(job.getStartTime()) + "<br>\n");
@@ -238,7 +241,7 @@
}
}
out.print("<b>Job Cleanup:</b>");
- printCleanupTaskSummary(out, jobId, job.getCleanupTasks());
+ printJobLevelTaskSummary(out, jobId, "cleanup", job.getCleanupTasks());
out.print("<br>\n");
if (flakyTaskTrackers > 0) {
out.print("<b>Black-listed TaskTrackers:</b> " +
Modified: hadoop/core/branches/branch-0.19/src/webapps/job/jobdetailshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/webapps/job/jobdetailshistory.jsp?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/webapps/job/jobdetailshistory.jsp (original)
+++ hadoop/core/branches/branch-0.19/src/webapps/job/jobdetailshistory.jsp Mon Oct 6 23:59:44 2008
@@ -42,6 +42,7 @@
int totalMaps = 0 ;
int totalReduces = 0;
int totalCleanups = 0;
+ int totalSetups = 0;
int numFailedMaps = 0;
int numKilledMaps = 0;
int numFailedReduces = 0 ;
@@ -49,6 +50,9 @@
int numFinishedCleanups = 0;
int numFailedCleanups = 0;
int numKilledCleanups = 0;
+ int numFinishedSetups = 0;
+ int numFailedSetups = 0;
+ int numKilledSetups = 0;
long mapStarted = 0 ;
long mapFinished = 0 ;
@@ -56,6 +60,8 @@
long reduceFinished = 0;
long cleanupStarted = 0;
long cleanupFinished = 0;
+ long setupStarted = 0;
+ long setupFinished = 0;
Map <String,String> allHosts = new TreeMap<String,String>();
for (JobHistory.Task task : tasks.values()) {
@@ -104,6 +110,21 @@
numFailedCleanups++;
} else if (Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS))) {
numKilledCleanups++;
+ }
+ } else if (Values.SETUP.name().equals(task.get(Keys.TASK_TYPE))) {
+ if (setupStarted==0||setupStarted > startTime) {
+ setupStarted = startTime ;
+ }
+ if (setupFinished < finishTime) {
+ setupFinished = finishTime;
+ }
+ totalSetups++;
+ if (Values.SUCCESS.name().equals(attempt.get(Keys.TASK_STATUS))) {
+ numFinishedSetups++;
+ } else if (Values.FAILED.name().equals(attempt.get(Keys.TASK_STATUS))) {
+ numFailedSetups++;
+ } else if (Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS))) {
+ numKilledSetups++;
}
}
}
@@ -117,6 +138,19 @@
<td>Kind</td><td>Total Tasks(successful+failed+killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
</tr>
<tr>
+<td>Setup</td>
+ <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=all">
+ <%=totalSetups%></a></td>
+ <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=<%=Values.SUCCESS %>">
+ <%=numFinishedSetups%></a></td>
+ <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=<%=Values.FAILED %>">
+ <%=numFailedSetups%></a></td>
+ <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=<%=Values.KILLED %>">
+ <%=numKilledSetups%></a></td>
+ <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, setupStarted, 0) %></td>
+ <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, setupFinished, setupStarted) %></td>
+</tr>
+<tr>
<td>Map</td>
<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.MAP.name() %>&status=all">
<%=totalMaps %></a></td>
Modified: hadoop/core/branches/branch-0.19/src/webapps/job/jobtasks.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/webapps/job/jobtasks.jsp?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/webapps/job/jobtasks.jsp (original)
+++ hadoop/core/branches/branch-0.19/src/webapps/job/jobtasks.jsp Mon Oct 6 23:59:44 2008
@@ -21,7 +21,7 @@
}
String type = request.getParameter("type");
String pagenum = request.getParameter("pagenum");
- TaskInProgress[] tasks;
+ TaskInProgress[] tasks = null;
String state = request.getParameter("state");
state = (state!=null) ? state : "all";
int pnum = Integer.parseInt(pagenum);
@@ -42,9 +42,12 @@
else if ("reduce".equals(type)) {
reports = (job != null) ? tracker.getReduceTaskReports(jobidObj) : null;
tasks = (job != null) ? job.getReduceTasks() : null;
- } else {
+ } else if ("cleanup".equals(type)) {
reports = (job != null) ? tracker.getCleanupTaskReports(jobidObj) : null;
tasks = (job != null) ? job.getCleanupTasks() : null;
+ } else if ("setup".equals(type)) {
+ reports = (job != null) ? tracker.getSetupTaskReports(jobidObj) : null;
+ tasks = (job != null) ? job.getSetupTasks() : null;
}
%>
Modified: hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp Mon Oct 6 23:59:44 2008
@@ -69,9 +69,12 @@
}
TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(tipidObj)
: null;
- boolean isCleanup = false;
+ boolean isCleanupOrSetup = false;
if (tipidObj != null) {
- isCleanup = job.getTaskInProgress(tipidObj).isCleanupTask();
+ isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask();
+ if (!isCleanupOrSetup) {
+ isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask();
+ }
}
%>
@@ -98,7 +101,7 @@
<table border=2 cellpadding="5" cellspacing="2">
<tr><td align="center">Task Attempts</td><td>Machine</td><td>Status</td><td>Progress</td><td>Start Time</td>
<%
- if (!ts[0].getIsMap() && !isCleanup) {
+ if (!ts[0].getIsMap() && !isCleanupOrSetup) {
%>
<td>Shuffle Finished</td><td>Sort Finished</td>
<%
@@ -126,7 +129,7 @@
out.print("<td>"
+ StringUtils.getFormattedTimeWithDiff(dateFormat, status
.getStartTime(), 0) + "</td>");
- if (!ts[i].getIsMap() && !isCleanup) {
+ if (!ts[i].getIsMap() && !isCleanupOrSetup) {
out.print("<td>"
+ StringUtils.getFormattedTimeWithDiff(dateFormat, status
.getShuffleFinishTime(), status.getStartTime()) + "</td>");