You are viewing a plain-text version of this content. Its canonical link was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2009/07/19 07:06:02 UTC

svn commit: r795478 - in /hadoop/mapreduce/trunk: ./ conf/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/

Author: acmurthy
Date: Sun Jul 19 05:06:01 2009
New Revision: 795478

URL: http://svn.apache.org/viewvc?rev=795478&view=rev
Log:
MAPREDUCE-740. Log a job-summary at the end of a job, while allowing it to be configured to use a custom appender if desired.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/conf/log4j.properties
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=795478&r1=795477&r2=795478&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Jul 19 05:06:01 2009
@@ -38,6 +38,9 @@
     MAPREDUCE-467. Provide ability to collect statistics about total tasks 
     and succeeded tasks in different time windows. (sharad)
 
+    MAPREDUCE-740. Log a job-summary at the end of a job, while allowing it
+    to be configured to use a custom appender if desired. (acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-5967. Sqoop should only use a single map task. (Aaron Kimball via

Modified: hadoop/mapreduce/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/conf/log4j.properties?rev=795478&r1=795477&r2=795478&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/conf/log4j.properties (original)
+++ hadoop/mapreduce/trunk/conf/log4j.properties Sun Jul 19 05:06:01 2009
@@ -3,6 +3,16 @@
 hadoop.log.dir=.
 hadoop.log.file=hadoop.log
 
+#
+# Job Summary Appender 
+#
+# Use the following logger to send the summary to a separate file, named by
+# hadoop.mapreduce.jobsummary.log.file and rolled over daily:
+# hadoop.mapreduce.jobsummary.logger=INFO,JSA
+# 
+hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger}
+hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log
+
 # Define the root logger to the system property "hadoop.root.logger".
 log4j.rootLogger=${hadoop.root.logger}, EventCounter
 
@@ -92,3 +102,14 @@
 # Sends counts of logging messages at different severity levels to Hadoop Metrics.
 #
 log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
+
+#
+# Job Summary Appender
+#
+log4j.appender.JSA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.JSA.File=${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file}
+log4j.appender.JSA.layout=org.apache.log4j.PatternLayout
+log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+log4j.appender.JSA.DatePattern=.yyyy-MM-dd
+log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${hadoop.mapreduce.jobsummary.logger}
+log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=795478&r1=795477&r2=795478&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sun Jul 19 05:06:01 2009
@@ -42,6 +42,7 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -278,6 +279,9 @@
     this.anyCacheLevel = this.maxLevel+1;
     this.jobtracker = tracker;
     this.restartCount = 0;
+    this.profile = new JobProfile(conf.getUser(), jobid, "", "", 
+                                  conf.getJobName(),conf.getQueueName());
+
     
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
@@ -2392,8 +2396,25 @@
     }
     return true;
   }
+  
 
   /**
+   * Metering: Occupied Slots * (Finish - Start)
+   * @param tip {@link TaskInProgress} to be metered which just completed, 
+   *            cannot be <code>null</code> 
+   * @param status {@link TaskStatus} of the completed task, cannot be 
+   *               <code>null</code>
+   */
+  private void meterTaskAttempt(TaskInProgress tip, TaskStatus status) {
+    JobCounter slotCounter = 
+      (tip.isMapTask()) ? JobCounter.SLOTS_MILLIS_MAPS : 
+                          JobCounter.SLOTS_MILLIS_REDUCES;
+    jobCounters.incrCounter(slotCounter, 
+                            tip.getNumSlotsRequired() * 
+                            (status.getFinishTime() - status.getStartTime()));
+  }
+  
+  /**
    * A taskid assigned to this JobInProgress has reported in successfully.
    */
   public synchronized boolean completedTask(TaskInProgress tip, 
@@ -2402,6 +2423,9 @@
     TaskAttemptID taskid = status.getTaskID();
     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
         
+    // Metering
+    meterTaskAttempt(tip, status);
+    
     // Sanity check: is the TIP already complete? 
     // This would not happen, 
     // because no two tasks are SUCCEEDED at the same time. 
@@ -2583,6 +2607,12 @@
       this.finishTime = JobTracker.getClock().getTime();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
+      
+      // Log the job summary (this should be done prior to logging to 
+      // job-history to ensure job-counters are in-sync 
+      JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
+
+      // Log job-history
       JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks, failedMapTasks, 
@@ -2598,6 +2628,9 @@
   private synchronized void terminateJob(int jobTerminationState) {
     if ((status.getRunState() == JobStatus.RUNNING) ||
         (status.getRunState() == JobStatus.PREP)) {
+      // Log the job summary
+      JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
+      
       if (jobTerminationState == JobStatus.FAILED) {
         this.status = new JobStatus(status.getJobID(),
                                     1.0f, 1.0f, 1.0f, JobStatus.FAILED,
@@ -2616,6 +2649,7 @@
                                      this.finishedReduceTasks);
       }
       garbageCollect();
+      
       jobtracker.getInstrumentation().terminateJob(
           this.conf, this.status.getJobID());
     }
@@ -2802,9 +2836,13 @@
           failReduce(tip);
         }
       }
+      
+      // Metering
+      meterTaskAttempt(tip, status);
     }
         
