You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/08/02 00:57:57 UTC

svn commit: r681888 - in /hadoop/core/trunk: ./ conf/ src/mapred/org/apache/hadoop/mapred/

Author: acmurthy
Date: Fri Aug  1 15:57:57 2008
New Revision: 681888

URL: http://svn.apache.org/viewvc?rev=681888&view=rev
Log:
HADOOP-3772. Add a new Hadoop Instrumentation api for the JobTracker and the TaskTracker, refactor Hadoop Metrics as an implementation of the api. Contributed by Ari Rabkin.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Aug  1 15:57:57 2008
@@ -60,6 +60,10 @@
     HADOOP-3730. Adds a new JobConf constructor that disables loading
     default configurations. (Alejandro Abdelnur via ddas)
 
+    HADOOP-3772. Add a new Hadoop Instrumentation api for the JobTracker and
+    the TaskTracker, refactor Hadoop Metrics as an implementation of the api.
+    (Ari Rabkin via acmurthy) 
+
   IMPROVEMENTS
 
     HADOOP-3732. Delay intialization of datanode block verification till

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Aug  1 15:57:57 2008
@@ -676,6 +676,13 @@
 </property>
 
 <property>
+  <name>mapred.tasktracker.instrumentation</name>
+  <value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value>
+  <description>Expert: The instrumentation class to associate with each TaskTracker.
+  </description>
+</property>
+
+<property>
   <name>mapred.map.tasks</name>
   <value>2</value>
   <description>The default number of map tasks per job.  Typically set
@@ -774,6 +781,13 @@
 </property>
 
 <property>
+  <name>mapred.job.instrumentation</name>
+  <value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value>
+  <description>Expert: The instrumentation class to associate with each TaskTracker.
+  </description>
+</property>
+
+<property>
   <name>mapred.child.java.opts</name>
   <value>-Xmx200m</value>
   <description>Java opts for the task tracker child processes.  

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Aug  1 15:57:57 2008
@@ -37,7 +37,6 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobHistory.Values;
-import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -475,7 +474,7 @@
   ////////////////////////////////////////////////////
   public synchronized void updateTaskStatus(TaskInProgress tip, 
                                             TaskStatus status,
-                                            JobTrackerMetrics metrics) {
+                                            JobTrackerInstrumentation metrics) {
 
     double oldProgress = tip.getProgress();   // save old progress
     boolean wasRunning = tip.isRunning();
@@ -1306,7 +1305,7 @@
    */
   public synchronized boolean completedTask(TaskInProgress tip, 
                                          TaskStatus status,
-                                         JobTrackerMetrics metrics) 
+                                         JobTrackerInstrumentation metrics) 
   {
     TaskAttemptID taskid = status.getTaskID();
         
@@ -1359,13 +1358,13 @@
     if (tip.isMapTask()){
       runningMapTasks -= 1;
       finishedMapTasks += 1;
-      metrics.completeMap();
+      metrics.completeMap(taskid);
       // remove the completed map from the resp running caches
       retireMap(tip);
     } else{
       runningReduceTasks -= 1;
       finishedReduceTasks += 1;
-      metrics.completeReduce();
+      metrics.completeReduce(taskid);
       // remove the completed reduces from the running reducers set
       retireReduce(tip);
     }
@@ -1393,7 +1392,7 @@
    * @param metrics job-tracker metrics
    * @return
    */
-  private boolean isJobComplete(TaskInProgress tip, JobTrackerMetrics metrics) {
+  private boolean isJobComplete(TaskInProgress tip, JobTrackerInstrumentation metrics) {
     // Job is complete if total-tips = finished-tips + failed-tips
     boolean allDone = 
       ((finishedMapTasks + failedMapTIPs) == numMapTasks);
@@ -1467,7 +1466,7 @@
                           TaskStatus status, 
                           TaskTrackerStatus taskTrackerStatus,
                           boolean wasRunning, boolean wasComplete,
-                          JobTrackerMetrics metrics) {
+                          JobTrackerInstrumentation metrics) {
     // check if the TIP is already failed
     boolean wasFailed = tip.isFailed();
 
@@ -1611,7 +1610,7 @@
    */
   public void failedTask(TaskInProgress tip, TaskAttemptID taskid, String reason, 
                          TaskStatus.Phase phase, TaskStatus.State state, 
-                         String trackerName, JobTrackerMetrics metrics) {
+                         String trackerName, JobTrackerInstrumentation metrics) {
     TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), 
                                                     taskid,
                                                     0.0f,
@@ -1733,7 +1732,7 @@
   synchronized void fetchFailureNotification(TaskInProgress tip, 
                                              TaskAttemptID mapTaskId, 
                                              String trackerName, 
-                                             JobTrackerMetrics metrics) {
+                                             JobTrackerInstrumentation metrics) {
     Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
     fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
     mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures);
@@ -1763,9 +1762,9 @@
     private JobInProgress job;
     private TaskInProgress tip;
     private TaskAttemptID taskId;
-    private JobTrackerMetrics metrics;
+    private JobTrackerInstrumentation metrics;
     JobWithTaskContext(JobInProgress job, TaskInProgress tip, 
-        TaskAttemptID taskId, JobTrackerMetrics metrics) {
+        TaskAttemptID taskId, JobTrackerInstrumentation metrics) {
       this.job = job;
       this.tip = tip;
       this.taskId = taskId;
@@ -1780,7 +1779,7 @@
     TaskAttemptID getTaskID() {
       return taskId;
     }
-    JobTrackerMetrics getJobTrackerMetrics() {
+    JobTrackerInstrumentation getJobTrackerMetrics() {
       return metrics;
     }
   }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Aug  1 15:57:57 2008
@@ -218,7 +218,7 @@
                                      tip.isMapTask()? TaskStatus.Phase.MAP:
                                      TaskStatus.Phase.STARTING,
                                      TaskStatus.State.FAILED,
-                                     trackerName, myMetrics);
+                                     trackerName, myInstrumentation);
                   }
                   itr.remove();
                 } else {
@@ -393,83 +393,9 @@
     }
   }
 
