You are viewing a plain-text version of this content. The canonical link to the original page was a hyperlink that is not preserved in this plain-text view.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2010/08/20 19:45:45 UTC

svn commit: r987589 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskTracker.java src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java

Author: acmurthy
Date: Fri Aug 20 17:45:44 2010
New Revision: 987589

URL: http://svn.apache.org/viewvc?rev=987589&view=rev
Log:
MAPREDUCE-1881. Improve TaskTrackerInstrumentation to enable collection of advanced metrics. Contributed by Matei Zaharia.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=987589&r1=987588&r2=987589&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Aug 20 17:45:44 2010
@@ -121,6 +121,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1920. Enables completed jobstatus store by default. (Tom White
     via amareshwari)
 
+    MAPREDUCE-1881. Improve TaskTrackerInstrumentation to enable collection of
+    advanced metrics. (Matei Zaharia via acmurthy)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=987589&r1=987588&r2=987589&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Aug 20 17:45:44 2010
@@ -625,17 +625,8 @@ public class TaskTracker 
     probe_sample_size = 
       this.fConf.getInt(TT_MAX_TASK_COMPLETION_EVENTS_TO_POLL, 500);
     
-    Class<? extends TaskTrackerInstrumentation> metricsInst = getInstrumentationClass(fConf);
-    try {
-      java.lang.reflect.Constructor<? extends TaskTrackerInstrumentation> c =
-        metricsInst.getConstructor(new Class[] {TaskTracker.class} );
-      this.myInstrumentation = c.newInstance(this);
-    } catch(Exception e) {
-      //Reflection can throw lots of exceptions -- handle them all by 
-      //falling back on the default.
-      LOG.error("failed to initialize taskTracker metrics", e);
-      this.myInstrumentation = new TaskTrackerMetricsInst(this);
-    }
+    // Set up TaskTracker instrumentation
+    this.myInstrumentation = createInstrumentation(this, fConf);
     
     // bind address
     InetSocketAddress socAddr = NetUtils.createSocketAddr(
@@ -753,10 +744,8 @@ public class TaskTracker 
         MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
   }
 
-  public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
-    Configuration conf) {
-    return conf.getClass(TT_INSTRUMENTATION,
-        TaskTrackerMetricsInst.class, TaskTrackerInstrumentation.class);
+  public static Class<?>[] getInstrumentationClasses(Configuration conf) {
+    return conf.getClasses(TT_INSTRUMENTATION, TaskTrackerMetricsInst.class);
   }
 
   public static void setInstrumentationClass(
@@ -765,6 +754,41 @@ public class TaskTracker 
         t, TaskTrackerInstrumentation.class);
   }
   
+  public static TaskTrackerInstrumentation createInstrumentation(
+      TaskTracker tt, Configuration conf) {
+    try {
+      Class<?>[] instrumentationClasses = getInstrumentationClasses(conf);
+      if (instrumentationClasses.length == 0) {
+        LOG.error("Empty string given for " + TT_INSTRUMENTATION + 
+            " property -- will use default instrumentation class instead");
+        return new TaskTrackerMetricsInst(tt);
+      } else if (instrumentationClasses.length == 1) {
+        // Just one instrumentation class given; create it directly
+        Class<?> cls = instrumentationClasses[0];
+        java.lang.reflect.Constructor<?> c =
+          cls.getConstructor(new Class[] {TaskTracker.class} );
+        return (TaskTrackerInstrumentation) c.newInstance(tt);
+      } else {
+        // Multiple instrumentation classes given; use a composite object
+        List<TaskTrackerInstrumentation> instrumentations =
+          new ArrayList<TaskTrackerInstrumentation>();
+        for (Class<?> cls: instrumentationClasses) {
+          java.lang.reflect.Constructor<?> c =
+            cls.getConstructor(new Class[] {TaskTracker.class} );
+          TaskTrackerInstrumentation inst =
+            (TaskTrackerInstrumentation) c.newInstance(tt);
+          instrumentations.add(inst);
+        }
+        return new CompositeTaskTrackerInstrumentation(tt, instrumentations);
+      }
+    } catch(Exception e) {
+      // Reflection can throw lots of exceptions -- handle them all by 
+      // falling back on the default.
+      LOG.error("Failed to initialize TaskTracker metrics", e);
+      return new TaskTrackerMetricsInst(tt);
+    }
+  }
+
   /**
    * Removes all contents of temporary storage.  Called upon
    * startup, to remove any leftovers from previous run.
@@ -2647,7 +2671,7 @@ public class TaskTracker 
       runner.signalDone();
       LOG.info("Task " + task.getTaskID() + " is done.");
       LOG.info("reported output size for " + task.getTaskID() +  "  was " + taskStatus.getOutputSize());
-
+      myInstrumentation.statusUpdate(task, taskStatus);
     }
     
     public boolean wasKilled() {
@@ -2951,6 +2975,7 @@ public class TaskTracker 
       taskStatus.setFinishTime(System.currentTimeMillis());
       removeFromMemoryManager(task.getTaskID());
       releaseSlot();
+      myInstrumentation.statusUpdate(task, taskStatus);
       notifyTTAboutTaskCompletion();
     }
     
@@ -2983,6 +3008,7 @@ public class TaskTracker 
                              failure);
         runningTasks.put(task.getTaskID(), this);
         mapTotal++;
+        myInstrumentation.statusUpdate(task, taskStatus);
       } else {
         LOG.warn("Output already reported lost:"+task.getTaskID());
       }
@@ -3149,6 +3175,7 @@ public class TaskTracker 
     TaskInProgress tip = tasks.get(taskid);
     if (tip != null) {
       tip.reportProgress(taskStatus);
+      myInstrumentation.statusUpdate(tip.getTask(), taskStatus);
       return true;
     } else {
       LOG.warn("Progress from unknown child task: "+taskid);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java?rev=987589&r1=987588&r2=987589&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java Fri Aug 20 17:45:44 2010
@@ -62,4 +62,10 @@ class TaskTrackerInstrumentation  {
    */
   public void reportTaskEnd(TaskAttemptID t) {}
    
+  /**
+   * Called when a task changes status. 
+   * @param task the task whose status changed
+   * @param taskStatus the new status of the task
+   */
+  public void statusUpdate(Task task, TaskStatus taskStatus) {}
 }