You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/08/02 00:57:57 UTC
svn commit: r681888 - in /hadoop/core/trunk: ./ conf/
src/mapred/org/apache/hadoop/mapred/
Author: acmurthy
Date: Fri Aug 1 15:57:57 2008
New Revision: 681888
URL: http://svn.apache.org/viewvc?rev=681888&view=rev
Log:
HADOOP-3772. Add a new Hadoop Instrumentation api for the JobTracker and the TaskTracker, refactor Hadoop Metrics as an implementation of the api. Contributed by Ari Rabkin.
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/hadoop-default.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Aug 1 15:57:57 2008
@@ -60,6 +60,10 @@
HADOOP-3730. Adds a new JobConf constructor that disables loading
default configurations. (Alejandro Abdelnur via ddas)
+ HADOOP-3772. Add a new Hadoop Instrumentation api for the JobTracker and
+ the TaskTracker, refactor Hadoop Metrics as an implementation of the api.
+ (Ari Rabkin via acmurthy)
+
IMPROVEMENTS
HADOOP-3732. Delay intialization of datanode block verification till
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Aug 1 15:57:57 2008
@@ -676,6 +676,13 @@
</property>
<property>
+ <name>mapred.tasktracker.instrumentation</name>
+ <value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value>
+ <description>Expert: The instrumentation class to associate with each TaskTracker.
+ </description>
+</property>
+
+<property>
<name>mapred.map.tasks</name>
<value>2</value>
<description>The default number of map tasks per job. Typically set
@@ -774,6 +781,13 @@
</property>
<property>
+ <name>mapred.job.instrumentation</name>
+ <value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value>
+ <description>Expert: The instrumentation class to associate with each TaskTracker.
+ </description>
+</property>
+
+<property>
<name>mapred.child.java.opts</name>
<value>-Xmx200m</value>
<description>Java opts for the task tracker child processes.
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Aug 1 15:57:57 2008
@@ -37,7 +37,6 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobHistory.Values;
-import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
@@ -475,7 +474,7 @@
////////////////////////////////////////////////////
public synchronized void updateTaskStatus(TaskInProgress tip,
TaskStatus status,
- JobTrackerMetrics metrics) {
+ JobTrackerInstrumentation metrics) {
double oldProgress = tip.getProgress(); // save old progress
boolean wasRunning = tip.isRunning();
@@ -1306,7 +1305,7 @@
*/
public synchronized boolean completedTask(TaskInProgress tip,
TaskStatus status,
- JobTrackerMetrics metrics)
+ JobTrackerInstrumentation metrics)
{
TaskAttemptID taskid = status.getTaskID();
@@ -1359,13 +1358,13 @@
if (tip.isMapTask()){
runningMapTasks -= 1;
finishedMapTasks += 1;
- metrics.completeMap();
+ metrics.completeMap(taskid);
// remove the completed map from the resp running caches
retireMap(tip);
} else{
runningReduceTasks -= 1;
finishedReduceTasks += 1;
- metrics.completeReduce();
+ metrics.completeReduce(taskid);
// remove the completed reduces from the running reducers set
retireReduce(tip);
}
@@ -1393,7 +1392,7 @@
* @param metrics job-tracker metrics
* @return
*/
- private boolean isJobComplete(TaskInProgress tip, JobTrackerMetrics metrics) {
+ private boolean isJobComplete(TaskInProgress tip, JobTrackerInstrumentation metrics) {
// Job is complete if total-tips = finished-tips + failed-tips
boolean allDone =
((finishedMapTasks + failedMapTIPs) == numMapTasks);
@@ -1467,7 +1466,7 @@
TaskStatus status,
TaskTrackerStatus taskTrackerStatus,
boolean wasRunning, boolean wasComplete,
- JobTrackerMetrics metrics) {
+ JobTrackerInstrumentation metrics) {
// check if the TIP is already failed
boolean wasFailed = tip.isFailed();
@@ -1611,7 +1610,7 @@
*/
public void failedTask(TaskInProgress tip, TaskAttemptID taskid, String reason,
TaskStatus.Phase phase, TaskStatus.State state,
- String trackerName, JobTrackerMetrics metrics) {
+ String trackerName, JobTrackerInstrumentation metrics) {
TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(),
taskid,
0.0f,
@@ -1733,7 +1732,7 @@
synchronized void fetchFailureNotification(TaskInProgress tip,
TaskAttemptID mapTaskId,
String trackerName,
- JobTrackerMetrics metrics) {
+ JobTrackerInstrumentation metrics) {
Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures);
@@ -1763,9 +1762,9 @@
private JobInProgress job;
private TaskInProgress tip;
private TaskAttemptID taskId;
- private JobTrackerMetrics metrics;
+ private JobTrackerInstrumentation metrics;
JobWithTaskContext(JobInProgress job, TaskInProgress tip,
- TaskAttemptID taskId, JobTrackerMetrics metrics) {
+ TaskAttemptID taskId, JobTrackerInstrumentation metrics) {
this.job = job;
this.tip = tip;
this.taskId = taskId;
@@ -1780,7 +1779,7 @@
TaskAttemptID getTaskID() {
return taskId;
}
- JobTrackerMetrics getJobTrackerMetrics() {
+ JobTrackerInstrumentation getJobTrackerMetrics() {
return metrics;
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Aug 1 15:57:57 2008
@@ -218,7 +218,7 @@
tip.isMapTask()? TaskStatus.Phase.MAP:
TaskStatus.Phase.STARTING,
TaskStatus.State.FAILED,
- trackerName, myMetrics);
+ trackerName, myInstrumentation);
}
itr.remove();
} else {
@@ -393,83 +393,9 @@
}
}
- static class JobTrackerMetrics implements Updater {
- private MetricsRecord metricsRecord = null;
- private int numMapTasksLaunched = 0;
- private int numMapTasksCompleted = 0;
- private int numReduceTasksLaunched = 0;
- private int numReduceTasksCompleted = 0;
- private int numJobsSubmitted = 0;
- private int numJobsCompleted = 0;
- private JobTracker tracker;
-
- JobTrackerMetrics(JobTracker tracker, JobConf conf) {
- String sessionId = conf.getSessionId();
- // Initiate JVM Metrics
- JvmMetrics.init("JobTracker", sessionId);
- // Create a record for map-reduce metrics
- MetricsContext context = MetricsUtil.getContext("mapred");
- metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
- metricsRecord.setTag("sessionId", sessionId);
- this.tracker = tracker;
- context.registerUpdater(this);
- }
-
- /**
- * Since this object is a registered updater, this method will be called
- * periodically, e.g. every 5 seconds.
- */
- public void doUpdates(MetricsContext unused) {
- synchronized (this) {
- metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
- metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
- metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
- metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
- metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
- metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
-
- numMapTasksLaunched = 0;
- numMapTasksCompleted = 0;
- numReduceTasksLaunched = 0;
- numReduceTasksCompleted = 0;
- numJobsSubmitted = 0;
- numJobsCompleted = 0;
- }
- metricsRecord.update();
-
- if (tracker != null) {
- for (JobInProgress jip : tracker.getRunningJobs()) {
- jip.updateMetrics();
- }
- }
- }
+
- synchronized void launchMap() {
- ++numMapTasksLaunched;
- }
-
- synchronized void completeMap() {
- ++numMapTasksCompleted;
- }
-
- synchronized void launchReduce() {
- ++numReduceTasksLaunched;
- }
-
- synchronized void completeReduce() {
- ++numReduceTasksCompleted;
- }
-
- synchronized void submitJob() {
- ++numJobsSubmitted;
- }
-
- synchronized void completeJob() {
- ++numJobsCompleted;
- }
- }
-
- private JobTrackerMetrics myMetrics = null;
+ private JobTrackerInstrumentation myInstrumentation = null;
/////////////////////////////////////////////////////////////////
// The real JobTracker
@@ -661,7 +587,18 @@
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
trackerIdentifier = dateFormat.format(new Date());
- myMetrics = new JobTrackerMetrics(this, jobConf);
+ Class<? extends JobTrackerInstrumentation> metricsInst = getInstrumentationClass(jobConf);
+ try {
+ java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+ metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
+ this.myInstrumentation = c.newInstance(this, jobConf);
+ } catch(Exception e) {
+ //Reflection can throw lots of exceptions -- handle them all by
+ //falling back on the default.
+ LOG.error("failed to initialize job tracker metrics", e);
+ this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
+ }
+
// The rpc/web-server ports can be ephemeral ports...
// ... ensure we have the correct info
@@ -728,6 +665,16 @@
LOG.info("Starting RUNNING");
}
+ public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
+ return conf.getClass("mapred.jobtracker.instrumentation",
+ JobTrackerMetricsInst.class, JobTrackerInstrumentation.class);
+ }
+
+ public static void setInstrumentationClass(Configuration conf, Class<? extends JobTrackerInstrumentation> t) {
+ conf.setClass("mapred.jobtracker.instrumentation",
+ t, JobTrackerInstrumentation.class);
+ }
+
public static InetSocketAddress getAddress(Configuration conf) {
String jobTrackerStr =
conf.get("mapred.job.tracker", "localhost:8012");
@@ -1290,9 +1237,9 @@
for (Task task : tasks) {
expireLaunchingTasks.addNewTask(task.getTaskID());
if (task.isMapTask()) {
- myMetrics.launchMap();
+ myInstrumentation.launchMap(task.getTaskID());
} else {
- myMetrics.launchReduce();
+ myInstrumentation.launchReduce(task.getTaskID());
}
LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
actions.add(new LaunchTaskAction(task));
@@ -1620,7 +1567,7 @@
}
}
}
- myMetrics.submitJob();
+ myInstrumentation.submitJob();
return job.getStatus();
}
@@ -1880,7 +1827,7 @@
LOG.info("Serious problem. While updating status, cannot find taskid " + report.getTaskID());
} else {
expireLaunchingTasks.removeTask(taskId);
- tip.getJob().updateTaskStatus(tip, report, myMetrics);
+ tip.getJob().updateTaskStatus(tip, report, myInstrumentation);
}
// Process 'failed fetch' notifications
@@ -1898,7 +1845,7 @@
failedFetchMap.getJob().fetchFailureNotification(failedFetchMap,
mapTaskId,
failedFetchTrackerName,
- myMetrics);
+ myInstrumentation);
}
}
}
@@ -1934,7 +1881,7 @@
(tip.isMapTask() ?
TaskStatus.Phase.MAP :
TaskStatus.Phase.REDUCE),
- TaskStatus.State.KILLED, trackerName, myMetrics);
+ TaskStatus.State.KILLED, trackerName, myInstrumentation);
jobsWithFailures.add(job);
}
} else {
@@ -2006,7 +1953,7 @@
JobInProgress[] jobs = new JobInProgress[jobList.size()];
TaskInProgress[] tips = new TaskInProgress[jobList.size()];
TaskAttemptID[] taskids = new TaskAttemptID[jobList.size()];
- JobTrackerMetrics[] metrics = new JobTrackerMetrics[jobList.size()];
+ JobTrackerInstrumentation[] metrics = new JobTrackerInstrumentation[jobList.size()];
Iterator<JobInProgress.JobWithTaskContext> iter = jobList.iterator();
int count = 0;
@@ -2169,4 +2116,6 @@
}
}
+
+
}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=681888&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Fri Aug 1 15:57:57 2008
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics.Updater;
+
+public class JobTrackerInstrumentation {
+
+ protected final JobTracker tracker;
+
+ public JobTrackerInstrumentation(JobTracker jt, JobConf conf) {
+ tracker = jt;
+ }
+
+ public void launchMap(TaskAttemptID taskAttemptID)
+ { }
+
+ public void completeMap(TaskAttemptID taskAttemptID)
+ { }
+
+ public void launchReduce(TaskAttemptID taskAttemptID)
+ { }
+
+ public void completeReduce(TaskAttemptID taskAttemptID)
+ { }
+
+ public void submitJob()
+ { }
+
+ public void completeJob()
+ { }
+
+}
\ No newline at end of file
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=681888&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Fri Aug 1 15:57:57 2008
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+
+class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater {
+ private MetricsRecord metricsRecord = null;
+ int numMapTasksLaunched = 0;
+ int numMapTasksCompleted = 0;
+ int numReduceTasksLaunched = 0;
+ int numReduceTasksCompleted = 0;
+ private int numJobsSubmitted = 0;
+ private int numJobsCompleted = 0;
+
+ public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
+ super(tracker, conf);
+ String sessionId = conf.getSessionId();
+ // Initiate JVM Metrics
+ JvmMetrics.init("JobTracker", sessionId);
+ // Create a record for map-reduce metrics
+ MetricsContext context = MetricsUtil.getContext("mapred");
+ metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
+ metricsRecord.setTag("sessionId", sessionId);
+ context.registerUpdater(this);
+ }
+
+ /**
+ * Since this object is a registered updater, this method will be called
+ * periodically, e.g. every 5 seconds.
+ */
+ public void doUpdates(MetricsContext unused) {
+ synchronized (this) {
+ metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
+ metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+ metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
+ metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+ metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
+ metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
+
+ numMapTasksLaunched = 0;
+ numMapTasksCompleted = 0;
+ numReduceTasksLaunched = 0;
+ numReduceTasksCompleted = 0;
+ numJobsSubmitted = 0;
+ numJobsCompleted = 0;
+ }
+ metricsRecord.update();
+
+ if (tracker != null) {
+ for (JobInProgress jip : tracker.getRunningJobs()) {
+ jip.updateMetrics();
+ }
+ }
+ }
+
+ public synchronized void launchMap(TaskAttemptID taskAttemptID) {
+ ++numMapTasksLaunched;
+ }
+
+ public synchronized void completeMap(TaskAttemptID taskAttemptID) {
+ ++numMapTasksCompleted;
+ }
+
+ public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
+ ++numReduceTasksLaunched;
+ }
+
+ public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
+ ++numReduceTasksCompleted;
+ }
+
+ public synchronized void submitJob() {
+ ++numJobsSubmitted;
+ }
+
+ public synchronized void completeJob() {
+ ++numJobsCompleted;
+ }
+}
\ No newline at end of file
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Aug 1 15:57:57 2008
@@ -29,7 +29,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
+import org.apache.hadoop.mapred.JobTrackerMetricsInst;
/** Implements MapReduce locally, in-process, for debugging. */
class LocalJobRunner implements JobSubmissionProtocol {
@@ -42,7 +42,7 @@
private int map_tasks = 0;
private int reduce_tasks = 0;
- private JobTrackerMetrics myMetrics = null;
+ private JobTrackerInstrumentation myMetrics = null;
private static final String jobDir = "localRunner/";
@@ -153,10 +153,10 @@
map.localizeConfiguration(localConf);
map.setConf(localConf);
map_tasks += 1;
- myMetrics.launchMap();
+ myMetrics.launchMap(mapId);
map.run(localConf, this);
map.saveTaskOutput();
- myMetrics.completeMap();
+ myMetrics.completeMap(mapId);
map_tasks -= 1;
updateCounters(map);
}
@@ -197,10 +197,10 @@
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
reduce_tasks += 1;
- myMetrics.launchReduce();
+ myMetrics.launchReduce(reduce.getTaskID());
reduce.run(localConf, this);
reduce.saveTaskOutput();
- myMetrics.completeReduce();
+ myMetrics.completeReduce(reduce.getTaskID());
reduce_tasks -= 1;
updateCounters(reduce);
}
@@ -310,7 +310,7 @@
public LocalJobRunner(JobConf conf) throws IOException {
this.fs = FileSystem.get(conf);
this.conf = conf;
- myMetrics = new JobTrackerMetrics(null, new JobConf(conf));
+ myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
}
// JobSubmissionProtocol methods
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Aug 1 15:57:57 2008
@@ -387,6 +387,7 @@
File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
stdout.getParentFile().mkdirs();
+ tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
List<String> wrappedCommand =
TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize);
Map<String, String> env = new HashMap<String, String>();
@@ -454,10 +455,10 @@
// error and output are appropriately redirected
} finally { // handle the exit code
int exit_code = shexec.getExitCode();
-
+ tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
if (!killed && exit_code != 0) {
if (exit_code == 65) {
- tracker.getTaskTrackerMetrics().taskFailedPing();
+ tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
}
throw new IOException("Task process exit with nonzero status of " +
exit_code + ".");
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=681888&r1=681887&r2=681888&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Aug 1 15:57:57 2008
@@ -24,6 +24,7 @@
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.RandomAccessFile;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
@@ -49,6 +50,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -230,64 +232,15 @@
shuffleMetricsRecord.update();
}
}
- public class TaskTrackerMetrics implements Updater {
- private MetricsRecord metricsRecord = null;
- private int numCompletedTasks = 0;
- private int timedoutTasks = 0;
- private int tasksFailedPing = 0;
-
- TaskTrackerMetrics() {
- JobConf conf = getJobConf();
- String sessionId = conf.getSessionId();
- // Initiate Java VM Metrics
- JvmMetrics.init("TaskTracker", sessionId);
- // Create a record for Task Tracker metrics
- MetricsContext context = MetricsUtil.getContext("mapred");
- metricsRecord = MetricsUtil.createRecord(context, "tasktracker");
- metricsRecord.setTag("sessionId", sessionId);
- context.registerUpdater(this);
- }
-
- synchronized void completeTask() {
- ++numCompletedTasks;
- }
-
- synchronized void timedoutTask() {
- ++timedoutTasks;
- }
-
- synchronized void taskFailedPing() {
- ++tasksFailedPing;
- }
-
- /**
- * Since this object is a registered updater, this method will be called
- * periodically, e.g. every 5 seconds.
- */
- public void doUpdates(MetricsContext unused) {
- synchronized (this) {
- if (metricsRecord != null) {
- metricsRecord.setMetric("maps_running", mapTotal);
- metricsRecord.setMetric("reduces_running", reduceTotal);
- metricsRecord.setMetric("mapTaskSlots", (short)maxCurrentMapTasks);
- metricsRecord.setMetric("reduceTaskSlots",
- (short)maxCurrentReduceTasks);
- metricsRecord.incrMetric("tasks_completed", numCompletedTasks);
- metricsRecord.incrMetric("tasks_failed_timeout", timedoutTasks);
- metricsRecord.incrMetric("tasks_failed_ping", tasksFailedPing);
- }
- numCompletedTasks = 0;
- timedoutTasks = 0;
- tasksFailedPing = 0;
- }
- metricsRecord.update();
- }
- }
+
+
+
+
- private TaskTrackerMetrics myMetrics = null;
+ private TaskTrackerInstrumentation myInstrumentation = null;
- public TaskTrackerMetrics getTaskTrackerMetrics() {
- return myMetrics;
+ public TaskTrackerInstrumentation getTaskTrackerInstrumentation() {
+ return myInstrumentation;
}
/**
@@ -416,8 +369,17 @@
//tweak the probe sample size (make it a function of numCopiers)
probe_sample_size = this.fConf.getInt("mapred.tasktracker.events.batchsize", 500);
-
- this.myMetrics = new TaskTrackerMetrics();
+ Class<? extends TaskTrackerInstrumentation> metricsInst = getInstrumentationClass(fConf);
+ try {
+ java.lang.reflect.Constructor<? extends TaskTrackerInstrumentation> c =
+ metricsInst.getConstructor(new Class[] {TaskTracker.class} );
+ this.myInstrumentation = c.newInstance(this);
+ } catch(Exception e) {
+ //Reflection can throw lots of exceptions -- handle them all by
+ //falling back on the default.
+ LOG.error("failed to initialize taskTracker metrics", e);
+ this.myInstrumentation = new TaskTrackerMetricsInst(this);
+ }
// bind address
String address =
@@ -464,6 +426,16 @@
mapEventsFetcher.start();
}
+ public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
+ return conf.getClass("mapred.tasktracker.instrumentation",
+ TaskTrackerMetricsInst.class, TaskTrackerInstrumentation.class);
+ }
+
+ public static void setInstrumentationClass(Configuration conf, Class<? extends TaskTrackerInstrumentation> t) {
+ conf.setClass("mapred.tasktracker.instrumentation",
+ t, TaskTrackerInstrumentation.class);
+ }
+
/**
* Removes all contents of temporary storage. Called upon
* startup, to remove any leftovers from previous run.
@@ -1072,7 +1044,7 @@
reduceTotal--;
}
try {
- myMetrics.completeTask();
+ myInstrumentation.completeTask(taskStatus.getTaskID());
} catch (MetricsException me) {
LOG.warn("Caught: " + StringUtils.stringifyException(me));
}
@@ -1137,7 +1109,7 @@
LOG.info(tip.getTask().getTaskID() + ": " + msg);
ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
tip.reportDiagnosticInfo(msg);
- myMetrics.timedoutTask();
+ myInstrumentation.timedoutTask(tip.getTask().getTaskID());
purgeTask(tip, true);
}
}
@@ -2530,4 +2502,13 @@
}
}
}
+
+ int getMaxCurrentMapTasks() {
+ return maxCurrentMapTasks;
+ }
+
+ int getMaxCurrentReduceTasks() {
+ return maxCurrentReduceTasks;
+ }
+
}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java?rev=681888&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java Fri Aug 1 15:57:57 2008
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+
+/**
+ * TaskTrackerInstrumentation defines a number of instrumentation points
+ * associated with TaskTrackers. By default, the instrumentation points do
+ * nothing, but subclasses can do arbitrary instrumentation and monitoring at
+ * these points.
+ *
+ * TaskTrackerInstrumentation interfaces are associated uniquely with a
+ * TaskTracker. We don't want an inner class here, because then subclasses
+ * wouldn't have direct access to the associated TaskTracker.
+ *
+ **/
+public class TaskTrackerInstrumentation {
+
+ protected final TaskTracker tt;
+
+ public TaskTrackerInstrumentation(TaskTracker t) {
+ tt = t;
+ }
+
+ /**
+ * invoked when task attempt t succeeds
+ * @param t
+ */
+ public void completeTask(TaskAttemptID t) { }
+
+ public void timedoutTask(TaskAttemptID t) { }
+
+ public void taskFailedPing(TaskAttemptID t) { }
+
+ /**
+ * Called just before task attempt t starts.
+ * @param stdout the file containing standard out of the new task
+ * @param stderr the file containing standard error of the new task
+ */
+ public void reportTaskLaunch(TaskAttemptID t, File stdout, File stderr) { }
+
+ /**
+ * called when task t has just finished.
+ * @param t
+ */
+ public void reportTaskEnd(TaskAttemptID t) {}
+
+}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java?rev=681888&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java Fri Aug 1 15:57:57 2008
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+
+public class TaskTrackerMetricsInst extends TaskTrackerInstrumentation implements Updater {
+ private final MetricsRecord metricsRecord;
+ private int numCompletedTasks = 0;
+ private int timedoutTasks = 0;
+ private int tasksFailedPing = 0;
+
+ public TaskTrackerMetricsInst(TaskTracker t) {
+ super(t);
+ JobConf conf = tt.getJobConf();
+ String sessionId = conf.getSessionId();
+ // Initiate Java VM Metrics
+ JvmMetrics.init("TaskTracker", sessionId);
+ // Create a record for Task Tracker metrics
+ MetricsContext context = MetricsUtil.getContext("mapred");
+ metricsRecord = MetricsUtil.createRecord(context, "tasktracker"); //guaranteed never null
+ metricsRecord.setTag("sessionId", sessionId);
+ context.registerUpdater(this);
+ }
+
+ synchronized void completeTask() {
+ ++numCompletedTasks;
+ }
+
+ synchronized void timedoutTask() {
+ ++timedoutTasks;
+ }
+
+ synchronized void taskFailedPing() {
+ ++tasksFailedPing;
+ }
+
+ /**
+ * Since this object is a registered updater, this method will be called
+ * periodically, e.g. every 5 seconds.
+ */
+ public void doUpdates(MetricsContext unused) {
+ synchronized (this) {
+ metricsRecord.setMetric("maps_running", tt.mapTotal);
+ metricsRecord.setMetric("reduces_running", tt.reduceTotal);
+ metricsRecord.setMetric("mapTaskSlots", (short)tt.getMaxCurrentMapTasks());
+ metricsRecord.setMetric("reduceTaskSlots",
+ (short)tt.getMaxCurrentReduceTasks());
+ metricsRecord.incrMetric("tasks_completed", numCompletedTasks);
+ metricsRecord.incrMetric("tasks_failed_timeout", timedoutTasks);
+ metricsRecord.incrMetric("tasks_failed_ping", tasksFailedPing);
+
+ numCompletedTasks = 0;
+ timedoutTasks = 0;
+ tasksFailedPing = 0;
+ }
+ metricsRecord.update();
+ }
+
+
+}
\ No newline at end of file