-  static class JobTrackerMetrics implements Updater {
-    private MetricsRecord metricsRecord = null;
-    private int numMapTasksLaunched = 0;
-    private int numMapTasksCompleted = 0;
-    private int numReduceTasksLaunched = 0;
-    private int numReduceTasksCompleted = 0;
-    private int numJobsSubmitted = 0;
-    private int numJobsCompleted = 0;
-    private JobTracker tracker;
-      
-    JobTrackerMetrics(JobTracker tracker, JobConf conf) {
-      String sessionId = conf.getSessionId();
-      // Initiate JVM Metrics
-      JvmMetrics.init("JobTracker", sessionId);
-      // Create a record for map-reduce metrics
-      MetricsContext context = MetricsUtil.getContext("mapred");
-      metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
-      metricsRecord.setTag("sessionId", sessionId);
-      this.tracker = tracker;
-      context.registerUpdater(this);
-    }
-      
-    /**
-     * Since this object is a registered updater, this method will be called
-     * periodically, e.g. every 5 seconds.
-     */
-    public void doUpdates(MetricsContext unused) {
-      synchronized (this) {
-        metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
-        metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
-        metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
-        metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
-        metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
-        metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
-              
-        numMapTasksLaunched = 0;
-        numMapTasksCompleted = 0;
-        numReduceTasksLaunched = 0;
-        numReduceTasksCompleted = 0;
-        numJobsSubmitted = 0;
-        numJobsCompleted = 0;
-      }
-      metricsRecord.update();
-
-      if (tracker != null) {
-        for (JobInProgress jip : tracker.getRunningJobs()) {
-          jip.updateMetrics();
-        }
-      }
-    }
+ 
 
-    synchronized void launchMap() {
-      ++numMapTasksLaunched;
-    }
-      
-    synchronized void completeMap() {
-      ++numMapTasksCompleted;
-    }
-      
-    synchronized void launchReduce() {
-      ++numReduceTasksLaunched;
-    }
-      
-    synchronized void completeReduce() {
-      ++numReduceTasksCompleted;
-    }
-      
-    synchronized void submitJob() {
-      ++numJobsSubmitted;
-    }
-      
-    synchronized void completeJob() {
-      ++numJobsCompleted;
-    }
-  }
-
-  private JobTrackerMetrics myMetrics = null;
+  private JobTrackerInstrumentation myInstrumentation = null;
     
   /////////////////////////////////////////////////////////////////
   // The real JobTracker
@@ -661,7 +587,18 @@
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
     trackerIdentifier = dateFormat.format(new Date());
 
