You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/06/06 08:14:55 UTC

svn commit: r544740 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskRunner.java src/java/org/apache/hadoop/mapred/TaskTracker.java

Author: omalley
Date: Tue Jun  5 23:14:54 2007
New Revision: 544740

URL: http://svn.apache.org/viewvc?view=rev&rev=544740
Log:
HADOOP-1446. Update the TaskTracker metrics while the task is running.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=544740&r1=544739&r2=544740
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jun  5 23:14:54 2007
@@ -78,6 +78,9 @@
  25. HADOOP-1461.  Fix the synchronization of the task tracker to
      avoid lockups in job cleanup.  (Arun C Murthy via omalley)
 
+ 26. HADOOP-1446.  Update the TaskTracker metrics while the task is
+     running. (Devaraj via omalley)
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=544740&r1=544739&r2=544740
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Jun  5 23:14:54 2007
@@ -282,7 +282,8 @@
 
         // Add main class and its arguments 
         vargs.add(TaskTracker.Child.class.getName());  // main of Child
-        vargs.add(tracker.taskReportPort + "");        // pass umbilical port
+        // pass umbilical port
+        vargs.add(tracker.getTaskTrackerReportPort() + ""); 
         vargs.add(t.getTaskId());                      // pass task identifier
 
         // Run java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=544740&r1=544739&r2=544740
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jun  5 23:14:54 2007
@@ -59,6 +59,7 @@
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -92,7 +93,7 @@
   InetSocketAddress jobTrackAddr;
     
   String taskReportBindAddress;
-  int taskReportPort;
+  private int taskReportPort;
 
   Server taskReportServer = null;
   InterTrackerProtocol jobClient;
@@ -120,8 +121,8 @@
    */
   Map<String, TaskInProgress> runningTasks = null;
   Map<String, RunningJob> runningJobs = null;
-  int mapTotal = 0;
-  int reduceTotal = 0;
+  volatile int mapTotal = 0;
+  volatile int reduceTotal = 0;
   boolean justStarted = true;
     
   //dir -> DF
@@ -156,25 +157,33 @@
    */  
   private int probe_sample_size = 50;
     
-  private class TaskTrackerMetrics {
+  private class TaskTrackerMetrics implements Updater {
     private MetricsRecord metricsRecord = null;
+    private int numCompletedTasks = 0;
       
     TaskTrackerMetrics() {
       MetricsContext context = MetricsUtil.getContext("mapred");
       metricsRecord = MetricsUtil.createRecord(context, "tasktracker");
+      context.registerUpdater(this);
     }
       
     synchronized void completeTask() {
-      if (metricsRecord != null) {
-        metricsRecord.incrMetric("tasks_completed", 1);
-      }
+      ++numCompletedTasks;
     }
-      
-    synchronized void update() {
-      if (metricsRecord != null) {
-        metricsRecord.setMetric("maps_running", mapTotal);
-        metricsRecord.setMetric("reduces_running", reduceTotal);
-        metricsRecord.update();
+    /**
+     * Since this object is a registered updater, this method will be called
+     * periodically, e.g. every 5 seconds.
+     */  
+    public void doUpdates(MetricsContext unused) {
+      synchronized (this) {
+        if (metricsRecord != null) {
+          metricsRecord.setMetric("maps_running", mapTotal);
+          metricsRecord.setMetric("reduces_running", reduceTotal);
+          metricsRecord.setMetric("taskSlots", (short)maxCurrentTasks);
+          metricsRecord.incrMetric("tasks_completed", numCompletedTasks);
+          metricsRecord.update();
+        }
+        numCompletedTasks = 0;
       }
     }
   }
@@ -681,6 +690,11 @@
   public FileSystem getFileSystem(){
     return fs;
   }
+  
+  /** Return the port to which the tasktracker's task report server is bound. */
+  public synchronized int getTaskTrackerReportPort() {
+    return taskReportPort;
+  }
     
   /** Queries the job tracker for a set of outputs ready to be copied
    * @param fromEventId the first event ID we want to start from, this is
@@ -769,8 +783,10 @@
         String msg = "Exiting task tracker for disk error:\n" +
           StringUtils.stringifyException(de);
         LOG.error(msg);
-        jobClient.reportTaskTrackerError(taskTrackerName, 
-                                         "DiskErrorException", msg);
+        synchronized (this) {
+          jobClient.reportTaskTrackerError(taskTrackerName, 
+                                           "DiskErrorException", msg);
+        }
         return State.STALE;
       } catch (RemoteException re) {
         String reClass = re.getClassName();
@@ -852,7 +868,6 @@
           }
           try {
             myMetrics.completeTask();
-            myMetrics.update();
           } catch (MetricsException me) {
             LOG.warn("Caught: " + StringUtils.stringifyException(me));
           }
@@ -1081,7 +1096,6 @@
       } else {
         reduceTotal++;
       }
-      myMetrics.update();
     }
     try {
       localizeJob(tip);
@@ -1461,7 +1475,6 @@
                              failure);
         runningTasks.put(task.getTaskId(), this);
         mapTotal++;
-        myMetrics.update();
       } else {
         LOG.warn("Output already reported lost:"+task.getTaskId());
       }