You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/10/28 14:32:26 UTC

svn commit: r830531 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/

Author: sharad
Date: Wed Oct 28 13:32:26 2009
New Revision: 830531

URL: http://svn.apache.org/viewvc?rev=830531&view=rev
Log:
MAPREDUCE-1158. Fix the JobTracker (JT) running-maps and running-reduces metrics. Contributed by Sharad Agarwal.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=830531&r1=830530&r2=830531&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Oct 28 13:32:26 2009
@@ -48,6 +48,9 @@
     MAPREDUCE-1098. Fixed the distributed-cache to not do i/o while holding a
     global lock. (Amareshwari Sriramadasu via acmurthy)
 
+    MAPREDUCE-1158. Fix JT running maps and running reduces metrics.
+    (sharad)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=830531&r1=830530&r2=830531&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Oct 28 13:32:26 2009
@@ -1541,7 +1541,6 @@
       name = TaskType.JOB_CLEANUP;
     } else if (tip.isMapTask()) {
       ++runningMapTasks;
-      metrics.addRunningMaps(jobId, 1);
       name = TaskType.MAP;
       counter = JobCounter.TOTAL_LAUNCHED_MAPS;
       splits = tip.getSplitNodes();
@@ -1553,7 +1552,6 @@
       metrics.launchMap(id);
     } else {
       ++runningReduceTasks;
-      metrics.addRunningReduces(jobId, 1);
       name = TaskType.REDUCE;
       counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
       if (tip.isSpeculating()) {
@@ -2604,7 +2602,6 @@
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
     } else if (tip.isMapTask()) {
       runningMapTasks -= 1;
-      metrics.decRunningMaps(jobId, 1);
       finishedMapTasks += 1;
       metrics.completeMap(taskid);
       if (!tip.isJobSetupTask() && hasSpeculativeMaps) {
@@ -2617,7 +2614,6 @@
       }
     } else {
       runningReduceTasks -= 1;
-      metrics.decRunningReduces(jobId, 1);
       finishedReduceTasks += 1;
       metrics.completeReduce(taskid);
       if (!tip.isJobSetupTask() && hasSpeculativeReduces) {
@@ -2966,7 +2962,6 @@
         launchedSetup = false;
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
-        metrics.decRunningMaps(jobId, 1);
         metrics.failedMap(taskid);
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
@@ -2976,7 +2971,6 @@
         }
       } else {
         runningReduceTasks -= 1;
-        metrics.decRunningReduces(jobId, 1);
         metrics.failedReduce(taskid);
         // remove from the running queue and put in the failed queue if the tip
         // is not complete

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=830531&r1=830530&r2=830531&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Oct 28 13:32:26 2009
@@ -2439,6 +2439,8 @@
       totalReduces -= oldStatus.countReduceTasks();
       occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
       occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
+      getInstrumentation().decRunningMaps(oldStatus.countMapTasks());
+      getInstrumentation().decRunningReduces(oldStatus.countReduceTasks());
       getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
       getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
@@ -2465,6 +2467,8 @@
       totalReduces += status.countReduceTasks();
       occupiedMapSlots += status.countOccupiedMapSlots();
       occupiedReduceSlots += status.countOccupiedReduceSlots();
+      getInstrumentation().addRunningMaps(status.countMapTasks());
+      getInstrumentation().addRunningReduces(status.countReduceTasks());
       getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
       getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(status.getHost())) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=830531&r1=830530&r2=830531&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Wed Oct 28 13:32:26 2009
@@ -127,16 +127,16 @@
   public void decRunningJob(JobConf conf, JobID id) 
   { }
 
-  public void addRunningMaps(JobID id, int task)
+  public void addRunningMaps(int tasks)
   { }
 
-  public void decRunningMaps(JobID id, int task) 
+  public void decRunningMaps(int tasks) 
   { }
 
-  public void addRunningReduces(JobID id, int task)
+  public void addRunningReduces(int tasks)
   { }
 
-  public void decRunningReduces(JobID id, int task)
+  public void decRunningReduces(int tasks)
   { }
 
   public void killedMap(TaskAttemptID taskAttemptID)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=830531&r1=830530&r2=830531&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Wed Oct 28 13:32:26 2009
@@ -341,25 +341,25 @@
   }
 
   @Override
-  public synchronized void addRunningMaps(JobID id, int task)
+  public synchronized void addRunningMaps(int task)
   {
     numRunningMaps += task;
   }
 
   @Override
-  public synchronized void decRunningMaps(JobID id, int task) 
+  public synchronized void decRunningMaps(int task) 
   {
     numRunningMaps -= task;
   }
 
   @Override
-  public synchronized void addRunningReduces(JobID id, int task)
+  public synchronized void addRunningReduces(int task)
   {
     numRunningReduces += task;
   }
 
   @Override
-  public synchronized void decRunningReduces(JobID id, int task)
+  public synchronized void decRunningReduces(int task)
   {
     numRunningReduces -= task;
   }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java?rev=830531&r1=830530&r2=830531&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java Wed Oct 28 13:32:26 2009
@@ -129,11 +129,6 @@
     taskAttemptID[1] = job.findMapTask(trackers[1]);
     taskAttemptID[2] = job.findReduceTask(trackers[2]);
     
-    assertTrue("Mismatch in num  running maps",
-        mi.numRunningMaps == numMaps);
-    assertTrue("Mismatch in num running reduces",
-        mi.numRunningReduces == numReds);
-
     job.finishTask(taskAttemptID[0]);
     job.finishTask(taskAttemptID[1]);
     job.finishTask(taskAttemptID[2]);
@@ -254,6 +249,10 @@
       mapSlotsPerTask+mapSlotsPerTask1, mi.numOccupiedMapSlots);
     assertEquals("Mismatch in reduce slots occupied",
       reduceSlotsPerTask+reduceSlotsPerTask1, mi.numOccupiedReduceSlots);
+    assertEquals("Mismatch in num  running maps",
+        2, mi.numRunningMaps);
+      assertEquals("Mismatch in num running reduces",
+        2, mi.numRunningReduces);
     
     //now send heartbeat with no running tasks
     status = new TaskTrackerStatus[1];
@@ -265,6 +264,10 @@
       0, mi.numOccupiedMapSlots);
     assertEquals("Mismatch in reduce slots occupied",
       0, mi.numOccupiedReduceSlots);
+    assertEquals("Mismatch in num  running maps",
+      0, mi.numRunningMaps);
+    assertEquals("Mismatch in num running reduces",
+      0, mi.numRunningReduces);
   }
 
   public void testReservedSlots() throws IOException {
@@ -555,25 +558,25 @@
     }
 
     @Override
-    public synchronized void addRunningMaps(JobID id, int task)
+    public synchronized void addRunningMaps(int task)
     {
       numRunningMaps += task;
     }
 
     @Override
-    public synchronized void decRunningMaps(JobID id, int task) 
+    public synchronized void decRunningMaps(int task) 
     {
       numRunningMaps -= task;
     }
 
     @Override
-    public synchronized void addRunningReduces(JobID id, int task)
+    public synchronized void addRunningReduces(int task)
     {
       numRunningReduces += task;
     }
 
     @Override
-    public synchronized void decRunningReduces(JobID id, int task)
+    public synchronized void decRunningReduces(int task)
     {
       numRunningReduces -= task;
     }