-    myMetrics = new JobTrackerMetrics(this, jobConf);
+    Class<? extends JobTrackerInstrumentation> metricsInst = getInstrumentationClass(jobConf);
+    try {
+      java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+        metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
+      this.myInstrumentation = c.newInstance(this, jobConf);
+    } catch(Exception e) {
+      //Reflection can throw lots of exceptions -- handle them all by 
+      //falling back on the default.
+      LOG.error("failed to initialize job tracker metrics", e);
+      this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
+    }
+ 
     
     // The rpc/web-server ports can be ephemeral ports... 
     // ... ensure we have the correct info
@@ -728,6 +665,16 @@
     LOG.info("Starting RUNNING");
   }
 
+  public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
+    return conf.getClass("mapred.jobtracker.instrumentation",
+        JobTrackerMetricsInst.class, JobTrackerInstrumentation.class);
+  }
+  
+  public static void setInstrumentationClass(Configuration conf, Class<? extends JobTrackerInstrumentation> t) {
+    conf.setClass("mapred.jobtracker.instrumentation",
+        t, JobTrackerInstrumentation.class);
+  }
+
   public static InetSocketAddress getAddress(Configuration conf) {
     String jobTrackerStr =
       conf.get("mapred.job.tracker", "localhost:8012");
@@ -1290,9 +1237,9 @@
           for (Task task : tasks) {
             expireLaunchingTasks.addNewTask(task.getTaskID());
             if (task.isMapTask()) {
-              myMetrics.launchMap();
+              myInstrumentation.launchMap(task.getTaskID());
             } else {
-              myMetrics.launchReduce();
+              myInstrumentation.launchReduce(task.getTaskID());
             }
             LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
             actions.add(new LaunchTaskAction(task));
@@ -1620,7 +1567,7 @@
         }
       }
     }
-    myMetrics.submitJob();
+    myInstrumentation.submitJob();
     return job.getStatus();
   }
 
@@ -1880,7 +1827,7 @@
         LOG.info("Serious problem.  While updating status, cannot find taskid " + report.getTaskID());
       } else {
         expireLaunchingTasks.removeTask(taskId);
-        tip.getJob().updateTaskStatus(tip, report, myMetrics);
+        tip.getJob().updateTaskStatus(tip, report, myInstrumentation);
       }
       
       // Process 'failed fetch' notifications 
@@ -1898,7 +1845,7 @@
             failedFetchMap.getJob().fetchFailureNotification(failedFetchMap, 
                                                              mapTaskId, 
                                                              failedFetchTrackerName, 
-                                                             myMetrics);
+                                                             myInstrumentation);
           }
         }
       }
@@ -1934,7 +1881,7 @@
                            (tip.isMapTask() ? 
                                TaskStatus.Phase.MAP : 
                                TaskStatus.Phase.REDUCE), 
-                           TaskStatus.State.KILLED, trackerName, myMetrics);
+                           TaskStatus.State.KILLED, trackerName, myInstrumentation);
             jobsWithFailures.add(job);
           }
         } else {
@@ -2006,7 +1953,7 @@
           JobInProgress[] jobs = new JobInProgress[jobList.size()];
           TaskInProgress[] tips = new TaskInProgress[jobList.size()];
           TaskAttemptID[] taskids = new TaskAttemptID[jobList.size()];
-          JobTrackerMetrics[] metrics = new JobTrackerMetrics[jobList.size()];
+          JobTrackerInstrumentation[] metrics = new JobTrackerInstrumentation[jobList.size()];
 
           Iterator<JobInProgress.JobWithTaskContext> iter = jobList.iterator();
           int count = 0;
@@ -2169,4 +2116,6 @@
     }
   }
 
