You are viewing a plain-text version of this content. The canonical link for it (originally the hyperlink "here") was not preserved in this text.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/06/06 08:14:55 UTC
svn commit: r544740 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/TaskRunner.java
src/java/org/apache/hadoop/mapred/TaskTracker.java
Author: omalley
Date: Tue Jun 5 23:14:54 2007
New Revision: 544740
URL: http://svn.apache.org/viewvc?view=rev&rev=544740
Log:
HADOOP-1446. Update the TaskTracker metrics while the task is running.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=544740&r1=544739&r2=544740
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jun 5 23:14:54 2007
@@ -78,6 +78,9 @@
25. HADOOP-1461. Fix the synchronization of the task tracker to
avoid lockups in job cleanup. (Arun C Murthy via omalley)
+ 26. HADOOP-1446. Update the TaskTracker metrics while the task is
+ running. (Devaraj via omalley)
+
Release 0.13.0 - 2007-06-08
1. HADOOP-1047. Fix TestReplication to succeed more reliably.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=544740&r1=544739&r2=544740
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Jun 5 23:14:54 2007
@@ -282,7 +282,8 @@
// Add main class and its arguments
vargs.add(TaskTracker.Child.class.getName()); // main of Child
- vargs.add(tracker.taskReportPort + ""); // pass umbilical port
+ // pass umbilical port
+ vargs.add(tracker.getTaskTrackerReportPort() + "");
vargs.add(t.getTaskId()); // pass task identifier
// Run java
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=544740&r1=544739&r2=544740
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jun 5 23:14:54 2007
@@ -59,6 +59,7 @@
import org.apache.hadoop.metrics.MetricsException;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.ReflectionUtils;
@@ -92,7 +93,7 @@
InetSocketAddress jobTrackAddr;
String taskReportBindAddress;
- int taskReportPort;
+ private int taskReportPort;
Server taskReportServer = null;
InterTrackerProtocol jobClient;
@@ -120,8 +121,8 @@
*/
Map<String, TaskInProgress> runningTasks = null;
Map<String, RunningJob> runningJobs = null;
- int mapTotal = 0;
- int reduceTotal = 0;
+ volatile int mapTotal = 0;
+ volatile int reduceTotal = 0;
boolean justStarted = true;
//dir -> DF
@@ -156,25 +157,33 @@
*/
private int probe_sample_size = 50;
- private class TaskTrackerMetrics {
+ private class TaskTrackerMetrics implements Updater {
private MetricsRecord metricsRecord = null;
+ private int numCompletedTasks = 0;
TaskTrackerMetrics() {
MetricsContext context = MetricsUtil.getContext("mapred");
metricsRecord = MetricsUtil.createRecord(context, "tasktracker");
+ context.registerUpdater(this);
}
synchronized void completeTask() {
- if (metricsRecord != null) {
- metricsRecord.incrMetric("tasks_completed", 1);
- }
+ ++numCompletedTasks;
}
-
- synchronized void update() {
- if (metricsRecord != null) {
- metricsRecord.setMetric("maps_running", mapTotal);
- metricsRecord.setMetric("reduces_running", reduceTotal);
- metricsRecord.update();
+ /**
+ * Since this object is a registered updater, this method will be called
+ * periodically, e.g. every 5 seconds.
+ */
+ public void doUpdates(MetricsContext unused) {
+ synchronized (this) {
+ if (metricsRecord != null) {
+ metricsRecord.setMetric("maps_running", mapTotal);
+ metricsRecord.setMetric("reduces_running", reduceTotal);
+ metricsRecord.setMetric("taskSlots", (short)maxCurrentTasks);
+ metricsRecord.incrMetric("tasks_completed", numCompletedTasks);
+ metricsRecord.update();
+ }
+ numCompletedTasks = 0;
}
}
}
@@ -681,6 +690,11 @@
public FileSystem getFileSystem(){
return fs;
}
+
+ /** Return the port at which the tasktracker bound to */
+ public synchronized int getTaskTrackerReportPort() {
+ return taskReportPort;
+ }
/** Queries the job tracker for a set of outputs ready to be copied
* @param fromEventId the first event ID we want to start from, this is
@@ -769,8 +783,10 @@
String msg = "Exiting task tracker for disk error:\n" +
StringUtils.stringifyException(de);
LOG.error(msg);
- jobClient.reportTaskTrackerError(taskTrackerName,
- "DiskErrorException", msg);
+ synchronized (this) {
+ jobClient.reportTaskTrackerError(taskTrackerName,
+ "DiskErrorException", msg);
+ }
return State.STALE;
} catch (RemoteException re) {
String reClass = re.getClassName();
@@ -852,7 +868,6 @@
}
try {
myMetrics.completeTask();
- myMetrics.update();
} catch (MetricsException me) {
LOG.warn("Caught: " + StringUtils.stringifyException(me));
}
@@ -1081,7 +1096,6 @@
} else {
reduceTotal++;
}
- myMetrics.update();
}
try {
localizeJob(tip);
@@ -1461,7 +1475,6 @@
failure);
runningTasks.put(task.getTaskId(), this);
mapTotal++;
- myMetrics.update();
} else {
LOG.warn("Output already reported lost:"+task.getTaskId());
}