You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/12/12 06:55:32 UTC
svn commit: r725914 - in /hadoop/core/trunk: ./
src/mapred/org/apache/hadoop/mapred/ src/webapps/job/
Author: ddas
Date: Thu Dec 11 21:55:31 2008
New Revision: 725914
URL: http://svn.apache.org/viewvc?rev=725914&view=rev
Log:
HADOOP-4807. Adds JobClient commands to get the active/blacklisted tracker names. Also adds commands to display running/completed task attempt IDs. Contributed by Devaraj Das.
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TIPStatus.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java
hadoop/core/trunk/src/webapps/job/jobtasks.jsp
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=725914&r1=725913&r2=725914&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Dec 11 21:55:31 2008
@@ -224,6 +224,9 @@
HADOOP-4728. Add a test exercising different namenode configurations.
(Boris Shkolnik via cdouglas)
+ HADOOP-4807. Adds JobClient commands to get the active/blacklisted tracker names.
+ Also adds commands to display running/completed task attempt IDs. (ddas)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to do provide locations for splits
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java?rev=725914&r1=725913&r2=725914&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java Thu Dec 11 21:55:31 2008
@@ -21,7 +21,10 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -34,6 +37,9 @@
* Size of the cluster.
* </li>
* <li>
+ * Name of the trackers.
+ * </li>
+ * <li>
* Task capacity of the cluster.
* </li>
* <li>
@@ -51,8 +57,10 @@
*/
public class ClusterStatus implements Writable {
- private int task_trackers;
- private int blacklisted_trackers;
+ private int numActiveTrackers;
+ private Collection<String> activeTrackers = new ArrayList<String>();
+ private Collection<String> blacklistedTrackers = new ArrayList<String>();
+ private int numBlacklistedTrackers;
private int map_tasks;
private int reduce_tasks;
private int max_map_tasks;
@@ -69,9 +77,12 @@
* @param trackers no. of tasktrackers in the cluster
* @param maps no. of currently running map-tasks in the cluster
* @param reduces no. of currently running reduce-tasks in the cluster
- * @param max the maximum no. of tasks in the cluster
+ * @param maxMaps the maximum no. of map tasks in the cluster
+ * @param maxReduces the maximum no. of reduce tasks in the cluster
* @param state the {@link JobTracker.State} of the <code>JobTracker</code>
+ * @deprecated
*/
+ @Deprecated
ClusterStatus(int trackers, int maps, int reduces, int maxMaps,
int maxReduces, JobTracker.State state) {
this(trackers, 0, maps, reduces, maxMaps, maxReduces, state);
@@ -90,8 +101,8 @@
*/
ClusterStatus(int trackers, int blacklists, int maps, int reduces,
int maxMaps, int maxReduces, JobTracker.State state) {
- task_trackers = trackers;
- blacklisted_trackers = blacklists;
+ numActiveTrackers = trackers;
+ numBlacklistedTrackers = blacklists;
map_tasks = maps;
reduce_tasks = reduces;
max_map_tasks = maxMaps;
@@ -102,12 +113,52 @@
}
/**
+ * Construct a new cluster status.
+ *
+ * @param activeTrackers active tasktrackers in the cluster
+ * @param blacklistedTrackers blacklisted tasktrackers in the cluster
+ * @param maps no. of currently running map-tasks in the cluster
+ * @param reduces no. of currently running reduce-tasks in the cluster
+ * @param maxMaps the maximum no. of map tasks in the cluster
+ * @param maxReduces the maximum no. of reduce tasks in the cluster
+ * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
+ */
+ ClusterStatus(Collection<String> activeTrackers,
+ Collection<String> blacklistedTrackers,
+ int maps, int reduces, int maxMaps, int maxReduces,
+ JobTracker.State state) {
+ this(activeTrackers.size(), blacklistedTrackers.size(), maps, reduces,
+ maxMaps, maxReduces, state);
+ this.activeTrackers = activeTrackers;
+ this.blacklistedTrackers = blacklistedTrackers;
+ }
+
+
+ /**
* Get the number of task trackers in the cluster.
*
* @return the number of task trackers in the cluster.
*/
public int getTaskTrackers() {
- return task_trackers;
+ return numActiveTrackers;
+ }
+
+ /**
+ * Get the names of the active task trackers in the cluster.
+ *
+ * @return the names of the active task trackers in the cluster.
+ */
+ public Collection<String> getActiveTrackerNames() {
+ return activeTrackers;
+ }
+
+ /**
+ * Get the names of the blacklisted task trackers in the cluster.
+ *
+ * @return the names of the blacklisted task trackers in the cluster.
+ */
+ public Collection<String> getBlacklistedTrackerNames() {
+ return blacklistedTrackers;
}
/**
@@ -116,7 +167,7 @@
* @return the number of blacklisted task trackers in the cluster.
*/
public int getBlacklistedTrackers() {
- return blacklisted_trackers;
+ return numBlacklistedTrackers;
}
/**
@@ -184,8 +235,26 @@
}
public void write(DataOutput out) throws IOException {
- out.writeInt(task_trackers);
- out.writeInt(blacklisted_trackers);
+ if (activeTrackers.size() == 0) {
+ out.writeInt(numActiveTrackers);
+ out.writeInt(0);
+ } else {
+ out.writeInt(activeTrackers.size());
+ out.writeInt(activeTrackers.size());
+ for (String tracker : activeTrackers) {
+ Text.writeString(out, tracker);
+ }
+ }
+ if (blacklistedTrackers.size() == 0) {
+ out.writeInt(numBlacklistedTrackers);
+ out.writeInt(0);
+ } else {
+ out.writeInt(blacklistedTrackers.size());
+ out.writeInt(blacklistedTrackers.size());
+ for (String tracker : blacklistedTrackers) {
+ Text.writeString(out, tracker);
+ }
+ }
out.writeInt(map_tasks);
out.writeInt(reduce_tasks);
out.writeInt(max_map_tasks);
@@ -196,8 +265,22 @@
}
public void readFields(DataInput in) throws IOException {
- task_trackers = in.readInt();
- blacklisted_trackers = in.readInt();
+ numActiveTrackers = in.readInt();
+ int numTrackerNames = in.readInt();
+ if (numTrackerNames > 0) {
+ for (int i = 0; i < numTrackerNames; i++) {
+ String name = Text.readString(in);
+ activeTrackers.add(name);
+ }
+ }
+ numBlacklistedTrackers = in.readInt();
+ numTrackerNames = in.readInt();
+ if (numTrackerNames > 0) {
+ for (int i = 0; i < numTrackerNames; i++) {
+ String name = Text.readString(in);
+ blacklistedTrackers.add(name);
+ }
+ }
map_tasks = in.readInt();
reduce_tasks = in.readInt();
max_map_tasks = in.readInt();
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=725914&r1=725913&r2=725914&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Thu Dec 11 21:55:31 2008
@@ -36,6 +36,7 @@
import java.net.URLConnection;
import java.net.UnknownHostException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
import java.util.Random;
@@ -1011,6 +1012,48 @@
}
/**
+ * Display the attempt IDs of a job's tasks of a particular type that are in
+ * a particular state. Only running and completed tasks print attempt IDs.
+ *
+ * @param jobId the ID of the job
+ * @param type the type of the task (map/reduce/setup/cleanup)
+ * @param state the state of the task
+ * (pending/running/completed/failed/killed)
+ */
+ public void displayTasks(JobID jobId, String type, String state)
+ throws IOException {
+ TaskReport[] reports = new TaskReport[0];
+ if (type.equals("map")) {
+ reports = getMapTaskReports(jobId);
+ } else if (type.equals("reduce")) {
+ reports = getReduceTaskReports(jobId);
+ } else if (type.equals("setup")) {
+ reports = getSetupTaskReports(jobId);
+ } else if (type.equals("cleanup")) {
+ reports = getCleanupTaskReports(jobId);
+ }
+ for (TaskReport report : reports) {
+ TIPStatus status = report.getCurrentStatus();
+ if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
+ (state.equals("running") && status ==TIPStatus.RUNNING) ||
+ (state.equals("completed") && status == TIPStatus.COMPLETE) ||
+ (state.equals("failed") && status == TIPStatus.FAILED) ||
+ (state.equals("killed") && status == TIPStatus.KILLED)) {
+ printTaskAttempts(report);
+ }
+ }
+ }
+ private void printTaskAttempts(TaskReport report) {
+ if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
+ System.out.println(report.getSuccessfulTaskAttempt());
+ } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
+ for (TaskAttemptID t :
+ report.getRunningTaskAttempts()) {
+ System.out.println(t);
+ }
+ }
+ }
+ /**
* Get status information about the Map-Reduce cluster.
*
* @return the status information about the Map-Reduce cluster as an object
@@ -1018,7 +1061,7 @@
* @throws IOException
*/
public ClusterStatus getClusterStatus() throws IOException {
- return jobSubmitClient.getClusterStatus();
+ return jobSubmitClient.getClusterStatus(false);
}
@@ -1295,6 +1338,8 @@
private void displayUsage(String cmd) {
String prefix = "Usage: JobClient ";
String jobPriorityValues = getJobPriorityNames();
+ String taskTypes = "map, reduce, setup, cleanup";
+ String taskStates = "running, completed";
if("-submit".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <job-file>]");
} else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
@@ -1313,6 +1358,15 @@
System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
"Valid values for priorities are: "
+ jobPriorityValues);
+ } else if ("-list-active-trackers".equals(cmd)) {
+ System.err.println(prefix + "[" + cmd + "]");
+ } else if ("-list-blacklisted-trackers".equals(cmd)) {
+ System.err.println(prefix + "[" + cmd + "]");
+ } else if ("-list-attempt-ids".equals(cmd)) {
+ System.err.println(prefix + "[" + cmd +
+ " <job-id> <task-type> <task-state>]. " +
+ "Valid values for <task-type> are " + taskTypes + ". " +
+ "Valid values for <task-state> are " + taskStates);
} else {
System.err.printf(prefix + "<command> <args>\n");
System.err.printf("\t[-submit <job-file>]\n");
@@ -1325,6 +1379,10 @@
System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]\n");
System.err.printf("\t[-history <jobOutputDir>]\n");
System.err.printf("\t[-list [all]]\n");
+ System.err.printf("\t[-list-active-trackers]\n");
+ System.err.printf("\t[-list-blacklisted-trackers]\n");
+ System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
+ "<task-state>]\n");
System.err.printf("\t[-kill-task <task-id>]\n");
System.err.printf("\t[-fail-task <task-id>]\n\n");
ToolRunner.printGenericCommandUsage(System.out);
@@ -1346,6 +1404,8 @@
String counterGroupName = null;
String counterName = null;
String newPriority = null;
+ String taskType = null;
+ String taskState = null;
int fromEvent = 0;
int nEvents = 0;
boolean getStatus = false;
@@ -1356,6 +1416,9 @@
boolean viewAllHistory = false;
boolean listJobs = false;
boolean listAllJobs = false;
+ boolean listActiveTrackers = false;
+ boolean listBlacklistedTrackers = false;
+ boolean displayTasks = false;
boolean killTask = false;
boolean failTask = false;
boolean setJobPriority = false;
@@ -1448,6 +1511,27 @@
}
failTask = true;
taskid = argv[1];
+ } else if ("-list-active-trackers".equals(cmd)) {
+ if (argv.length != 1) {
+ displayUsage(cmd);
+ return exitCode;
+ }
+ listActiveTrackers = true;
+ } else if ("-list-blacklisted-trackers".equals(cmd)) {
+ if (argv.length != 1) {
+ displayUsage(cmd);
+ return exitCode;
+ }
+ listBlacklistedTrackers = true;
+ } else if ("-list-attempt-ids".equals(cmd)) {
+ if (argv.length != 4) {
+ displayUsage(cmd);
+ return exitCode;
+ }
+ jobid = argv[1];
+ taskType = argv[2];
+ taskState = argv[3];
+ displayTasks = true;
} else {
displayUsage(cmd);
return exitCode;
@@ -1517,8 +1601,16 @@
listJobs();
exitCode = 0;
} else if (listAllJobs) {
- listAllJobs();
- exitCode = 0;
+ listAllJobs();
+ exitCode = 0;
+ } else if (listActiveTrackers) {
+ listActiveTrackers();
+ exitCode = 0;
+ } else if (listBlacklistedTrackers) {
+ listBlacklistedTrackers();
+ exitCode = 0;
+ } else if (displayTasks) {
+ displayTasks(JobID.forName(jobid), taskType, taskState);
} else if(killTask) {
if(jobSubmitClient.killTask(TaskAttemptID.forName(taskid), false)) {
System.out.println("Killed task " + taskid);
@@ -1594,6 +1686,28 @@
"\tFailed : 3\tPrep : 4\n");
displayJobList(jobs);
}
+
+ /**
+ * Display the list of active trackers
+ */
+ private void listActiveTrackers() throws IOException {
+ ClusterStatus c = jobSubmitClient.getClusterStatus(true);
+ Collection<String> trackers = c.getActiveTrackerNames();
+ for (String trackerName : trackers) {
+ System.out.println(trackerName);
+ }
+ }
+
+ /**
+ * Display the list of blacklisted trackers
+ */
+ private void listBlacklistedTrackers() throws IOException {
+ ClusterStatus c = jobSubmitClient.getClusterStatus(true);
+ Collection<String> trackers = c.getBlacklistedTrackerNames();
+ for (String trackerName : trackers) {
+ System.out.println(trackerName);
+ }
+ }
void displayJobList(JobStatus[] jobs) {
System.out.printf("JobId\tState\tStartTime\tUserName\tPriority\tSchedulingInfo\n");
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=725914&r1=725913&r2=725914&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Thu Dec 11 21:55:31 2008
@@ -52,9 +52,12 @@
* Version 17: getClusterStatus returns the amount of memory used by
* the server. HADOOP-4435
* Version 18: Added blacklisted trackers to the ClusterStatus
- * for HADOOP-4305
+ * for HADOOP-4305
+ * Version 19: Modified TaskReport to have TIP status and modified the
+ * method getClusterStatus() to take a boolean argument
+ * for HADOOP-4807
*/
- public static final long versionID = 18L;
+ public static final long versionID = 19L;
/**
* Allocate a name for the job.
@@ -72,9 +75,11 @@
/**
* Get the current status of the cluster
+ * @param detailed if true then report tracker names as well
* @return summary of the state of the cluster
*/
- public ClusterStatus getClusterStatus() throws IOException;
+ public ClusterStatus getClusterStatus(boolean detailed) throws IOException;
+
/**
* Kill the indicated job
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=725914&r1=725913&r2=725914&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Dec 11 21:55:31 2008
@@ -1996,6 +1996,32 @@
}
/**
+ * Get the active and blacklisted task tracker names in the cluster. The first
+ * element in the returned list contains the list of active tracker names.
+ * The second element in the returned list contains the list of blacklisted
+ * tracker names.
+ */
+ public List<List<String>> taskTrackerNames() {
+ List<String> activeTrackers =
+ new ArrayList<String>();
+ List<String> blacklistedTrackers =
+ new ArrayList<String>();
+ synchronized (taskTrackers) {
+ for (TaskTrackerStatus status : taskTrackers.values()) {
+ if (!faultyTrackers.isBlacklisted(status.getHost())) {
+ activeTrackers.add(status.getTrackerName());
+ } else {
+ blacklistedTrackers.add(status.getTrackerName());
+ }
+ }
+ }
+ List<List<String>> result = new ArrayList<List<String>>(2);
+ result.add(activeTrackers);
+ result.add(blacklistedTrackers);
+ return result;
+ }
+
+ /**
* Get the blacklisted task tracker statuses in the cluster
*
* @return {@link Collection} of blacklisted {@link TaskTrackerStatus}
@@ -2652,16 +2678,33 @@
}
}
+ /**@deprecated use {@link #getClusterStatus(boolean)}*/
+ @Deprecated
public synchronized ClusterStatus getClusterStatus() {
+ return getClusterStatus(false);
+ }
+
+ public synchronized ClusterStatus getClusterStatus(boolean detailed) {
synchronized (taskTrackers) {
- return new ClusterStatus(taskTrackers.size() -
- getBlacklistedTrackerCount(),
- getBlacklistedTrackerCount(),
- totalMaps,
- totalReduces,
- totalMapTaskCapacity,
- totalReduceTaskCapacity,
- state);
+ if (detailed) {
+ List<List<String>> trackerNames = taskTrackerNames();
+ return new ClusterStatus(trackerNames.get(0),
+ trackerNames.get(1),
+ totalMaps,
+ totalReduces,
+ totalMapTaskCapacity,
+ totalReduceTaskCapacity,
+ state);
+ } else {
+ return new ClusterStatus(taskTrackers.size() -
+ getBlacklistedTrackerCount(),
+ getBlacklistedTrackerCount(),
+ totalMaps,
+ totalReduces,
+ totalMapTaskCapacity,
+ totalReduceTaskCapacity,
+ state);
+ }
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=725914&r1=725913&r2=725914&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Thu Dec 11 21:55:31 2008
@@ -385,8 +385,8 @@
return fs.getUri().toString();
}
- public ClusterStatus getClusterStatus() {
- return new ClusterStatus(1, map_tasks, reduce_tasks, 1, 1,
+ public ClusterStatus getClusterStatus(boolean detailed) {
+ return new ClusterStatus(1, 0, map_tasks, reduce_tasks, 1, 1,
JobTracker.State.RUNNING);
}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TIPStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TIPStatus.java?rev=725914&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TIPStatus.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TIPStatus.java Thu Dec 11 21:55:31 2008
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/** The states of a {@link TaskInProgress} as seen by the JobTracker.
+ */
+public enum TIPStatus {
+ PENDING, RUNNING, COMPLETE, KILLED, FAILED;
+}
\ No newline at end of file
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=725914&r1=725913&r2=725914&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Thu Dec 11 21:55:31 2008
@@ -431,11 +431,28 @@
for (List<String> l : taskDiagnosticData.values()) {
diagnostics.addAll(l);
}
+ TIPStatus currentStatus = null;
+ if (isRunning() && !isComplete()) {
+ currentStatus = TIPStatus.RUNNING;
+ } else if (isComplete()) {
+ currentStatus = TIPStatus.COMPLETE;
+ } else if (wasKilled()) {
+ currentStatus = TIPStatus.KILLED;
+ } else if (isFailed()) {
+ currentStatus = TIPStatus.FAILED;
+ } else if (!(isComplete() || isRunning() || wasKilled())) {
+ currentStatus = TIPStatus.PENDING;
+ }
+
TaskReport report = new TaskReport
(getTIPId(), (float)progress, state,
diagnostics.toArray(new String[diagnostics.size()]),
- execStartTime, execFinishTime, counters);
-
+ currentStatus, execStartTime, execFinishTime, counters);
+ if (currentStatus == TIPStatus.RUNNING) {
+ report.setRunningTaskAttempts(activeTasks.keySet());
+ } else if (currentStatus == TIPStatus.COMPLETE) {
+ report.setSuccessfulAttempt(getSuccessfulTaskid());
+ }
return report;
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java?rev=725914&r1=725913&r2=725914&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskReport.java Thu Dec 11 21:55:31 2008
@@ -20,7 +20,9 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -35,18 +37,54 @@
private long startTime;
private long finishTime;
private Counters counters;
-
+ private TIPStatus currentStatus;
+
+ private Collection<TaskAttemptID> runningAttempts =
+ new ArrayList<TaskAttemptID>();
+ private TaskAttemptID successfulAttempt = new TaskAttemptID();
public TaskReport() {
taskid = new TaskID();
}
-
+
+ /**
+ * Creates a new TaskReport object
+ * @param taskid
+ * @param progress
+ * @param state
+ * @param diagnostics
+ * @param startTime
+ * @param finishTime
+ * @param counters
+ * @deprecated
+ */
+ @Deprecated
TaskReport(TaskID taskid, float progress, String state,
- String[] diagnostics, long startTime, long finishTime,
+ String[] diagnostics, long startTime, long finishTime,
+ Counters counters) {
+ this(taskid, progress, state, diagnostics, null, startTime, finishTime,
+ counters);
+ }
+
+ /**
+ * Creates a new TaskReport object
+ * @param taskid
+ * @param progress
+ * @param state
+ * @param diagnostics
+ * @param currentStatus
+ * @param startTime
+ * @param finishTime
+ * @param counters
+ */
+ TaskReport(TaskID taskid, float progress, String state,
+ String[] diagnostics, TIPStatus currentStatus,
+ long startTime, long finishTime,
Counters counters) {
this.taskid = taskid;
this.progress = progress;
this.state = state;
this.diagnostics = diagnostics;
+ this.currentStatus = currentStatus;
this.startTime = startTime;
this.finishTime = finishTime;
this.counters = counters;
@@ -65,6 +103,10 @@
public String[] getDiagnostics() { return diagnostics; }
/** A table of counters. */
public Counters getCounters() { return counters; }
+ /** The current {@link TIPStatus} of the task. */
+ public TIPStatus getCurrentStatus() {
+ return currentStatus;
+ }
/**
* Get finish time of task.
@@ -97,6 +139,33 @@
this.startTime = startTime;
}
+ /**
+ * Set the successful attempt ID of the task.
+ */
+ public void setSuccessfulAttempt(TaskAttemptID t) {
+ successfulAttempt = t;
+ }
+ /**
+ * Get the attempt ID that took this task to completion
+ */
+ public TaskAttemptID getSuccessfulTaskAttempt() {
+ return successfulAttempt;
+ }
+ /**
+ * Set the running attempt(s) of the task.
+ */
+ public void setRunningTaskAttempts(
+ Collection<TaskAttemptID> runningAttempts) {
+ this.runningAttempts = runningAttempts;
+ }
+ /**
+ * Get the running task attempt IDs for this task
+ */
+ public Collection<TaskAttemptID> getRunningTaskAttempts() {
+ return runningAttempts;
+ }
+
+
@Override
public boolean equals(Object o) {
if(o == null)
@@ -132,6 +201,17 @@
out.writeLong(finishTime);
WritableUtils.writeStringArray(out, diagnostics);
counters.write(out);
+ WritableUtils.writeEnum(out, currentStatus);
+ if (currentStatus == TIPStatus.RUNNING) {
+ WritableUtils.writeVInt(out, runningAttempts.size());
+ TaskAttemptID t[] = new TaskAttemptID[0];
+ t = runningAttempts.toArray(t);
+ for (int i = 0; i < t.length; i++) {
+ t[i].write(out);
+ }
+ } else if (currentStatus == TIPStatus.COMPLETE) {
+ successfulAttempt.write(out);
+ }
}
public void readFields(DataInput in) throws IOException {
@@ -144,5 +224,16 @@
diagnostics = WritableUtils.readStringArray(in);
counters = new Counters();
counters.readFields(in);
+ currentStatus = WritableUtils.readEnum(in, TIPStatus.class);
+ if (currentStatus == TIPStatus.RUNNING) {
+ int num = WritableUtils.readVInt(in);
+ for (int i = 0; i < num; i++) {
+ TaskAttemptID t = new TaskAttemptID();
+ t.readFields(in);
+ runningAttempts.add(t);
+ }
+ } else if (currentStatus == TIPStatus.COMPLETE) {
+ successfulAttempt.readFields(in);
+ }
}
}
Modified: hadoop/core/trunk/src/webapps/job/jobtasks.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobtasks.jsp?rev=725914&r1=725913&r2=725914&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/jobtasks.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/jobtasks.jsp Thu Dec 11 21:55:31 2008
@@ -21,7 +21,6 @@
}
String type = request.getParameter("type");
String pagenum = request.getParameter("pagenum");
- TaskInProgress[] tasks = null;
String state = request.getParameter("state");
state = (state!=null) ? state : "all";
int pnum = Integer.parseInt(pagenum);
@@ -35,19 +34,14 @@
int start_index = (pnum - 1) * numperpage;
int end_index = start_index + numperpage;
int report_len = 0;
- if ("map".equals(type)){
- reports = (job != null) ? tracker.getMapTaskReports(jobidObj) : null;
- tasks = (job != null) ? job.getMapTasks() : null;
- }
- else if ("reduce".equals(type)) {
+ if ("map".equals(type)) {
+ reports = (job != null) ? tracker.getMapTaskReports(jobidObj) : null;
+ } else if ("reduce".equals(type)) {
reports = (job != null) ? tracker.getReduceTaskReports(jobidObj) : null;
- tasks = (job != null) ? job.getReduceTasks() : null;
} else if ("cleanup".equals(type)) {
reports = (job != null) ? tracker.getCleanupTaskReports(jobidObj) : null;
- tasks = (job != null) ? job.getCleanupTasks() : null;
} else if ("setup".equals(type)) {
reports = (job != null) ? tracker.getSetupTaskReports(jobidObj) : null;
- tasks = (job != null) ? job.getSetupTasks() : null;
}
%>
@@ -67,27 +61,18 @@
}
// Filtering the reports if some filter is specified
if (!"all".equals(state)) {
- List<TaskID> filteredReportsTaskIds = new ArrayList<TaskID>();
List<TaskReport> filteredReports = new ArrayList<TaskReport>();
- for (int i = 0; i < tasks.length; ++i) {
- if (("completed".equals(state) && tasks[i].isComplete())
- || ("running".equals(state) && tasks[i].isRunning()
- && !tasks[i].isComplete())
- || ("killed".equals(state) && tasks[i].wasKilled())
- || ("pending".equals(state) && !(tasks[i].isComplete()
- || tasks[i].isRunning()
- || tasks[i].wasKilled()))) {
- filteredReportsTaskIds.add(tasks[i].getTIPId());
- }
- }
- for (int i = 0 ; i < reports.length; ++i) {
- if (filteredReportsTaskIds.contains(reports[i].getTaskID())) {
+ for (int i = 0; i < reports.length; ++i) {
+ if (("completed".equals(state) && reports[i].getCurrentStatus() == TIPStatus.COMPLETE)
+ || ("running".equals(state) && reports[i].getCurrentStatus() == TIPStatus.RUNNING)
+ || ("killed".equals(state) && reports[i].getCurrentStatus() == TIPStatus.KILLED)
+ || ("pending".equals(state) && reports[i].getCurrentStatus() == TIPStatus.PENDING)) {
filteredReports.add(reports[i]);
}
}
- tasks = null; // free the task memory
// using filtered reports instead of all the reports
reports = filteredReports.toArray(new TaskReport[0]);
+ filteredReports = null;
}
report_len = reports.length;
@@ -107,8 +92,8 @@
for (int i = start_index ; i < end_index; i++) {
TaskReport report = reports[i];
out.print("<tr><td><a href=\"taskdetails.jsp?jobid=" + jobid +
- "&tipid=" + report.getTaskId() + "\">" +
- report.getTaskId() + "</a></td>");
+ "&tipid=" + report.getTaskID() + "\">" +
+ report.getTaskID() + "</a></td>");
out.print("<td>" + StringUtils.formatPercent(report.getProgress(),2) +
ServletUtil.percentageGraph(report.getProgress() * 100f, 80) + "</td>");
out.print("<td>" + report.getState() + "<br/></td>");
@@ -123,7 +108,7 @@
out.println("</pre><br/></td>");
out.println("<td>" +
"<a href=\"taskstats.jsp?jobid=" + jobid +
- "&tipid=" + report.getTaskId() +
+ "&tipid=" + report.getTaskID() +
"\">" + report.getCounters().size() +
"</a></td></tr>");
}