+
+
 }

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=681888&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Fri Aug  1 15:57:57 2008
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics.Updater;
+
+public class JobTrackerInstrumentation {
+
+  protected final JobTracker tracker;
+  
+  public JobTrackerInstrumentation(JobTracker jt, JobConf conf) {
+    tracker = jt;
+  }
+
+  public void launchMap(TaskAttemptID taskAttemptID)
+  { }
+
+  public void completeMap(TaskAttemptID taskAttemptID)
+  { }
+
+  public void launchReduce(TaskAttemptID taskAttemptID)
+  { }
+
+  public void completeReduce(TaskAttemptID taskAttemptID)
+  {  }
+  
+  public void submitJob() 
+  { }
+    
+  public void completeJob() 
+  { }
+
+}
\ No newline at end of file

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=681888&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Fri Aug  1 15:57:57 2008
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+
+class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater {
+  private MetricsRecord metricsRecord = null;
+  int numMapTasksLaunched = 0;
+  int numMapTasksCompleted = 0;
+  int numReduceTasksLaunched = 0;
+  int numReduceTasksCompleted = 0;
+  private int numJobsSubmitted = 0;
+  private int numJobsCompleted = 0;
+    
+  public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
+    super(tracker, conf);
+    String sessionId = conf.getSessionId();
+    // Initiate JVM Metrics
+    JvmMetrics.init("JobTracker", sessionId);
+    // Create a record for map-reduce metrics
+    MetricsContext context = MetricsUtil.getContext("mapred");
+    metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
+    metricsRecord.setTag("sessionId", sessionId);
+    context.registerUpdater(this);
+  }
+    
+  /**
+   * Since this object is a registered updater, this method will be called
+   * periodically, e.g. every 5 seconds.
+   */
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
+      metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+      metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
+      metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+      metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
+      metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
+            
+      numMapTasksLaunched = 0;
+      numMapTasksCompleted = 0;
+      numReduceTasksLaunched = 0;
+      numReduceTasksCompleted = 0;
+      numJobsSubmitted = 0;
+      numJobsCompleted = 0;
+    }
+    metricsRecord.update();
+
+    if (tracker != null) {
+      for (JobInProgress jip : tracker.getRunningJobs()) {
+        jip.updateMetrics();
+      }
+    }
+  }
+
+  public synchronized void launchMap(TaskAttemptID taskAttemptID) {
+    ++numMapTasksLaunched;
+  }
+    
+  public synchronized void completeMap(TaskAttemptID taskAttemptID) {
+    ++numMapTasksCompleted;
+  }
+    
+  public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
+    ++numReduceTasksLaunched;
+  }
+    
+  public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
+    ++numReduceTasksCompleted;
+  }
+    
+  public synchronized void submitJob() {
+    ++numJobsSubmitted;
+  }
+    
+  public synchronized void completeJob() {
+    ++numJobsCompleted;
+  }
+}
\ No newline at end of file

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Aug  1 15:57:57 2008
@@ -29,7 +29,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
+import org.apache.hadoop.mapred.JobTrackerMetricsInst;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
 class LocalJobRunner implements JobSubmissionProtocol {
@@ -42,7 +42,7 @@
   private int map_tasks = 0;
   private int reduce_tasks = 0;
 
-  private JobTrackerMetrics myMetrics = null;
+  private JobTrackerInstrumentation myMetrics = null;
 
   private static final String jobDir =  "localRunner/";
   
@@ -153,10 +153,10 @@
           map.localizeConfiguration(localConf);
           map.setConf(localConf);
           map_tasks += 1;
-          myMetrics.launchMap();
+          myMetrics.launchMap(mapId);
           map.run(localConf, this);
           map.saveTaskOutput();
-          myMetrics.completeMap();
+          myMetrics.completeMap(mapId);
           map_tasks -= 1;
           updateCounters(map);
         }
@@ -197,10 +197,10 @@
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
               reduce_tasks += 1;
-              myMetrics.launchReduce();
+              myMetrics.launchReduce(reduce.getTaskID());
               reduce.run(localConf, this);
               reduce.saveTaskOutput();
-              myMetrics.completeReduce();
+              myMetrics.completeReduce(reduce.getTaskID());
               reduce_tasks -= 1;
               updateCounters(reduce);
             }
@@ -310,7 +310,7 @@
   public LocalJobRunner(JobConf conf) throws IOException {
     this.fs = FileSystem.get(conf);
     this.conf = conf;
-    myMetrics = new JobTrackerMetrics(null, new JobConf(conf));
+    myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
   }
 
   // JobSubmissionProtocol methods

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Aug  1 15:57:57 2008
@@ -387,6 +387,7 @@
       File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
       File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
       stdout.getParentFile().mkdirs();
+      tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
       List<String> wrappedCommand = 
         TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize);
       Map<String, String> env = new HashMap<String, String>();
@@ -454,10 +455,10 @@
       // error and output are appropriately redirected
     } finally { // handle the exit code
       int exit_code = shexec.getExitCode();
-     
+      tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
       if (!killed && exit_code != 0) {
         if (exit_code == 65) {
-          tracker.getTaskTrackerMetrics().taskFailedPing();
+          tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
         }
         throw new IOException("Task process exit with nonzero status of " +
                               exit_code + ".");

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Aug  1 15:57:57 2008
@@ -24,6 +24,7 @@
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
+import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -49,6 +50,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -230,64 +232,15 @@
       shuffleMetricsRecord.update();
     }
   }
