You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/03/06 23:11:05 UTC

svn commit: r383690 - /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java

Author: cutting
Date: Mon Mar  6 14:11:03 2006
New Revision: 383690

URL: http://svn.apache.org/viewcvs?rev=383690&view=rev
Log:
Reduce iteration through all map & reduce tasks to improve jobtracker performance.

Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=383690&r1=383689&r2=383690&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Mar  6 14:11:03 2006
@@ -245,30 +245,27 @@
     // Status update methods
     ////////////////////////////////////////////////////
     public void updateTaskStatus(TaskInProgress tip, TaskStatus status) {
-        tip.updateStatus(status);
+        double oldProgress = tip.getProgress();   // save old progress
+        tip.updateStatus(status);                 // update tip
 
         //
         // Update JobInProgress status
         //
-        if (maps.length == 0) {
+        double progressDelta = tip.getProgress() - oldProgress;
+        if (tip.isMapTask()) {
+          if (maps.length == 0) {
             this.status.setMapProgress(1.0f);
+          } else {
+            this.status.mapProgress += (progressDelta / maps.length);
+          }
         } else {
-            double reportedProgress = 0;
-            for (int i = 0; i < maps.length; i++) {
-                reportedProgress += maps[i].getProgress();
-            }
-            this.status.setMapProgress((float) (reportedProgress / maps.length));
-        }
-        if (reduces.length == 0) {
+          if (reduces.length == 0) {
             this.status.setReduceProgress(1.0f);
-        } else {
-            double reportedProgress = 0;
-            for (int i = 0; i < reduces.length; i++) {
-                reportedProgress += reduces[i].getProgress();
-            }
-            this.status.setReduceProgress((float) (reportedProgress / reduces.length));
+          } else {
+            this.status.reduceProgress += (progressDelta / reduces.length);
+          }
         }
-    }
+    }   
 
     /////////////////////////////////////////////////////
     // Create/manage tasks
@@ -286,7 +283,6 @@
         int cacheTarget = -1;
         int stdTarget = -1;
         int specTarget = -1;
-        double totalProgress = 0;
 
         //
         // We end up creating two tasks for the same bucket, because
@@ -297,10 +293,7 @@
         //
         // Compute avg progress through the map tasks
         //
-        for (int i = 0; i < maps.length; i++) {        
-            totalProgress += maps[i].getProgress();
-        }
-        double avgProgress = totalProgress / maps.length;
+        double avgProgress = status.mapProgress() / maps.length;
 
         //
         // See if there is a split over a block that is stored on
@@ -373,11 +366,7 @@
         Task t = null;
         int stdTarget = -1;
         int specTarget = -1;
-        int totalProgress = 0;
-        for (int i = 0; i < reduces.length; i++) {
-            totalProgress += reduces[i].getProgress();
-        }
-        double avgProgress = (1.0 * totalProgress) / reduces.length;
+        double avgProgress = status.reduceProgress() / reduces.length;
 
         for (int i = 0; i < reduces.length; i++) {
             if (reduces[i].hasTask()) {