You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this rendering.)
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2009/07/19 07:06:02 UTC
svn commit: r795478 - in /hadoop/mapreduce/trunk: ./ conf/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
Author: acmurthy
Date: Sun Jul 19 05:06:01 2009
New Revision: 795478
URL: http://svn.apache.org/viewvc?rev=795478&view=rev
Log:
MAPREDUCE-740. Log a job-summary at the end of a job, while allowing it to be configured to use a custom appender if desired.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/conf/log4j.properties
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=795478&r1=795477&r2=795478&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Jul 19 05:06:01 2009
@@ -38,6 +38,9 @@
MAPREDUCE-467. Provide ability to collect statistics about total tasks
and succeeded tasks in different time windows. (sharad)
+ MAPREDUCE-740. Log a job-summary at the end of a job, while allowing it
+ to be configured to use a custom appender if desired. (acmurthy)
+
IMPROVEMENTS
HADOOP-5967. Sqoop should only use a single map task. (Aaron Kimball via
Modified: hadoop/mapreduce/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/conf/log4j.properties?rev=795478&r1=795477&r2=795478&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/conf/log4j.properties (original)
+++ hadoop/mapreduce/trunk/conf/log4j.properties Sun Jul 19 05:06:01 2009
@@ -3,6 +3,16 @@
hadoop.log.dir=.
hadoop.log.file=hadoop.log
+#
+# Job Summary Appender
+#
+# To send the job summary to a separate, daily-rolled file named by
+# hadoop.mapreduce.jobsummary.log.file, use the following logger:
+# hadoop.mapreduce.jobsummary.logger=INFO,JSA
+#
+hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger}
+hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log
+
# Define the root logger to the system property "hadoop.root.logger".
log4j.rootLogger=${hadoop.root.logger}, EventCounter
@@ -92,3 +102,14 @@
# Sends counts of logging messages at different severity levels to Hadoop Metrics.
#
log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
+
+#
+# Job Summary Appender
+#
+log4j.appender.JSA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.JSA.File=${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file}
+log4j.appender.JSA.layout=org.apache.log4j.PatternLayout
+log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+log4j.appender.JSA.DatePattern=.yyyy-MM-dd
+log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${hadoop.mapreduce.jobsummary.logger}
+log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=795478&r1=795477&r2=795478&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sun Jul 19 05:06:01 2009
@@ -42,6 +42,7 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskType;
@@ -278,6 +279,9 @@
this.anyCacheLevel = this.maxLevel+1;
this.jobtracker = tracker;
this.restartCount = 0;
+ this.profile = new JobProfile(conf.getUser(), jobid, "", "",
+ conf.getJobName(),conf.getQueueName());
+
hasSpeculativeMaps = conf.getMapSpeculativeExecution();
hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
@@ -2392,8 +2396,25 @@
}
return true;
}
+
/**
+ * Metering: Occupied Slots * (Finish - Start)
+ *
+ * Adds the slot-time consumed by one task attempt to the job counters:
+ * SLOTS_MILLIS_MAPS when the tip is a map task, SLOTS_MILLIS_REDUCES
+ * otherwise. The amount added is tip.getNumSlotsRequired() multiplied by
+ * (status.getFinishTime() - status.getStartTime()); the counter names
+ * suggest these times are milliseconds -- NOTE(review): confirm against
+ * TaskStatus.
+ *
+ * Invoked both when an attempt succeeds (completedTask) and on the
+ * task-failure path (after the failMap/failReduce handling).
+ *
+ * NOTE(review): there is no guard here against an unset or earlier-than-
+ * start finish time; such a status would add a non-positive amount.
+ *
+ * @param tip {@link TaskInProgress} to be metered which just completed,
+ * cannot be <code>null</code>
+ * @param status {@link TaskStatus} of the completed task, cannot be
+ * <code>null</code>
+ */
+ private void meterTaskAttempt(TaskInProgress tip, TaskStatus status) {
+ JobCounter slotCounter =
+ (tip.isMapTask()) ? JobCounter.SLOTS_MILLIS_MAPS :
+ JobCounter.SLOTS_MILLIS_REDUCES;
+ jobCounters.incrCounter(slotCounter,
+ tip.getNumSlotsRequired() *
+ (status.getFinishTime() - status.getStartTime())); // slots * elapsed
+ }
+
+ /**
* A taskid assigned to this JobInProgress has reported in successfully.
*/
public synchronized boolean completedTask(TaskInProgress tip,
@@ -2402,6 +2423,9 @@
TaskAttemptID taskid = status.getTaskID();
final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
+ // Metering
+ meterTaskAttempt(tip, status);
+
// Sanity check: is the TIP already complete?
// This would not happen,
// because no two tasks are SUCCEEDED at the same time.
@@ -2583,6 +2607,12 @@
this.finishTime = JobTracker.getClock().getTime();
LOG.info("Job " + this.status.getJobID() +
" has completed successfully.");
+
+ // Log the job summary (this should be done prior to logging to
+ // job-history to ensure job-counters are in-sync
+ JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
+
+ // Log job-history
JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime,
this.finishedMapTasks,
this.finishedReduceTasks, failedMapTasks,
@@ -2598,6 +2628,9 @@
private synchronized void terminateJob(int jobTerminationState) {
if ((status.getRunState() == JobStatus.RUNNING) ||
(status.getRunState() == JobStatus.PREP)) {
+ // Log the job summary
+ JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
+
if (jobTerminationState == JobStatus.FAILED) {
this.status = new JobStatus(status.getJobID(),
1.0f, 1.0f, 1.0f, JobStatus.FAILED,
@@ -2616,6 +2649,7 @@
this.finishedReduceTasks);
}
garbageCollect();
+
jobtracker.getInstrumentation().terminateJob(
this.conf, this.status.getJobID());
}
@@ -2802,9 +2836,13 @@
failReduce(tip);
}
}
+
+ // Metering
+ meterTaskAttempt(tip, status);
}
- // the case when the map was complete but the task tracker went down.
+ // The case when the map was complete but the task tracker went down.
+ // However, we don't need to do any metering here...
if (wasComplete && !isComplete) {
if (tip.isMapTask()) {
// Put the task back in the cache. This will help locality for cases
@@ -2997,8 +3035,9 @@
* from the various tables.
*/
synchronized void garbageCollect() {
- //Cancel task tracker reservation
+ // Cancel task tracker reservation
cancelReservedSlots();
+
// Let the JobTracker know that a job is complete
jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
@@ -3191,4 +3230,64 @@
void setClusterSize(int clusterSize) {
this.clusterSize = clusterSize;
}
+ /**
+ * Emits a one-line, comma-separated key=value summary of a finished job
+ * through its own logger, so that log4j can route summaries separately:
+ * conf/log4j.properties binds the logger
+ * org.apache.hadoop.mapred.JobInProgress$JobSummary to
+ * ${hadoop.mapreduce.jobsummary.logger} with additivity turned off.
+ */
+ static class JobSummary {
+ static final Log LOG = LogFactory.getLog(JobSummary.class); // logger name: ...JobInProgress$JobSummary
+
+ // Escape sequences: ',' (field separator), '=' (key/value separator) and the escape char itself
+ static final char EQUALS = '=';
+ static final char[] charsToEscape =
+ {StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};
+
+ /**
+ * Log a summary of the job's runtime.
+ *
+ * The record is a single comma-separated line of key=value pairs:
+ * jobId, submitTime (taken from job.getStartTime()), launchTime,
+ * finishTime, numMaps, numSlotsPerMap, numReduces, numSlotsPerReduce,
+ * user, queue, status, mapSlotSeconds, reduceSlotsSeconds,
+ * clusterMapCapacity and clusterReduceCapacity. Only the user and queue
+ * values are escaped (with StringUtils.ESCAPE_CHAR, for the characters in
+ * charsToEscape); the remaining values are written as-is.
+ *
+ * The slot-second figures are the sum of the occupied (SLOTS_MILLIS_*)
+ * and fallow (FALLOW_SLOTS_MILLIS_*) job counters, divided by 1000 with
+ * integer (truncating) division.
+ *
+ * NOTE(review): terminateJob calls this before it replaces the job status
+ * with FAILED/KILLED, so for failed/killed jobs the logged status (and
+ * possibly finishTime) may still reflect RUNNING/PREP -- consider moving
+ * that call after the state change.
+ * NOTE(review): the reduce key is spelled "reduceSlotsSeconds", unlike
+ * "mapSlotSeconds"; left as-is since summary consumers may parse it.
+ *
+ * @param job {@link JobInProgress} whose summary is to be logged, cannot
+ * be <code>null</code>.
+ * @param cluster {@link ClusterStatus} of the cluster on which the job was
+ * run, cannot be <code>null</code>
+ */
+ public static void logJobSummary(JobInProgress job, ClusterStatus cluster) {
+ JobStatus status = job.getStatus();
+ JobProfile profile = job.getProfile();
+ String user = StringUtils.escapeString(profile.getUser(),
+ StringUtils.ESCAPE_CHAR,
+ charsToEscape);
+ String queue = StringUtils.escapeString(profile.getQueueName(),
+ StringUtils.ESCAPE_CHAR,
+ charsToEscape);
+ Counters jobCounters = job.getJobCounters();
+ long mapSlotSeconds =
+ (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_MAPS) +
+ jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_MAPS)) / 1000; // occupied + fallow, ms -> s
+ long reduceSlotSeconds =
+ (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_REDUCES) +
+ jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000; // occupied + fallow, ms -> s
+
+ LOG.info("jobId=" + job.getJobID() + StringUtils.COMMA +
+ "submitTime" + EQUALS + job.getStartTime() + StringUtils.COMMA +
+ "launchTime" + EQUALS + job.getLaunchTime() + StringUtils.COMMA +
+ "finishTime" + EQUALS + job.getFinishTime() + StringUtils.COMMA +
+ "numMaps" + EQUALS + job.getMapTasks().length +
+ StringUtils.COMMA +
+ "numSlotsPerMap" + EQUALS + job.getNumSlotsPerMap() +
+ StringUtils.COMMA +
+ "numReduces" + EQUALS + job.getReduceTasks().length +
+ StringUtils.COMMA +
+ "numSlotsPerReduce" + EQUALS + job.getNumSlotsPerReduce() +
+ StringUtils.COMMA +
+ "user" + EQUALS + user + StringUtils.COMMA +
+ "queue" + EQUALS + queue + StringUtils.COMMA +
+ "status" + EQUALS +
+ JobStatus.getJobRunState(status.getRunState()) +
+ StringUtils.COMMA +
+ "mapSlotSeconds" + EQUALS + mapSlotSeconds + StringUtils.COMMA +
+ "reduceSlotsSeconds" + EQUALS + reduceSlotSeconds +
+ StringUtils.COMMA +
+ "clusterMapCapacity" + EQUALS + cluster.getMaxMapTasks() +
+ StringUtils.COMMA +
+ "clusterReduceCapacity" + EQUALS + cluster.getMaxReduceTasks()
+ );
+ }
+ }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?rev=795478&r1=795477&r2=795478&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Sun Jul 19 05:06:01 2009
@@ -48,6 +48,22 @@
public static final int PREP = 4;
public static final int KILLED = 5;
+ private static final String UNKNOWN = "UNKNOWN";
+ private static final String[] runStates =
+ {UNKNOWN, "RUNNING", "SUCCEEDED", "FAILED", "PREP", "KILLED"}; // index == run-state value; [0] is a placeholder
+
+ /**
+ * Helper method to get human-readable state of the job.
+ *
+ * Looks the state up in runStates, where position i holds the name of
+ * run state i. NOTE(review): this relies on RUNNING=1, SUCCEEDED=2 and
+ * FAILED=3 (declared outside this view) alongside PREP=4 and KILLED=5;
+ * keep the table in sync if those constants change.
+ *
+ * @param state job state
+ * @return human-readable state of the job, or "UNKNOWN" when state is
+ * outside the range 1..5
+ */
+ public static String getJobRunState(int state) {
+ if (state < 1 || state >= runStates.length) {
+ return UNKNOWN;
+ }
+ return runStates[state];
+ }
+
private JobID jobid;
private float mapProgress;
private float reduceProgress;
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=795478&r1=795477&r2=795478&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Sun Jul 19 05:06:01 2009
@@ -1253,4 +1253,8 @@
TreeMap<TaskAttemptID, String> getActiveTasks() {
return activeTasks;
}
+ /**
+ * Returns this task's numSlotsRequired value.
+ * JobInProgress#meterTaskAttempt multiplies it by an attempt's elapsed
+ * time to meter slot usage.
+ *
+ * @return the number of slots required by this task
+ */
+ int getNumSlotsRequired() {
+ return numSlotsRequired;
+ }
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java?rev=795478&r1=795477&r2=795478&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java Sun Jul 19 05:06:01 2009
@@ -27,6 +27,8 @@
OTHER_LOCAL_MAPS,
DATA_LOCAL_MAPS,
RACK_LOCAL_MAPS,
+ SLOTS_MILLIS_MAPS,
+ SLOTS_MILLIS_REDUCES,
FALLOW_SLOTS_MILLIS_MAPS,
FALLOW_SLOTS_MILLIS_REDUCES
}