-  public class TaskTrackerMetrics implements Updater {
-    private MetricsRecord metricsRecord = null;
-    private int numCompletedTasks = 0;
-    private int timedoutTasks = 0;
-    private int tasksFailedPing = 0;
-      
-    TaskTrackerMetrics() {
-      JobConf conf = getJobConf();
-      String sessionId = conf.getSessionId();
-      // Initiate Java VM Metrics
-      JvmMetrics.init("TaskTracker", sessionId);
-      // Create a record for Task Tracker metrics
-      MetricsContext context = MetricsUtil.getContext("mapred");
-      metricsRecord = MetricsUtil.createRecord(context, "tasktracker");
-      metricsRecord.setTag("sessionId", sessionId);
-      context.registerUpdater(this);
-    }
-      
-    synchronized void completeTask() {
-      ++numCompletedTasks;
-    }
-    
-    synchronized void timedoutTask() {
-      ++timedoutTasks;
-    }
-    
-    synchronized void taskFailedPing() {
-      ++tasksFailedPing;
-    }
-    
-    /**
-     * Since this object is a registered updater, this method will be called
-     * periodically, e.g. every 5 seconds.
-     */  
-    public void doUpdates(MetricsContext unused) {
-      synchronized (this) {
-        if (metricsRecord != null) {
-          metricsRecord.setMetric("maps_running", mapTotal);
-          metricsRecord.setMetric("reduces_running", reduceTotal);
-          metricsRecord.setMetric("mapTaskSlots", (short)maxCurrentMapTasks);
-          metricsRecord.setMetric("reduceTaskSlots", 
-                                       (short)maxCurrentReduceTasks);
-          metricsRecord.incrMetric("tasks_completed", numCompletedTasks);
-          metricsRecord.incrMetric("tasks_failed_timeout", timedoutTasks);
-          metricsRecord.incrMetric("tasks_failed_ping", tasksFailedPing);
-        }
-        numCompletedTasks = 0;
-        timedoutTasks = 0;
-        tasksFailedPing = 0;
-      }
-      metricsRecord.update();
-    }
-  }
+  
+
+  
+  
     
-  private TaskTrackerMetrics myMetrics = null;
+  private TaskTrackerInstrumentation myInstrumentation = null;
 
