You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2010/08/20 19:45:45 UTC
svn commit: r987589 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/TaskTracker.java
src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
Author: acmurthy
Date: Fri Aug 20 17:45:44 2010
New Revision: 987589
URL: http://svn.apache.org/viewvc?rev=987589&view=rev
Log:
MAPREDUCE-1881. Improve TaskTrackerInstrumentation to enable collection of advanced metrics. Contributed by Matei Zaharia.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=987589&r1=987588&r2=987589&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Aug 20 17:45:44 2010
@@ -121,6 +121,9 @@ Trunk (unreleased changes)
MAPREDUCE-1920. Enables completed jobstatus store by default. (Tom White
via amareshwari)
+ MAPREDUCE-1881. Improve TaskTrackerInstrumentation to enable collection of
+ advanced metrics. (Matei Zaharia via acmurthy)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=987589&r1=987588&r2=987589&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Aug 20 17:45:44 2010
@@ -625,17 +625,8 @@ public class TaskTracker
probe_sample_size =
this.fConf.getInt(TT_MAX_TASK_COMPLETION_EVENTS_TO_POLL, 500);
- Class<? extends TaskTrackerInstrumentation> metricsInst = getInstrumentationClass(fConf);
- try {
- java.lang.reflect.Constructor<? extends TaskTrackerInstrumentation> c =
- metricsInst.getConstructor(new Class[] {TaskTracker.class} );
- this.myInstrumentation = c.newInstance(this);
- } catch(Exception e) {
- //Reflection can throw lots of exceptions -- handle them all by
- //falling back on the default.
- LOG.error("failed to initialize taskTracker metrics", e);
- this.myInstrumentation = new TaskTrackerMetricsInst(this);
- }
+ // Set up TaskTracker instrumentation
+ this.myInstrumentation = createInstrumentation(this, fConf);
// bind address
InetSocketAddress socAddr = NetUtils.createSocketAddr(
@@ -753,10 +744,8 @@ public class TaskTracker
MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
}
- public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
- Configuration conf) {
- return conf.getClass(TT_INSTRUMENTATION,
- TaskTrackerMetricsInst.class, TaskTrackerInstrumentation.class);
+ public static Class<?>[] getInstrumentationClasses(Configuration conf) {
+ return conf.getClasses(TT_INSTRUMENTATION, TaskTrackerMetricsInst.class);
}
public static void setInstrumentationClass(
@@ -765,6 +754,41 @@ public class TaskTracker
t, TaskTrackerInstrumentation.class);
}
+ public static TaskTrackerInstrumentation createInstrumentation(
+ TaskTracker tt, Configuration conf) {
+ try {
+ Class<?>[] instrumentationClasses = getInstrumentationClasses(conf);
+ if (instrumentationClasses.length == 0) {
+ LOG.error("Empty string given for " + TT_INSTRUMENTATION +
+ " property -- will use default instrumentation class instead");
+ return new TaskTrackerMetricsInst(tt);
+ } else if (instrumentationClasses.length == 1) {
+ // Just one instrumentation class given; create it directly
+ Class<?> cls = instrumentationClasses[0];
+ java.lang.reflect.Constructor<?> c =
+ cls.getConstructor(new Class[] {TaskTracker.class} );
+ return (TaskTrackerInstrumentation) c.newInstance(tt);
+ } else {
+ // Multiple instrumentation classes given; use a composite object
+ List<TaskTrackerInstrumentation> instrumentations =
+ new ArrayList<TaskTrackerInstrumentation>();
+ for (Class<?> cls: instrumentationClasses) {
+ java.lang.reflect.Constructor<?> c =
+ cls.getConstructor(new Class[] {TaskTracker.class} );
+ TaskTrackerInstrumentation inst =
+ (TaskTrackerInstrumentation) c.newInstance(tt);
+ instrumentations.add(inst);
+ }
+ return new CompositeTaskTrackerInstrumentation(tt, instrumentations);
+ }
+ } catch(Exception e) {
+ // Reflection can throw lots of exceptions -- handle them all by
+ // falling back on the default.
+ LOG.error("Failed to initialize TaskTracker metrics", e);
+ return new TaskTrackerMetricsInst(tt);
+ }
+ }
+
/**
* Removes all contents of temporary storage. Called upon
* startup, to remove any leftovers from previous run.
@@ -2647,7 +2671,7 @@ public class TaskTracker
runner.signalDone();
LOG.info("Task " + task.getTaskID() + " is done.");
LOG.info("reported output size for " + task.getTaskID() + " was " + taskStatus.getOutputSize());
-
+ myInstrumentation.statusUpdate(task, taskStatus);
}
public boolean wasKilled() {
@@ -2951,6 +2975,7 @@ public class TaskTracker
taskStatus.setFinishTime(System.currentTimeMillis());
removeFromMemoryManager(task.getTaskID());
releaseSlot();
+ myInstrumentation.statusUpdate(task, taskStatus);
notifyTTAboutTaskCompletion();
}
@@ -2983,6 +3008,7 @@ public class TaskTracker
failure);
runningTasks.put(task.getTaskID(), this);
mapTotal++;
+ myInstrumentation.statusUpdate(task, taskStatus);
} else {
LOG.warn("Output already reported lost:"+task.getTaskID());
}
@@ -3149,6 +3175,7 @@ public class TaskTracker
TaskInProgress tip = tasks.get(taskid);
if (tip != null) {
tip.reportProgress(taskStatus);
+ myInstrumentation.statusUpdate(tip.getTask(), taskStatus);
return true;
} else {
LOG.warn("Progress from unknown child task: "+taskid);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java?rev=987589&r1=987588&r2=987589&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java Fri Aug 20 17:45:44 2010
@@ -62,4 +62,10 @@ class TaskTrackerInstrumentation {
*/
public void reportTaskEnd(TaskAttemptID t) {}
+ /**
+ * Called when a task changes status.
+ * @param task the task whose status changed
+ * @param taskStatus the new status of the task
+ */
+ public void statusUpdate(Task task, TaskStatus taskStatus) {}
}