-    // the case when the map was complete but the task tracker went down.
+    // The case when the map was complete but the task tracker went down.
+    // However, we don't need to do any metering here...
     if (wasComplete && !isComplete) {
       if (tip.isMapTask()) {
         // Put the task back in the cache. This will help locality for cases
@@ -2997,8 +3035,9 @@
    * from the various tables.
    */
   synchronized void garbageCollect() {
-    //Cancel task tracker reservation
+    // Cancel task tracker reservation
     cancelReservedSlots();
+    
     // Let the JobTracker know that a job is complete
     jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
     jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
@@ -3191,4 +3230,64 @@
   void setClusterSize(int clusterSize) {
     this.clusterSize = clusterSize;
   }
+
+  static class JobSummary {
+    static final Log LOG = LogFactory.getLog(JobSummary.class);
+    
+    // Escape sequences 
+    static final char EQUALS = '=';
+    static final char[] charsToEscape = 
+      {StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};
+    
+    /**
+     * Log a summary of the job's runtime.
+     * 
+     * @param job {@link JobInProgress} whose summary is to be logged, cannot
+     *            be <code>null</code>.
+     * @param cluster {@link ClusterStatus} of the cluster on which the job was
+     *                run, cannot be <code>null</code>
+     */
+    public static void logJobSummary(JobInProgress job, ClusterStatus cluster) {
+      JobStatus status = job.getStatus();
+      JobProfile profile = job.getProfile();
+      String user = StringUtils.escapeString(profile.getUser(), 
+                                             StringUtils.ESCAPE_CHAR, 
+                                             charsToEscape);
+      String queue = StringUtils.escapeString(profile.getQueueName(), 
+                                              StringUtils.ESCAPE_CHAR, 
+                                              charsToEscape);
+      Counters jobCounters = job.getJobCounters();
+      long mapSlotSeconds = 
+        (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_MAPS) +
+         jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_MAPS)) / 1000;
+      long reduceSlotSeconds = 
+        (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_REDUCES) +
+         jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000;
+
+      LOG.info("jobId=" + job.getJobID() + StringUtils.COMMA +
+               "submitTime" + EQUALS + job.getStartTime() + StringUtils.COMMA +
+               "launchTime" + EQUALS + job.getLaunchTime() + StringUtils.COMMA +
+               "finishTime" + EQUALS + job.getFinishTime() + StringUtils.COMMA +
+               "numMaps" + EQUALS + job.getMapTasks().length + 
+                           StringUtils.COMMA +
+               "numSlotsPerMap" + EQUALS + job.getNumSlotsPerMap() + 
+                                  StringUtils.COMMA +
+               "numReduces" + EQUALS + job.getReduceTasks().length + 
+                              StringUtils.COMMA +
+               "numSlotsPerReduce" + EQUALS + job.getNumSlotsPerReduce() + 
+                                     StringUtils.COMMA +
+               "user" + EQUALS + user + StringUtils.COMMA +
+               "queue" + EQUALS + queue + StringUtils.COMMA +
+               "status" + EQUALS + 
+                          JobStatus.getJobRunState(status.getRunState()) + 
+                          StringUtils.COMMA + 
+               "mapSlotSeconds" + EQUALS + mapSlotSeconds + StringUtils.COMMA +
+               "reduceSlotsSeconds" + EQUALS + reduceSlotSeconds  + 
+                                      StringUtils.COMMA +
+               "clusterMapCapacity" + EQUALS + cluster.getMaxMapTasks() + 
+                                      StringUtils.COMMA +
+               "clusterReduceCapacity" + EQUALS + cluster.getMaxReduceTasks()
+      );
+    }
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?rev=795478&r1=795477&r2=795478&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Sun Jul 19 05:06:01 2009
@@ -48,6 +48,22 @@
   public static final int PREP = 4;
   public static final int KILLED = 5;
 
+  private static final String UNKNOWN = "UNKNOWN";
+  private static final String[] runStates =
+      {UNKNOWN, "RUNNING", "SUCCEEDED", "FAILED", "PREP", "KILLED"};
+  
+  /**
+   * Helper method to get human-readable state of the job.
+   * @param state job state
+   * @return human-readable state of the job
+   */
+  public static String getJobRunState(int state) {
+    if (state < 1 || state >= runStates.length) {
+      return UNKNOWN;
+    }
+    return runStates[state];
+  }
+  
   private JobID jobid;
   private float mapProgress;
   private float reduceProgress;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=795478&r1=795477&r2=795478&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Sun Jul 19 05:06:01 2009
@@ -1253,4 +1253,8 @@
   TreeMap<TaskAttemptID, String> getActiveTasks() {
     return activeTasks;
   }
+
+  int getNumSlotsRequired() {
+    return numSlotsRequired;
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java?rev=795478&r1=795477&r2=795478&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobCounter.java Sun Jul 19 05:06:01 2009
@@ -27,6 +27,8 @@
   OTHER_LOCAL_MAPS,
   DATA_LOCAL_MAPS,
   RACK_LOCAL_MAPS,
+  SLOTS_MILLIS_MAPS,
+  SLOTS_MILLIS_REDUCES,
   FALLOW_SLOTS_MILLIS_MAPS,
   FALLOW_SLOTS_MILLIS_REDUCES
 }