You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/03/02 20:56:24 UTC
svn commit: r513917 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/ src/webapps/job/
Author: cutting
Date: Fri Mar 2 11:56:22 2007
New Revision: 513917
URL: http://svn.apache.org/viewvc?view=rev&rev=513917
Log:
HADOOP-1041. Optimize mapred counter implementation. Contributed by David Bowen.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task_Counter.properties
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/build.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp
lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp
lucene/hadoop/trunk/src/webapps/job/taskstats.jsp
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Mar 2 11:56:22 2007
@@ -179,6 +179,10 @@
54. HADOOP-1046. Clean up tmp from partially received stale block files. (ab)
+55. HADOOP-1041. Optimize mapred counter implementation. Also group
+ counters by their declaring Enum. (David Bowen via cutting)
+
+
Release 0.11.2 - 2007-02-16
1. HADOOP-1009. Fix an infinite loop in the HDFS namenode.
Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Fri Mar 2 11:56:22 2007
@@ -216,6 +216,14 @@
<compilerarg line="${javac.args}" />
<classpath refid="classpath"/>
</javac>
+
+ <copy todir="${build.classes}">
+ <fileset
+ dir="${src.dir}"
+ includes="**/*.properties"
+ />
+ </copy>
+
</target>
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java Fri Mar 2 11:56:22 2007
@@ -21,11 +21,16 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Set;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+import java.util.SortedMap;
import java.util.TreeMap;
-import org.apache.commons.logging.Log;
+import org.apache.commons.logging.*;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
@@ -34,34 +39,231 @@
*/
public class Counters implements Writable {
- private Map<String,Long> counters = new TreeMap<String,Long>();
-
+ //private static Log log = LogFactory.getLog("Counters.class");
+
+ /**
+ * A counter record, comprising its name and value.
+ */
+ private static class CounterRec {
+
+ public String name;
+ public long value;
+
+ public CounterRec(String name, long value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ } // end class CounterRec
+
+ /**
+ * Represents a group of counters, comprising the counters from a particular
+ * counter enum class.
+ *
+ * This class handles localization of the class name and the counter names.
+ */
+ public static class Group {
+
+ // The group name is the fully qualified enum class name.
+ private String groupName;
+
+ // Optional ResourceBundle for localization of group and counter names.
+ private ResourceBundle bundle = null;
+
+ // Maps counter names to their current values. Note that the iteration
+ // order of this Map is the same as the ordering of the Enum class in which
+ // these counter names were defined.
+ private Map<String,Long> groupCounters = new LinkedHashMap<String,Long>();
+
+
+ Group(String groupName, Collection<CounterRec> counters) {
+ this.groupName = groupName;
+ try {
+ bundle = getResourceBundle(groupName);
+ }
+ catch (MissingResourceException neverMind) {
+ }
+
+ for (CounterRec counter : counters) {
+ groupCounters.put(counter.name, counter.value);
+ }
+ }
+
+ /**
+ * Returns the specified resource bundle, or throws an exception.
+ * @throws MissingResourceException if the bundle isn't found
+ */
+ private static ResourceBundle getResourceBundle(String enumClassName) {
+ String bundleName = enumClassName.replace('$','_');
+ return ResourceBundle.getBundle(bundleName);
+ }
+
+ /**
+ * Returns raw name of the group. This is the name of the enum class
+ * for this group of counters.
+ */
+ public String getName() {
+ return groupName;
+ }
+
+ /**
+ * Returns localized name of the group. This is the same as getName() by
+ * default, but different if an appropriate ResourceBundle is found.
+ */
+ public String getDisplayName() {
+ return localize("CounterGroupName", groupName);
+ }
+
+ /**
+ * Returns localized name of the specified counter.
+ */
+ public String getDisplayName(String counter) {
+ return localize(counter + ".name", counter);
+ }
+
+ /**
+ * Returns the raw (non-localized) names of the counters in this group.
+ */
+ public Collection<String> getCounterNames() {
+ return groupCounters.keySet();
+ }
+
+ /**
+ * Returns the value of the specified counter, or 0 if the counter does
+ * not exist.
+ */
+ public long getCounter(String counter) {
+ Long result = groupCounters.get(counter);
+ return (result == null ? 0L : result);
+ }
+
+ /**
+ * Returns the number of counters in this group.
+ */
+ public int size() {
+ return groupCounters.size();
+ }
+
+ /**
+ * Looks up key in the ResourceBundle and returns the corresponding value.
+ * If the bundle or the key doesn't exist, returns the default value.
+ */
+ private String localize(String key, String defaultValue) {
+ String result = defaultValue;
+ if (bundle != null) {
+ try {
+ result = bundle.getString(key);
+ }
+ catch (MissingResourceException mre) {
+ }
+ }
+ return result;
+ }
+
+
+ } // end class Group
+
+
+ // Map from group name (enum class name) to map of int (enum ordinal) to
+ // counter record (name-value pair).
+ private Map<String,Map<Integer,CounterRec>> counters =
+ new TreeMap<String,Map<Integer,CounterRec>>();
+
/**
- * Returns the names of all counters.
+ * Returns the names of all counter classes.
* @return Set of counter names.
*/
- public synchronized Set<String> getCounterNames() {
+ public synchronized Collection<String> getGroupNames() {
return counters.keySet();
}
/**
- * Returns the value of the named counter, or 0 if counter doesn't exist.
- * @param name name of a counter
- * @return value of the counter
+ * Returns the named counter group, or an empty group if there is none
+ * with the specified name.
*/
- public synchronized long getCounter(String name) {
- Long result = counters.get(name);
- return (result == null ? 0L : result);
+ public synchronized Group getGroup(String groupName) {
+ Map<Integer,CounterRec> counterMap = counters.get(groupName);
+ Collection<CounterRec> groupCounters;
+ if (counterMap == null) {
+ groupCounters = Collections.emptySet();
+ }
+ else {
+ groupCounters = counterMap.values();
+ }
+ return new Group(groupName, groupCounters);
}
/**
- * Increments the named counter by the specified amount, creating it if
+ * Increments the specified counter by the specified amount, creating it if
* it didn't already exist.
- * @param name of a counter
+ * @param key identifies a counter
* @param amount amount by which counter is to be incremented
*/
- public synchronized void incrCounter(String name, long amount) {
- counters.put(name, amount + getCounter(name));
+ public synchronized void incrCounter(Enum key, long amount) {
+ int ordinal = key.ordinal();
+ String counterName = key.toString();
+ String groupName = key.getDeclaringClass().getName();
+ Map<Integer,CounterRec> counterMap = getCounterMap(groupName);
+ CounterRec counter = getCounter(counterMap, counterName, ordinal);
+ counter.value += amount;
+ }
+
+ /**
+ * Returns current value of the specified counter, or 0 if the counter
+ * does not exist.
+ */
+ public synchronized long getCounter(Enum key) {
+ long result = 0L;
+ String groupName = key.getDeclaringClass().getName();
+ Map<Integer,CounterRec> counterMap = counters.get(groupName);
+ if (counterMap != null) {
+ int ordinal = key.ordinal();
+ String name = key.toString();
+ CounterRec counter = counterMap.get(ordinal);
+ if (counter != null && counter.name.equals(name)) {
+ result = counter.value;
+ }
+ else {
+ // ordinal lookup failed, but maybe there is a matching name; this
+ // could happen if e.g. a client has a different version of the Enum class.
+ for (CounterRec ctr : counterMap.values()) {
+ if (ctr.name.equals(name)) {
+ result = ctr.value;
+ break;
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Returns the counter map for the specified group, creating and registering
+ * an empty one if none exists yet. The map is keyed by enum ordinal, so that
+ * the ordering of the enum class declaration is preserved.
+ */
+ private Map<Integer,CounterRec> getCounterMap(String groupName) {
+ Map<Integer,CounterRec> map = counters.get(groupName);
+ if (map == null) {
+ map = new TreeMap<Integer,CounterRec>();
+ counters.put(groupName, map);
+ }
+ return map;
+ }
+
+ /**
+ * Returns the counter record with the specified name and ordinal by
+ * finding or creating it in the specified counterMap.
+ */
+ private CounterRec getCounter(Map<Integer,CounterRec> counterMap,
+ String counterName, int ordinal)
+ {
+ CounterRec result = counterMap.get(ordinal);
+ if (result == null) {
+ result = new CounterRec(counterName, 0L);
+ counterMap.put(ordinal, result);
+ }
+ return result;
}
/**
@@ -70,34 +272,80 @@
* @param other the other Counters instance
*/
public synchronized void incrAllCounters(Counters other) {
- for (String name : other.getCounterNames()) {
- incrCounter(name, other.getCounter(name));
+ for (String groupName : other.counters.keySet()) {
+ Map<Integer,CounterRec> otherCounters = other.counters.get(groupName);
+ Map<Integer,CounterRec> myCounters = getCounterMap(groupName);
+ for (int i : otherCounters.keySet()) {
+ CounterRec otherCounter = otherCounters.get(i);
+ CounterRec counter = getCounter(myCounters, otherCounter.name, i);
+ counter.value += otherCounter.value;
+ }
}
}
+
+ /**
+ * Convenience method for computing the sum of two sets of counters.
+ */
+ public static Counters sum(Counters a, Counters b) {
+ Counters counters = new Counters();
+ counters.incrAllCounters(a);
+ counters.incrAllCounters(b);
+ return counters;
+ }
/**
- * Returns the number of counters.
+ * Returns the total number of counters, by summing the number of counters
+ * in each group.
*/
- public synchronized int size() {
- return counters.size();
+ public synchronized int size() {
+ int result = 0;
+ for (String groupName : counters.keySet()) {
+ result += counters.get(groupName).size();
+ }
+ return result;
}
- // Writable
+ // Writable. The external format is:
+ //
+ // #groups group*
+ //
+ // i.e. the number of groups followed by 0 or more groups, where each
+ // group is of the form:
+ //
+ // groupName #counters counter*
+ //
+ // where each counter is of the form:
+ //
+ // ordinal name value
+ //
public synchronized void write(DataOutput out) throws IOException {
out.writeInt(counters.size());
- for (String name : counters.keySet()) {
- UTF8.writeString(out, name);
- out.writeLong(counters.get(name));
+ for (String groupName : counters.keySet()) {
+ UTF8.writeString(out, groupName);
+ Map<Integer,CounterRec> map = counters.get(groupName);
+ out.writeInt(map.size());
+ for (Integer ordinal : map.keySet()) {
+ CounterRec counter = map.get(ordinal);
+ out.writeInt(ordinal);
+ UTF8.writeString(out, counter.name);
+ out.writeLong(counter.value);
+ }
}
}
public synchronized void readFields(DataInput in) throws IOException {
- int n = in.readInt();
- while (n-- > 0) {
- String name = UTF8.readString(in);
- long value = in.readLong();
- counters.put(name, value);
+ int numClasses = in.readInt();
+ while (numClasses-- > 0) {
+ String groupName = UTF8.readString(in);
+ Map<Integer,CounterRec> counters = getCounterMap(groupName);
+ int numCounters = in.readInt();
+ while (numCounters-- > 0) {
+ int index = in.readInt();
+ String counterName = UTF8.readString(in);
+ long value = in.readLong();
+ counters.put(index, new CounterRec(counterName,value));
+ }
}
}
@@ -106,10 +354,17 @@
* @param log The log to use.
*/
public void log(Log log) {
- log.info("Counters: " + getCounterNames().size());
- for (String counterName : getCounterNames()) {
- log.info(" " + counterName + "=" + getCounter(counterName));
+ log.info("Counters: " + size());
+ Collection<String> groupNames = getGroupNames();
+ for (String groupName : groupNames) {
+ Group group = getGroup(groupName);
+ log.info(" " + group.getDisplayName());
+ for (String counterName : group.getCounterNames()) {
+ log.info(" " + group.getDisplayName(counterName) + "=" +
+ group.getCounter(counterName));
+ }
}
}
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Fri Mar 2 11:56:22 2007
@@ -176,8 +176,11 @@
"reduce() completion: " + status.reduceProgress();
}
- public Counters getCounters() {
- return status.getCounters();
+ /**
+ * Returns the counters for this job
+ */
+ public Counters getCounters() throws IOException {
+ return jobSubmitClient.getJobCounters(getJobID());
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 2 11:56:22 2007
@@ -84,7 +84,8 @@
private LocalFileSystem localFs;
private String uniqueString;
- private Counters counters = new Counters();
+ private Counters mapCounters = new Counters();
+ private Counters reduceCounters = new Counters();
private MetricsRecord jobMetrics;
/**
@@ -132,6 +133,25 @@
}
/**
+ * Called periodically by JobTrackerMetrics to update the metrics for
+ * this job.
+ */
+ public void updateMetrics() {
+ Counters counters = getCounters();
+ for (String groupName : counters.getGroupNames()) {
+ Counters.Group group = counters.getGroup(groupName);
+ jobMetrics.setTag("group", group.getDisplayName());
+
+ for (String counter : group.getCounterNames()) {
+ long value = group.getCounter(counter);
+ jobMetrics.setTag("counter", group.getDisplayName(counter));
+ jobMetrics.setMetric("value", (float) value);
+ jobMetrics.update();
+ }
+ }
+ }
+
+ /**
* Construct the splits, etc. This is invoked from an async
* thread so that split-computation doesn't block anyone.
*/
@@ -354,29 +374,41 @@
(progressDelta / reduces.length)));
}
}
-
- //
- // Update counters by summing over all tasks in progress
- //
- Counters newCounters = new Counters();
- for (TaskInProgress mapTask : maps) {
- newCounters.incrAllCounters(mapTask.getCounters());
- }
- for (TaskInProgress reduceTask : reduces) {
- newCounters.incrAllCounters(reduceTask.getCounters());
- }
- this.status.setCounters(newCounters);
-
- //
- // Send counter data to the metrics package.
- //
- for (String counter : newCounters.getCounterNames()) {
- long value = newCounters.getCounter(counter);
- jobMetrics.setTag("counter", counter);
- jobMetrics.setMetric("value", (float) value);
- jobMetrics.update();
- }
- }
+ }
+
+ /**
+ * Returns map phase counters by summing over all map tasks in progress.
+ */
+ public synchronized Counters getMapCounters() {
+ return sumTaskCounters(maps);
+ }
+
+ /**
+ * Returns reduce phase counters by summing over all reduce tasks in progress.
+ */
+ public synchronized Counters getReduceCounters() {
+ return sumTaskCounters(reduces);
+ }
+
+ /**
+ * Returns the total job counters, by adding together the map and the
+ * reduce counters.
+ */
+ public Counters getCounters() {
+ return Counters.sum(getMapCounters(), getReduceCounters());
+ }
+
+ /**
+ * Returns a Counters instance representing the sum of all the counters in
+ * the array of tasks in progress.
+ */
+ private Counters sumTaskCounters(TaskInProgress[] tips) {
+ Counters counters = new Counters();
+ for (TaskInProgress tip : tips) {
+ counters.incrAllCounters(tip.getCounters());
+ }
+ return counters;
+ }
/////////////////////////////////////////////////////
// Create/manage tasks
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Fri Mar 2 11:56:22 2007
@@ -54,7 +54,7 @@
private int runState;
private long startTime;
private String user;
- private Counters counters = new Counters();
+
/**
*/
public JobStatus() {
@@ -118,7 +118,7 @@
public void setRunState(int state) {
this.runState = state;
}
-
+
/**
* Set the start time of the job
* @param startTime The startTime of the job
@@ -140,15 +140,6 @@
*/
public String getUsername() { return this.user;};
- /**
- * @param counters Counters for the job.
- */
- void setCounters(Counters counters) { this.counters = counters; }
- /**
- * @return the counters for the job
- */
- public Counters getCounters() { return counters; }
-
///////////////////////////////////////
// Writable
///////////////////////////////////////
@@ -159,8 +150,8 @@
out.writeInt(runState);
out.writeLong(startTime);
UTF8.writeString(out, user);
- counters.write(out);
}
+
public void readFields(DataInput in) throws IOException {
this.jobid = UTF8.readString(in);
this.mapProgress = in.readFloat();
@@ -168,6 +159,5 @@
this.runState = in.readInt();
this.startTime = in.readLong();
this.user = UTF8.readString(in);
- counters.readFields(in);
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Mar 2 11:56:22 2007
@@ -32,7 +32,7 @@
*Changing the versionID to 2L since the getTaskCompletionEvents method has
*changed
*/
- public static final long versionID = 2L;
+ public static final long versionID = 3L;
/**
* Submit a Job for execution. Returns the latest profile for
* that job.
@@ -60,6 +60,11 @@
*/
public JobStatus getJobStatus(String jobid) throws IOException;
+ /**
+ * Grab the current job counters
+ */
+ public Counters getJobCounters(String jobid) throws IOException;
+
/**
* Grab a bunch of info on the map tasks that make up the job
*/
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Mar 2 11:56:22 2007
@@ -424,6 +424,12 @@
numJobsCompleted = 0;
}
metricsRecord.update();
+
+ if (tracker != null) {
+ for (JobInProgress jip : tracker.getRunningJobs()) {
+ jip.updateMetrics();
+ }
+ }
}
synchronized void launchMap() {
@@ -929,6 +935,15 @@
}
return v;
}
+ /**
+ * Version that is called from a timer thread, and therefore needs to be
+ * careful to synchronize.
+ */
+ public synchronized List<JobInProgress> getRunningJobs() {
+ synchronized (jobs) {
+ return (List<JobInProgress>) runningJobs();
+ }
+ }
public Vector failedJobs() {
Vector v = new Vector();
for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {
@@ -1422,6 +1437,14 @@
return null;
}
}
+ public synchronized Counters getJobCounters(String jobid) {
+ JobInProgress job = (JobInProgress) jobs.get(jobid);
+ if (job != null) {
+ return job.getCounters();
+ } else {
+ return null;
+ }
+ }
public synchronized TaskReport[] getMapTaskReports(String jobid) {
JobInProgress job = (JobInProgress) jobs.get(jobid);
if (job == null) {
@@ -1545,7 +1568,7 @@
JobStatus status = jip.getStatus();
if (status.getRunState() == JobStatus.RUNNING
|| status.getRunState() == JobStatus.PREP) {
- status.setStartTime(jip.getStartTime());
+ status.setStartTime(jip.getStartTime());
status.setUsername(jip.getProfile().getUser());
v.add(status);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar 2 11:56:22 2007
@@ -62,9 +62,12 @@
private Path localFile;
private FileSystem localFs;
- // Contains the counters summed over all the tasks which
+ // Counters summed over all the map/reduce tasks which
// have successfully completed
- private Counters counters = new Counters();
+ private Counters completedTaskCounters = new Counters();
+
+ // Current counters, including incomplete task(s)
+ private Counters currentCounters = new Counters();
public long getProtocolVersion(String protocol, long clientVersion) {
return TaskUmbilicalProtocol.versionID;
@@ -181,7 +184,7 @@
public Task getTask(String taskid) { return null; }
public void progress(String taskId, float progress, String state,
- TaskStatus.Phase phase, Counters taskStats) {
+ TaskStatus.Phase phase, Counters taskCounters) {
LOG.info(state);
float taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
@@ -190,9 +193,9 @@
} else {
status.setReduceProgress(progress);
}
+ currentCounters = Counters.sum(completedTaskCounters, taskCounters);
// ignore phase
- updateStatusCounters(taskStats);
}
/**
@@ -201,21 +204,7 @@
* successfully completed
*/
private void updateCounters(Task task) {
- counters.incrAllCounters(task.getCounters());
- status.setCounters(counters);
- }
-
- /**
- * Sets status counters to the sum of (1) the counters from
- * all completed tasks, and (2) the counters from a particular
- * task in progress.
- * @param taskCounters Counters from a task that is in progress
- */
- private void updateStatusCounters(Counters taskCounters) {
- Counters newStats = new Counters();
- newStats.incrAllCounters(counters);
- newStats.incrAllCounters(taskCounters);
- status.setCounters(newStats);
+ completedTaskCounters.incrAllCounters(task.getCounters());
}
public void reportDiagnosticInfo(String taskid, String trace) {
@@ -272,6 +261,11 @@
public JobStatus getJobStatus(String id) {
Job job = (Job)jobs.get(id);
return job.status;
+ }
+
+ public Counters getJobCounters(String id) {
+ Job job = (Job)jobs.get(id);
+ return job.currentCounters;
}
public String getFilesystemName() throws IOException {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Mar 2 11:56:22 2007
@@ -50,6 +50,7 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.mapred.Task.Counter.*;
/** A Map task. */
class MapTask extends Task {
@@ -64,13 +65,6 @@
{ // set phase for this task
setPhase(TaskStatus.Phase.MAP);
}
-
- private enum Counter {
- INPUT_RECORDS,
- INPUT_BYTES,
- OUTPUT_RECORDS,
- OUTPUT_BYTES
- }
public MapTask() {}
@@ -161,8 +155,8 @@
setProgress(getProgress());
long beforePos = getPos();
boolean ret = rawIn.next(key, value);
- reporter.incrCounter(Counter.INPUT_RECORDS, 1);
- reporter.incrCounter(Counter.INPUT_BYTES, (getPos() - beforePos));
+ reporter.incrCounter(MAP_INPUT_RECORDS, 1);
+ reporter.incrCounter(MAP_INPUT_BYTES, (getPos() - beforePos));
return ret;
}
public long getPos() throws IOException { return rawIn.getPos(); }
@@ -324,8 +318,8 @@
int partNumber = partitioner.getPartition(key, value, partitions);
sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
- reporter.incrCounter(Counter.OUTPUT_RECORDS, 1);
- reporter.incrCounter(Counter.OUTPUT_BYTES,
+ reporter.incrCounter(MAP_OUTPUT_RECORDS, 1);
+ reporter.incrCounter(MAP_OUTPUT_BYTES,
(keyValBuffer.getLength() - keyOffset));
//now check whether we need to spill to disk
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Mar 2 11:56:22 2007
@@ -44,6 +44,8 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.mapred.Task.Counter.*;
+
/** A Reduce task. */
class ReduceTask extends Task {
@@ -54,8 +56,6 @@
public Writable newInstance() { return new ReduceTask(); }
});
}
-
- private enum Counter { INPUT_RECORDS, OUTPUT_RECORDS }
private int numMaps;
private boolean sortComplete;
@@ -296,7 +296,7 @@
public void collect(WritableComparable key, Writable value)
throws IOException {
out.write(key, value);
- reporter.incrCounter(Counter.OUTPUT_RECORDS, 1);
+ reporter.incrCounter(REDUCE_OUTPUT_RECORDS, 1);
reportProgress(umbilical);
}
};
@@ -309,7 +309,7 @@
keyClass, valClass, umbilical, job);
values.informReduceProgress();
while (values.more()) {
- reporter.incrCounter(Counter.INPUT_RECORDS, 1);
+ reporter.incrCounter(REDUCE_INPUT_RECORDS, 1);
reducer.reduce(values.getKey(), values, collector, reporter);
values.nextKey();
values.informReduceProgress();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Fri Mar 2 11:56:22 2007
@@ -80,5 +80,8 @@
public TaskCompletionEvent[] getTaskCompletionEvents(
int startFrom) throws IOException;
- public Counters getCounters();
+ /**
+ * Gets the counters for this job.
+ */
+ public Counters getCounters() throws IOException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Mar 2 11:56:22 2007
@@ -35,6 +35,17 @@
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
+ // Counters used by Task subclasses
+ protected static enum Counter {
+ MAP_INPUT_RECORDS,
+ MAP_OUTPUT_RECORDS,
+ MAP_INPUT_BYTES,
+ MAP_OUTPUT_BYTES,
+ REDUCE_INPUT_RECORDS,
+ REDUCE_OUTPUT_RECORDS
+ }
+
+
////////////////////////////////////////////
// Fields
////////////////////////////////////////////
@@ -152,6 +163,7 @@
private transient long nextProgressTime =
System.currentTimeMillis() + PROGRESS_INTERVAL;
+ // Current counters
private transient Counters counters = new Counters();
public abstract boolean isMapTask();
@@ -174,8 +186,7 @@
public void incrCounter(Enum key, long amount) {
Counters counters = getCounters();
if (counters != null) {
- String name = key.getDeclaringClass().getName()+"#"+key.toString();
- counters.incrCounter(name, amount);
+ counters.incrCounter(key, amount);
}
}
};
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task_Counter.properties?view=auto&rev=513917
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task_Counter.properties (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task_Counter.properties Fri Mar 2 11:56:22 2007
@@ -0,0 +1,11 @@
+# ResourceBundle properties file for Map-Reduce counters
+
+CounterGroupName= Map-Reduce Framework
+
+MAP_INPUT_RECORDS.name= Map input records
+MAP_INPUT_BYTES.name= Map input bytes
+MAP_OUTPUT_RECORDS.name= Map output records
+MAP_OUTPUT_BYTES.name= Map output bytes
+REDUCE_INPUT_RECORDS.name= Reduce input records
+REDUCE_OUTPUT_RECORDS.name= Reduce output records
+
Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Fri Mar 2 11:56:22 2007
@@ -3,6 +3,7 @@
import="javax.servlet.*"
import="javax.servlet.http.*"
import="java.io.*"
+ import="java.text.*"
import="java.util.*"
import="java.text.DecimalFormat"
import="org.apache.hadoop.mapred.*"
@@ -10,6 +11,7 @@
%>
<%!
+
JobTracker tracker = JobTracker.getTracker();
String trackerName =
StringUtils.simpleHostname(tracker.getJobTrackerMachine());
@@ -37,21 +39,53 @@
failures += task.numTaskFailures();
}
out.print("<tr><th><a href=\"/jobtasks.jsp?jobid=" + jobId +
- "&type="+ kind + "&pagenum=1\">" + kind + "</a></th><td>" +
+ "&type="+ kind + "&pagenum=1\">" + kind +
+ "</a></th><td align=\"right\">" +
StringUtils.formatPercent(completePercent, 2) +
- "</td><td>" + totalTasks + "</td><td>" +
+ "</td><td align=\"right\">" +
+ totalTasks +
+ "</td><td align=\"right\">" +
(totalTasks - runningTasks - finishedTasks - killedTasks) +
- "</td><td>" +
- runningTasks + "</td><td>" +
- finishedTasks + "</td><td>" +
+ "</td><td align=\"right\">" +
+ runningTasks +
+ "</td><td align=\"right\">" +
+ finishedTasks +
+ "</td><td align=\"right\">" +
killedTasks +
- "</td><td><a href=\"/jobfailures.jsp?jobid=" + jobId +
+ "</td><td align=\"right\"><a href=\"/jobfailures.jsp?jobid=" + jobId +
"&kind=" + kind + "\">" +
failures + "</a></td></tr>\n");
}
-
- private void printJobStatus(JspWriter out,
- String jobId) throws IOException {
+%>
+<%
+ String jobId = request.getParameter("jobid");
+ String refreshParam = request.getParameter("refresh");
+
+ int refresh = 60; // refresh every 60 seconds by default
+ if (refreshParam != null) {
+ try {
+ refresh = Integer.parseInt(refreshParam);
+ }
+ catch (NumberFormatException ignored) {
+ }
+ }
+%>
+
+<html>
+<head>
+ <%
+ if (refresh != 0) {
+ %>
+ <meta http-equiv="refresh" content="<%=refresh%>">
+ <%
+ }
+ %>
+<title>Hadoop <%=jobId%> on <%=trackerName%></title>
+</head>
+<body>
+<h1>Hadoop <%=jobId%> on <a href="/jobtracker.jsp"><%=trackerName%></a></h1>
+
+<%
JobInProgress job = (JobInProgress) tracker.getJob(jobId);
if (job == null) {
out.print("<b>Job " + jobId + " not found.</b><br>\n");
@@ -99,33 +133,53 @@
job.getReduceTasks());
out.print("</table>\n");
- Counters counters = status.getCounters();
- out.println("<p/>");
- out.println("<table border=2 cellpadding=\"5\" cellspacing=\"2\">");
- out.println("<tr><th>Counter</th><th>Value</th></tr>");
- for (String counter : counters.getCounterNames()) {
- out.print("<tr><td>" + counter + "</td><td>" + counters.getCounter(counter) +
- "</td></tr>\n");
+ %>
+ <p/>
+ <table border=2 cellpadding="5" cellspacing="2">
+ <tr>
+ <th><br/></th>
+ <th>Counter</th>
+ <th>Map</th>
+ <th>Reduce</th>
+ <th>Total</th>
+ </tr>
+ <%
+ Counters mapCounters = job.getMapCounters();
+ Counters reduceCounters = job.getReduceCounters();
+ Counters totalCounters = Counters.sum(mapCounters,reduceCounters);
+
+ for (String groupName : totalCounters.getGroupNames()) {
+ Counters.Group totalGroup = totalCounters.getGroup(groupName);
+ Counters.Group mapGroup = mapCounters.getGroup(groupName);
+ Counters.Group reduceGroup = reduceCounters.getGroup(groupName);
+
+ Format decimal = new DecimalFormat();
+
+ boolean isFirst = true;
+ for (String counter : totalGroup.getCounterNames()) {
+ String mapValue = decimal.format(mapGroup.getCounter(counter));
+ String reduceValue = decimal.format(reduceGroup.getCounter(counter));
+ String totalValue = decimal.format(totalGroup.getCounter(counter));
+ %>
+ <tr>
+ <%
+ if (isFirst) {
+ isFirst = false;
+ %>
+ <td rowspan="<%=totalGroup.size()%>"><%=totalGroup.getDisplayName()%></td>
+ <%
+ }
+ %>
+ <td><%=totalGroup.getDisplayName(counter)%></td>
+ <td align="right"><%=mapValue%></td>
+ <td align="right"><%=reduceValue%></td>
+ <td align="right"><%=totalValue%></td>
+ </tr>
+ <%
+ }
}
- out.print("</table>\n");
- }
-%>
-
-<%
- String jobid = request.getParameter("jobid");
-%>
-
-<html>
-<head>
-<meta http-equiv="refresh" content=60>
-<title>Hadoop <%=jobid%> on <%=trackerName%></title>
-</head>
-<body>
-<h1>Hadoop <%=jobid%> on <a href="/jobtracker.jsp"><%=trackerName%></a></h1>
-
-<%
- printJobStatus(out, jobid);
-%>
+ %>
+ </table>
<hr>
<a href="/jobtracker.jsp">Go back to JobTracker</a><br>
Modified: lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp Fri Mar 2 11:56:22 2007
@@ -36,7 +36,9 @@
%>
<html>
-<title>Hadoop <%=type%> task list for <%=jobid%> on <%=trackerLabel%></title>
+ <head>
+ <title>Hadoop <%=type%> task list for <%=jobid%> on <%=trackerLabel%></title>
+ </head>
<body>
<h1>Hadoop <%=type%> task list for
<a href="/jobdetails.jsp?jobid=<%=jobid%>"><%=jobid%></a> on
@@ -67,10 +69,10 @@
report.getTaskId() + "</a></td>");
out.print("<td>" + StringUtils.formatPercent(report.getProgress(),2) +
"</td>");
- out.print("<td>" + report.getState() + "</td>");
- out.println("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, report.getStartTime(),0) + "</td>");
+ out.print("<td>" + report.getState() + "<br/></td>");
+ out.println("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, report.getStartTime(),0) + "<br/></td>");
out.println("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat,
- report.getFinishTime(), report.getStartTime()) + "</td>");
+ report.getFinishTime(), report.getStartTime()) + "<br/></td>");
String[] diagnostics = report.getDiagnostics();
out.print("<td><pre>");
for (int j = 0; j < diagnostics.length ; j++) {
Modified: lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp Fri Mar 2 11:56:22 2007
@@ -14,7 +14,7 @@
StringUtils.simpleHostname(tracker.getJobTrackerMachine());
private static DecimalFormat percentFormat = new DecimalFormat("##0.00");
- public void generateJobTable(JspWriter out, String label, Vector jobs) throws IOException {
+ public void generateJobTable(JspWriter out, String label, Vector jobs, int refresh) throws IOException {
out.print("<center>\n");
out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
out.print("<tr><td align=\"center\" colspan=\"9\"><b>" + label + " Jobs </b></td></tr>\n");
@@ -39,7 +39,8 @@
int completedReduces = job.finishedReduces();
String name = profile.getJobName();
- out.print( "<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + "\">" +
+ out.print("<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid +
+ "&refresh=" + refresh + "\">" +
jobid + "</a></td>" +
"<td>" + profile.getUser() + "</td>"
+ "<td>" + ("".equals(name) ? "&nbsp;" : name) + "</td>" +
@@ -95,20 +96,20 @@
<h2>Running Jobs</h2>
<%
- generateJobTable(out, "Running", tracker.runningJobs());
+ generateJobTable(out, "Running", tracker.runningJobs(), 10);
%>
<hr>
<h2>Completed Jobs</h2>
<%
- generateJobTable(out, "Completed", tracker.completedJobs());
+ generateJobTable(out, "Completed", tracker.completedJobs(), 0);
%>
<hr>
<h2>Failed Jobs</h2>
<%
- generateJobTable(out, "Failed", tracker.failedJobs());
+ generateJobTable(out, "Failed", tracker.failedJobs(), 0);
%>
<hr>
Modified: lucene/hadoop/trunk/src/webapps/job/taskstats.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskstats.jsp?view=diff&rev=513917&r1=513916&r2=513917
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/taskstats.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/taskstats.jsp Fri Mar 2 11:56:22 2007
@@ -4,6 +4,7 @@
import="javax.servlet.http.*"
import="java.io.*"
import="java.lang.String"
+ import="java.text.*"
import="java.util.*"
import="org.apache.hadoop.mapred.*"
import="org.apache.hadoop.util.*"
@@ -16,44 +17,60 @@
JobInProgress job = (JobInProgress) tracker.getJob(jobid);
String tipid = request.getParameter("tipid");
String taskid = request.getParameter("taskid");
+ Format decimal = new DecimalFormat();
Counters counters;
if (taskid == null) {
- counters = tracker.getTipCounters(jobid, tipid);
- taskid = tipid; // for page title etc
+ counters = tracker.getTipCounters(jobid, tipid);
+ taskid = tipid; // for page title etc
}
else {
- TaskStatus taskStatus = tracker.getTaskStatus(jobid, tipid, taskid);
- counters = taskStatus.getCounters();
+ TaskStatus taskStatus = tracker.getTaskStatus(jobid, tipid, taskid);
+ counters = taskStatus.getCounters();
}
%>
<html>
-<title>Counters for <%=taskid%></title>
+ <head>
+ <title>Counters for <%=taskid%></title>
+ </head>
<body>
<h1>Counters for <%=taskid%></h1>
<hr>
<%
- if( counters == null ) {
+ if ( counters == null ) {
%>
- <h3>No counter information found for this task</h3>
+ <h3>No counter information found for this task</h3>
<%
- }else{
+ } else {
%>
- <table border=2 cellpadding="5" cellspacing="2">
- <tr><td align="center">Counter</td><td>Value</td></tr>
- <%
- for (String counter : counters.getCounterNames()) {
- long value = counters.getCounter(counter);
- %>
- <tr><td><%=counter%></td><td><%=value%></td></tr>
- <%
- }
- %>
- </table>
+ <table>
<%
- }
+ for (String groupName : counters.getGroupNames()) {
+ Counters.Group group = counters.getGroup(groupName);
+ String displayGroupName = group.getDisplayName();
+%>
+ <tr>
+ <td colspan="3"><br/><b><%=displayGroupName%></b></td>
+ </tr>
+<%
+ for (String counter : group.getCounterNames()) {
+ String displayCounterName = group.getDisplayName(counter);
+ long value = group.getCounter(counter);
+%>
+ <tr>
+ <td width="50"></td>
+ <td><%=displayCounterName%></td>
+ <td align="right"><%=decimal.format(value)%></td>
+ </tr>
+<%
+ }
+ }
+%>
+ </table>
+<%
+ }
%>
<hr>