-  public TaskTrackerMetrics getTaskTrackerMetrics() {
-    return myMetrics;
+  public TaskTrackerInstrumentation getTaskTrackerInstrumentation() {
+    return myInstrumentation;
   }
   
   /**
@@ -416,8 +369,17 @@
     //tweak the probe sample size (make it a function of numCopiers)
     probe_sample_size = this.fConf.getInt("mapred.tasktracker.events.batchsize", 500);
     
-    
-    this.myMetrics = new TaskTrackerMetrics();
+    Class<? extends TaskTrackerInstrumentation> metricsInst = getInstrumentationClass(fConf);
+    try {
+      java.lang.reflect.Constructor<? extends TaskTrackerInstrumentation> c =
+        metricsInst.getConstructor(new Class[] {TaskTracker.class} );
+      this.myInstrumentation = c.newInstance(this);
+    } catch(Exception e) {
+      //Reflection can throw lots of exceptions -- handle them all by 
+      //falling back on the default.
+      LOG.error("failed to initialize taskTracker metrics", e);
+      this.myInstrumentation = new TaskTrackerMetricsInst(this);
+    }
     
     // bind address
     String address = 
@@ -464,6 +426,16 @@
     mapEventsFetcher.start();
   }
   
+  public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
+    return conf.getClass("mapred.tasktracker.instrumentation",
+        TaskTrackerMetricsInst.class, TaskTrackerInstrumentation.class);
+  }
+  
+  public static void setInstrumentationClass(Configuration conf, Class<? extends TaskTrackerInstrumentation> t) {
+    conf.setClass("mapred.tasktracker.instrumentation",
+        t, TaskTrackerInstrumentation.class);
+  }
+  
   /** 
    * Removes all contents of temporary storage.  Called upon 
    * startup, to remove any leftovers from previous run.
@@ -1072,7 +1044,7 @@
             reduceTotal--;
           }
           try {
-            myMetrics.completeTask();
+            myInstrumentation.completeTask(taskStatus.getTaskID());
           } catch (MetricsException me) {
             LOG.warn("Caught: " + StringUtils.stringifyException(me));
           }
@@ -1137,7 +1109,7 @@
           LOG.info(tip.getTask().getTaskID() + ": " + msg);
           ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
           tip.reportDiagnosticInfo(msg);
-          myMetrics.timedoutTask();
+          myInstrumentation.timedoutTask(tip.getTask().getTaskID());
           purgeTask(tip, true);
         }
       }
@@ -2530,4 +2502,13 @@
       }
     }
   }
+
+  int getMaxCurrentMapTasks() {
+    return maxCurrentMapTasks;
+  }
+  
+  int getMaxCurrentReduceTasks() {
+    return maxCurrentReduceTasks;
+  }
+
 }

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java?rev=681888&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java Fri Aug  1 15:57:57 2008
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+
+/**
+ * TaskTrackerInstrumentation defines a number of instrumentation points
+ * associated with TaskTrackers.  By default, the instrumentation points do
+ * nothing, but subclasses can do arbitrary instrumentation and monitoring at
+ * these points.
+ * 
+ * TaskTrackerInstrumentation interfaces are associated uniquely with a
+ * TaskTracker.  We don't want an inner class here, because then subclasses
+ * wouldn't have direct access to the associated TaskTracker.
+ *  
+ **/
+public class TaskTrackerInstrumentation  {
+
+  protected final TaskTracker tt;
+  
+  public TaskTrackerInstrumentation(TaskTracker t) {
+    tt = t;
+  }
+  
+  /**
+   * invoked when task attempt t succeeds
+   * @param t
+   */
+  public void completeTask(TaskAttemptID t) { }
+  
+  public void timedoutTask(TaskAttemptID t) { }
+  
+  public void taskFailedPing(TaskAttemptID t) { }
+
+  /**
+   * Called just before task attempt t starts.
+   * @param stdout the file containing standard out of the new task
+   * @param stderr the file containing standard error of the new task 
+   */
+  public void reportTaskLaunch(TaskAttemptID t, File stdout, File stderr)  { }
+  
+  /**
+   * called when task t has just finished.
+   * @param t
+   */
+  public void reportTaskEnd(TaskAttemptID t) {}
+   
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java?rev=681888&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java Fri Aug  1 15:57:57 2008
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+  
+public class TaskTrackerMetricsInst extends TaskTrackerInstrumentation implements Updater {
+  private final MetricsRecord metricsRecord;
+  private int numCompletedTasks = 0;
+  private int timedoutTasks = 0;
+  private int tasksFailedPing = 0;
+    
+  public TaskTrackerMetricsInst(TaskTracker t) {
+    super(t);
+    JobConf conf = tt.getJobConf();
+    String sessionId = conf.getSessionId();
+    // Initiate Java VM Metrics
+    JvmMetrics.init("TaskTracker", sessionId);
+    // Create a record for Task Tracker metrics
+    MetricsContext context = MetricsUtil.getContext("mapred");
+    metricsRecord = MetricsUtil.createRecord(context, "tasktracker"); //guaranteed never null
+    metricsRecord.setTag("sessionId", sessionId);
+    context.registerUpdater(this);
+  }
+    
+  synchronized void completeTask() {
+    ++numCompletedTasks;
+  }
+  
+  synchronized void timedoutTask() {
+    ++timedoutTasks;
+  }
+  
+  synchronized void taskFailedPing() {
+    ++tasksFailedPing;
+  }
+  
+  /**
+   * Since this object is a registered updater, this method will be called
+   * periodically, e.g. every 5 seconds.
+   */  
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      metricsRecord.setMetric("maps_running", tt.mapTotal);
+      metricsRecord.setMetric("reduces_running", tt.reduceTotal);
+      metricsRecord.setMetric("mapTaskSlots", (short)tt.getMaxCurrentMapTasks());
+      metricsRecord.setMetric("reduceTaskSlots", 
+                                   (short)tt.getMaxCurrentReduceTasks());
+      metricsRecord.incrMetric("tasks_completed", numCompletedTasks);
+      metricsRecord.incrMetric("tasks_failed_timeout", timedoutTasks);
+      metricsRecord.incrMetric("tasks_failed_ping", tasksFailedPing);
+      
+      numCompletedTasks = 0;
+      timedoutTasks = 0;
+      tasksFailedPing = 0;
+    }
+      metricsRecord.update();
+  }
+
+  
+}
\ No newline at end of file