You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dh...@apache.org on 2010/07/16 01:30:53 UTC

svn commit: r964640 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/

Author: dhruba
Date: Thu Jul 15 23:30:53 2010
New Revision: 964640

URL: http://svn.apache.org/viewvc?rev=964640&view=rev
Log:
MAPREDUCE-1848. Put number of speculative, data local, rack local 
tasks in JobTracker metrics. (Scott Chen via dhruba)


Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul 15 23:30:53 2010
@@ -80,6 +80,9 @@ Trunk (unreleased changes)
     Also includes compatibility with security enhancements, and scalability
     improvements. (Amar Kamat, Rahul Singh, Hong Tang, and cdouglas)
 
+    MAPREDUCE-1848. Put number of speculative, data local, rack local 
+    tasks in JobTracker metrics. (Scott Chen via dhruba)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Jul 15 23:30:53 2010
@@ -1640,6 +1640,7 @@ public class JobInProgress {
       splits = tip.getSplitNodes();
       if (tip.isSpeculating()) {
         speculativeMapTasks++;
+        metrics.speculateMap(id);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Chosen speculative task, current speculativeMap task count: "
                     + speculativeMapTasks);
@@ -1652,6 +1653,7 @@ public class JobInProgress {
       counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
       if (tip.isSpeculating()) {
         speculativeReduceTasks++;
+        metrics.speculateReduce(id);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Chosen speculative task, current speculativeReduce task count: "
                     + speculativeReduceTasks);
@@ -1691,10 +1693,12 @@ public class JobInProgress {
       case 0 :
         LOG.info("Choosing data-local task " + tip.getTIPId());
         jobCounters.incrCounter(JobCounter.DATA_LOCAL_MAPS, 1);
+        metrics.launchDataLocalMap(id);
         break;
       case 1:
         LOG.info("Choosing rack-local task " + tip.getTIPId());
         jobCounters.incrCounter(JobCounter.RACK_LOCAL_MAPS, 1);
+        metrics.launchRackLocalMap(id);
         break;
       default :
         // check if there is any locality

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Thu Jul 15 23:30:53 2010
@@ -162,4 +162,16 @@ class JobTrackerInstrumentation {
 
   public void heartbeat() {
   }
+
+  public void speculateMap(TaskAttemptID taskAttemptID)
+  { }
+
+  public void speculateReduce(TaskAttemptID taskAttemptID)
+  { }
+
+  public void launchDataLocalMap(TaskAttemptID taskAttemptID)
+  { }
+
+  public void launchRackLocalMap(TaskAttemptID taskAttemptID)
+  { }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Thu Jul 15 23:30:53 2010
@@ -36,6 +36,10 @@ class JobTrackerMetricsInst extends JobT
   private int numJobsCompleted = 0;
   private int numWaitingMaps = 0;
   private int numWaitingReduces = 0;
+  private int numSpeculativeMaps = 0;
+  private int numSpeculativeReduces = 0;
+  private int numDataLocalMaps = 0;
+  private int numRackLocalMaps = 0;
 
   //Cluster status fields.
   private volatile int numMapSlots = 0;
@@ -101,6 +105,10 @@ class JobTrackerMetricsInst extends JobT
       metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
       metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
       metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+      metricsRecord.incrMetric("speculative_maps", numSpeculativeMaps);
+      metricsRecord.incrMetric("speculative_reduces", numSpeculativeReduces);
+      metricsRecord.incrMetric("datalocal_maps", numDataLocalMaps);
+      metricsRecord.incrMetric("racklocal_maps", numRackLocalMaps);
       
       metricsRecord.incrMetric("reserved_map_slots", numReservedMapSlots);
       metricsRecord.incrMetric("reserved_reduce_slots", numReservedReduceSlots);
@@ -138,6 +146,10 @@ class JobTrackerMetricsInst extends JobT
       numWaitingReduces = 0;
       numBlackListedMapSlots = 0;
       numBlackListedReduceSlots = 0;
+      numSpeculativeMaps = 0;
+      numSpeculativeReduces = 0;
+      numDataLocalMaps = 0;
+      numRackLocalMaps = 0;
       
       numReservedMapSlots = 0;
       numReservedReduceSlots = 0;
@@ -171,6 +183,16 @@ class JobTrackerMetricsInst extends JobT
   }
 
   @Override
+  public synchronized void launchDataLocalMap(TaskAttemptID taskAttemptID) {
+    ++numDataLocalMaps;
+  }
+
+  @Override
+  public synchronized void launchRackLocalMap(TaskAttemptID taskAttemptID) {
+    ++numRackLocalMaps;
+  }
+
+  @Override
   public synchronized void completeMap(TaskAttemptID taskAttemptID) {
     ++numMapTasksCompleted;
   }
@@ -182,6 +204,11 @@ class JobTrackerMetricsInst extends JobT
   }
 
   @Override
+  public synchronized void speculateMap(TaskAttemptID taskAttemptID) {
+    ++numSpeculativeMaps;
+  }
+
+  @Override
   public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
     ++numReduceTasksLaunched;
     decWaitingReduces(taskAttemptID.getJobID(), 1);
@@ -199,6 +226,11 @@ class JobTrackerMetricsInst extends JobT
   }
 
   @Override
+  public synchronized void speculateReduce(TaskAttemptID taskAttemptID) {
+    ++numSpeculativeReduces;
+  }
+
+  @Override
   public synchronized void submitJob(JobConf conf, JobID id) {
     ++numJobsSubmitted;
   }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Thu Jul 15 23:30:53 2010
@@ -298,4 +298,324 @@ public class FakeObjectUtilities {
     @Override
     public void closeWriter(org.apache.hadoop.mapreduce.JobID id) { }
   }
+
+  static class FakeJobTrackerMetricsInst extends JobTrackerInstrumentation  {
+    public FakeJobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
+      super(tracker, conf);
+    }
+
+    int numMapTasksLaunched = 0;
+    int numMapTasksCompleted = 0;
+    int numMapTasksFailed = 0;
+    int numReduceTasksLaunched = 0;
+    int numReduceTasksCompleted = 0;
+    int numReduceTasksFailed = 0;
+    int numJobsSubmitted = 0;
+    int numJobsCompleted = 0;
+    int numWaitingMaps = 0;
+    int numWaitingReduces = 0;
+    int numSpeculativeMaps = 0;
+    int numSpeculativeReduces = 0;
+    int numDataLocalMaps = 0;
+    int numRackLocalMaps = 0;
+
+    //Cluster status fields.
+    volatile int numMapSlots = 0;
+    volatile int numReduceSlots = 0;
+    int numBlackListedMapSlots = 0;
+    int numBlackListedReduceSlots = 0;
+
+    int numReservedMapSlots = 0;
+    int numReservedReduceSlots = 0;
+    int numOccupiedMapSlots = 0;
+    int numOccupiedReduceSlots = 0;
+
+    int numJobsFailed = 0;
+    int numJobsKilled = 0;
+
+    int numJobsPreparing = 0;
+    int numJobsRunning = 0;
+
+    int numRunningMaps = 0;
+    int numRunningReduces = 0;
+
+    int numMapTasksKilled = 0;
+    int numReduceTasksKilled = 0;
+
+    int numTrackers = 0;
+    int numTrackersBlackListed = 0;
+
+    int numTrackersDecommissioned = 0;
+
+    long numHeartbeats = 0;
+
+    @Override
+    public synchronized void launchMap(TaskAttemptID taskAttemptID) {
+      ++numMapTasksLaunched;
+      decWaitingMaps(taskAttemptID.getJobID(), 1);
+    }
+
+    @Override
+    public synchronized void completeMap(TaskAttemptID taskAttemptID) {
+      ++numMapTasksCompleted;
+    }
+
+    @Override
+    public synchronized void failedMap(TaskAttemptID taskAttemptID) {
+      ++numMapTasksFailed;
+      addWaitingMaps(taskAttemptID.getJobID(), 1);
+    }
+
+    @Override
+    public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
+      ++numReduceTasksLaunched;
+      decWaitingReduces(taskAttemptID.getJobID(), 1);
+    }
+
+    @Override
+    public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
+      ++numReduceTasksCompleted;
+    }
+
+    @Override
+    public synchronized void failedReduce(TaskAttemptID taskAttemptID) {
+      ++numReduceTasksFailed;
+      addWaitingReduces(taskAttemptID.getJobID(), 1);
+    }
+
+    @Override
+    public synchronized void submitJob(JobConf conf, JobID id) {
+      ++numJobsSubmitted;
+    }
+
+    @Override
+    public synchronized void completeJob(JobConf conf, JobID id) {
+      ++numJobsCompleted;
+    }
+
+    @Override
+    public synchronized void addWaitingMaps(JobID id, int task) {
+      numWaitingMaps  += task;
+    }
+
+    @Override
+    public synchronized void decWaitingMaps(JobID id, int task) {
+      numWaitingMaps -= task;
+    }
+
+    @Override
+    public synchronized void addWaitingReduces(JobID id, int task) {
+      numWaitingReduces += task;
+    }
+
+    @Override
+    public synchronized void decWaitingReduces(JobID id, int task){
+      numWaitingReduces -= task;
+    }
+
+    @Override
+    public void setMapSlots(int slots) {
+      numMapSlots = slots;
+    }
+
+    @Override
+    public void setReduceSlots(int slots) {
+      numReduceSlots = slots;
+    }
+
+    @Override
+    public synchronized void addBlackListedMapSlots(int slots){
+      numBlackListedMapSlots += slots;
+    }
+
+    @Override
+    public synchronized void decBlackListedMapSlots(int slots){
+      numBlackListedMapSlots -= slots;
+    }
+
+    @Override
+    public synchronized void addBlackListedReduceSlots(int slots){
+      numBlackListedReduceSlots += slots;
+    }
+
+    @Override
+    public synchronized void decBlackListedReduceSlots(int slots){
+      numBlackListedReduceSlots -= slots;
+    }
+
+    @Override
+    public synchronized void addReservedMapSlots(int slots)
+    {
+      numReservedMapSlots += slots;
+    }
+
+    @Override
+    public synchronized void decReservedMapSlots(int slots)
+    {
+      numReservedMapSlots -= slots;
+    }
+
+    @Override
+    public synchronized void addReservedReduceSlots(int slots)
+    {
+      numReservedReduceSlots += slots;
+    }
+
+    @Override
+    public synchronized void decReservedReduceSlots(int slots)
+    {
+      numReservedReduceSlots -= slots;
+    }
+
+    @Override
+    public synchronized void addOccupiedMapSlots(int slots)
+    {
+      numOccupiedMapSlots += slots;
+    }
+
+    @Override
+    public synchronized void decOccupiedMapSlots(int slots)
+    {
+      numOccupiedMapSlots -= slots;
+    }
+
+    @Override
+    public synchronized void addOccupiedReduceSlots(int slots)
+    {
+      numOccupiedReduceSlots += slots;
+    }
+
+    @Override
+    public synchronized void decOccupiedReduceSlots(int slots)
+    {
+      numOccupiedReduceSlots -= slots;
+    }
+
+    @Override
+    public synchronized void failedJob(JobConf conf, JobID id)
+    {
+      numJobsFailed++;
+    }
+
+    @Override
+    public synchronized void killedJob(JobConf conf, JobID id)
+    {
+      numJobsKilled++;
+    }
+
+    @Override
+    public synchronized void addPrepJob(JobConf conf, JobID id)
+    {
+      numJobsPreparing++;
+    }
+
+    @Override
+    public synchronized void decPrepJob(JobConf conf, JobID id)
+    {
+      numJobsPreparing--;
+    }
+
+    @Override
+    public synchronized void addRunningJob(JobConf conf, JobID id)
+    {
+      numJobsRunning++;
+    }
+
+    @Override
+    public synchronized void decRunningJob(JobConf conf, JobID id)
+    {
+      numJobsRunning--;
+    }
+
+    @Override
+    public synchronized void addRunningMaps(int task)
+    {
+      numRunningMaps += task;
+    }
+
+    @Override
+    public synchronized void decRunningMaps(int task)
+    {
+      numRunningMaps -= task;
+    }
+
+    @Override
+    public synchronized void addRunningReduces(int task)
+    {
+      numRunningReduces += task;
+    }
+
+    @Override
+    public synchronized void decRunningReduces(int task)
+    {
+      numRunningReduces -= task;
+    }
+
+    @Override
+    public synchronized void killedMap(TaskAttemptID taskAttemptID)
+    {
+      numMapTasksKilled++;
+    }
+
+    @Override
+    public synchronized void killedReduce(TaskAttemptID taskAttemptID)
+    {
+      numReduceTasksKilled++;
+    }
+
+    @Override
+    public synchronized void addTrackers(int trackers)
+    {
+      numTrackers += trackers;
+    }
+
+    @Override
+    public synchronized void decTrackers(int trackers)
+    {
+      numTrackers -= trackers;
+    }
+
+    @Override
+    public synchronized void addBlackListedTrackers(int trackers)
+    {
+      numTrackersBlackListed += trackers;
+    }
+
+    @Override
+    public synchronized void decBlackListedTrackers(int trackers)
+    {
+      numTrackersBlackListed -= trackers;
+    }
+
+    @Override
+    public synchronized void setDecommissionedTrackers(int trackers)
+    {
+      numTrackersDecommissioned = trackers;
+    }
+
+    @Override
+    public synchronized void heartbeat() {
+      ++numHeartbeats;
+    }
+
+    @Override
+    public synchronized void speculateReduce(TaskAttemptID taskAttemptID) {
+      ++numSpeculativeReduces;
+    }
+
+    @Override
+    public synchronized void speculateMap(TaskAttemptID taskAttemptID) {
+      ++numSpeculativeMaps;
+    }
+
+    @Override
+    public synchronized void launchDataLocalMap(TaskAttemptID taskAttemptID) {
+      ++numDataLocalMaps;
+    }
+
+    @Override
+    public synchronized void launchRackLocalMap(TaskAttemptID taskAttemptID) {
+      ++numRackLocalMaps;
+    }
+  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java Thu Jul 15 23:30:53 2010
@@ -28,6 +28,7 @@ import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
 
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTrackerMetricsInst;
 import org.apache.hadoop.mapred.TestTaskTrackerBlacklisting.FakeJobTracker;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -55,7 +56,7 @@ public class TestJobTrackerInstrumentati
   private static int numMapSlotsToReserve = 2;
   private static int numReduceSlotsToReserve = 2;
 
-  private static MyJobTrackerMetricsInst mi;
+  private static FakeJobTrackerMetricsInst mi;
   
   
 
@@ -71,9 +72,9 @@ public class TestJobTrackerInstrumentati
             FakeTaskScheduler.class, TaskScheduler.class);
 
         conf.set(JTConfig.JT_INSTRUMENTATION, 
-            MyJobTrackerMetricsInst.class.getName());
+            FakeJobTrackerMetricsInst.class.getName());
         jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
-        mi = (MyJobTrackerMetricsInst) jobTracker.getInstrumentation();
+        mi = (FakeJobTrackerMetricsInst) jobTracker.getInstrumentation();
         for (String tracker : trackers) {
           FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
         }
@@ -390,300 +391,4 @@ public class TestJobTrackerInstrumentati
           numReduceTasks);
     }
   }
-
-  static class MyJobTrackerMetricsInst extends JobTrackerInstrumentation  {
-    public MyJobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
-      super(tracker, conf);
-    }
-
-    private int numMapTasksLaunched = 0;
-    private int numMapTasksCompleted = 0;
-    private int numMapTasksFailed = 0;
-    private int numReduceTasksLaunched = 0;
-    private int numReduceTasksCompleted = 0;
-    private int numReduceTasksFailed = 0;
-    private int numJobsSubmitted = 0;
-    private int numJobsCompleted = 0;
-    private int numWaitingMaps = 0;
-    private int numWaitingReduces = 0;
-
-    //Cluster status fields.
-    private volatile int numMapSlots = 0;
-    private volatile int numReduceSlots = 0;
-    private int numBlackListedMapSlots = 0;
-    private int numBlackListedReduceSlots = 0;
-
-    private int numReservedMapSlots = 0;
-    private int numReservedReduceSlots = 0;
-    private int numOccupiedMapSlots = 0;
-    private int numOccupiedReduceSlots = 0;
-    
-    private int numJobsFailed = 0;
-    private int numJobsKilled = 0;
-    
-    private int numJobsPreparing = 0;
-    private int numJobsRunning = 0;
-    
-    private int numRunningMaps = 0;
-    private int numRunningReduces = 0;
-    
-    private int numMapTasksKilled = 0;
-    private int numReduceTasksKilled = 0;
-
-    private int numTrackers = 0;
-    private int numTrackersBlackListed = 0;
-
-    private int numTrackersDecommissioned = 0;
-
-    private long numHeartbeats = 0;
-
-    @Override
-    public synchronized void launchMap(TaskAttemptID taskAttemptID) {
-      ++numMapTasksLaunched;
-      decWaitingMaps(taskAttemptID.getJobID(), 1);
-    }
-
-    @Override
-    public synchronized void completeMap(TaskAttemptID taskAttemptID) {
-      ++numMapTasksCompleted;
-    }
-
-    @Override
-    public synchronized void failedMap(TaskAttemptID taskAttemptID) {
-      ++numMapTasksFailed;
-      addWaitingMaps(taskAttemptID.getJobID(), 1);
-    }
-
-    @Override
-    public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
-      ++numReduceTasksLaunched;
-      decWaitingReduces(taskAttemptID.getJobID(), 1);
-    }
-
-    @Override
-    public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
-      ++numReduceTasksCompleted;
-    }
-
-    @Override
-    public synchronized void failedReduce(TaskAttemptID taskAttemptID) {
-      ++numReduceTasksFailed;
-      addWaitingReduces(taskAttemptID.getJobID(), 1);
-    }
-
-    @Override
-    public synchronized void submitJob(JobConf conf, JobID id) {
-      ++numJobsSubmitted;
-    }
-
-    @Override
-    public synchronized void completeJob(JobConf conf, JobID id) {
-      ++numJobsCompleted;
-    }
-
-    @Override
-    public synchronized void addWaitingMaps(JobID id, int task) {
-      numWaitingMaps  += task;
-    }
-
-    @Override
-    public synchronized void decWaitingMaps(JobID id, int task) {
-      numWaitingMaps -= task;
-    }
-
-    @Override
-    public synchronized void addWaitingReduces(JobID id, int task) {
-      numWaitingReduces += task;
-    }
-
-    @Override
-    public synchronized void decWaitingReduces(JobID id, int task){
-      numWaitingReduces -= task;
-    }
-
-    @Override
-    public void setMapSlots(int slots) {
-      numMapSlots = slots;
-    }
-
-    @Override
-    public void setReduceSlots(int slots) {
-      numReduceSlots = slots;
-    }
-
-    @Override
-    public synchronized void addBlackListedMapSlots(int slots){
-      numBlackListedMapSlots += slots;
-    }
-
-    @Override
-    public synchronized void decBlackListedMapSlots(int slots){
-      numBlackListedMapSlots -= slots;
-    }
-
-    @Override
-    public synchronized void addBlackListedReduceSlots(int slots){
-      numBlackListedReduceSlots += slots;
-    }
-
-    @Override
-    public synchronized void decBlackListedReduceSlots(int slots){
-      numBlackListedReduceSlots -= slots;
-    }
-    
-    @Override
-    public synchronized void addReservedMapSlots(int slots)
-    { 
-      numReservedMapSlots += slots;
-    }
-
-    @Override
-    public synchronized void decReservedMapSlots(int slots)
-    {
-      numReservedMapSlots -= slots;
-    }
-
-    @Override
-    public synchronized void addReservedReduceSlots(int slots)
-    {
-      numReservedReduceSlots += slots;
-    }
-
-    @Override
-    public synchronized void decReservedReduceSlots(int slots)
-    {
-      numReservedReduceSlots -= slots;
-    }
-
-    @Override
-    public synchronized void addOccupiedMapSlots(int slots)
-    {
-      numOccupiedMapSlots += slots;
-    }
-
-    @Override
-    public synchronized void decOccupiedMapSlots(int slots)
-    {
-      numOccupiedMapSlots -= slots;
-    }
-
-    @Override
-    public synchronized void addOccupiedReduceSlots(int slots)
-    {
-      numOccupiedReduceSlots += slots;
-    }
-
-    @Override
-    public synchronized void decOccupiedReduceSlots(int slots)
-    {
-      numOccupiedReduceSlots -= slots;
-    }
-
-    @Override
-    public synchronized void failedJob(JobConf conf, JobID id) 
-    {
-      numJobsFailed++;
-    }
-
-    @Override
-    public synchronized void killedJob(JobConf conf, JobID id) 
-    {
-      numJobsKilled++;
-    }
-
-    @Override
-    public synchronized void addPrepJob(JobConf conf, JobID id) 
-    {
-      numJobsPreparing++;
-    }
-
-    @Override
-    public synchronized void decPrepJob(JobConf conf, JobID id) 
-    {
-      numJobsPreparing--;
-    }
-
-    @Override
-    public synchronized void addRunningJob(JobConf conf, JobID id) 
-    {
-      numJobsRunning++;
-    }
-
-    @Override
-    public synchronized void decRunningJob(JobConf conf, JobID id) 
-    {
-      numJobsRunning--;
-    }
-
-    @Override
-    public synchronized void addRunningMaps(int task)
-    {
-      numRunningMaps += task;
-    }
-
-    @Override
-    public synchronized void decRunningMaps(int task) 
-    {
-      numRunningMaps -= task;
-    }
-
-    @Override
-    public synchronized void addRunningReduces(int task)
-    {
-      numRunningReduces += task;
-    }
-
-    @Override
-    public synchronized void decRunningReduces(int task)
-    {
-      numRunningReduces -= task;
-    }
-
-    @Override
-    public synchronized void killedMap(TaskAttemptID taskAttemptID)
-    {
-      numMapTasksKilled++;
-    }
-
-    @Override
-    public synchronized void killedReduce(TaskAttemptID taskAttemptID)
-    {
-      numReduceTasksKilled++;
-    }
-
-    @Override
-    public synchronized void addTrackers(int trackers)
-    {
-      numTrackers += trackers;
-    }
-
-    @Override
-    public synchronized void decTrackers(int trackers)
-    {
-      numTrackers -= trackers;
-    }
-
-    @Override
-    public synchronized void addBlackListedTrackers(int trackers)
-    {
-      numTrackersBlackListed += trackers;
-    }
-
-    @Override
-    public synchronized void decBlackListedTrackers(int trackers)
-    {
-      numTrackersBlackListed -= trackers;
-    }
-
-    @Override
-    public synchronized void setDecommissionedTrackers(int trackers)
-    {
-      numTrackersDecommissioned = trackers;
-    }
-
-    @Override
-    public synchronized void heartbeat() {
-      ++numHeartbeats;
-    }
-  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Thu Jul 15 23:30:53 2010
@@ -28,6 +28,7 @@ import junit.framework.TestSuite;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTrackerMetricsInst;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobCounter;
@@ -37,6 +38,7 @@ import org.apache.hadoop.mapreduce.split
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.StaticMapping;
+import org.mortbay.log.Log;
 
 /**
  * A JUnit test to test configured task limits.
@@ -57,6 +59,7 @@ public class TestRackAwareTaskPlacement 
   static FakeJobTracker jobTracker;
   static String jtIdentifier = "test";
   private static int jobCounter;
+  private static FakeJobTrackerMetricsInst fakeInst;
   
   public static Test suite() {
     TestSetup setup = 
@@ -67,7 +70,10 @@ public class TestRackAwareTaskPlacement 
         conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
         conf.setClass("topology.node.switch.mapping.impl", 
           StaticMapping.class, DNSToSwitchMapping.class);
+        conf.set(JTConfig.JT_INSTRUMENTATION,
+            FakeJobTrackerMetricsInst.class.getName());
         jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
+        fakeInst = (FakeJobTrackerMetricsInst) jobTracker.getInstrumentation();
         // Set up the Topology Information
         for (int i = 0; i < allHosts.length; i++) {
           StaticMapping.addNodeToRack(allHosts[i], allRacks[i]);
@@ -169,6 +175,9 @@ public class TestRackAwareTaskPlacement 
     
     assertEquals("Number of Other-local maps", 0, 
         counters.getCounter(JobCounter.OTHER_LOCAL_MAPS));
+    // Also verify jobtracker instrumentation
+    assertEquals("Number of data local maps", 3, fakeInst.numDataLocalMaps);
+    assertEquals("Number of rack local maps", 1, fakeInst.numRackLocalMaps);
 
   }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java?rev=964640&r1=964639&r2=964640&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java Thu Jul 15 23:30:53 2010
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac
 
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTrackerMetricsInst;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
@@ -43,6 +44,7 @@ public class TestSpeculativeExecution ex
   };
   static SpecFakeClock clock;
   static final Log LOG = LogFactory.getLog(TestSpeculativeExecution.class);
+  private static FakeJobTrackerMetricsInst fakeInst;
 
   
   static String trackers[] = new String[] {"tracker_tracker1:1000", 
@@ -56,8 +58,11 @@ public class TestSpeculativeExecution ex
         JobConf conf = new JobConf();
         conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
         conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
+        conf.set(JTConfig.JT_INSTRUMENTATION,
+            FakeJobTrackerMetricsInst.class.getName());
         jobTracker = new FakeJobTracker(conf, (clock = new SpecFakeClock()),
             trackers);
+        fakeInst = (FakeJobTrackerMetricsInst) jobTracker.getInstrumentation();
         for (String tracker : trackers) {
           FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
         }
@@ -127,6 +132,12 @@ public class TestSpeculativeExecution ex
       "Running reduces count should be updated from " + oldRunningReduces +
         " to " + (oldRunningReduces - 1), job.runningReduces(),
       oldRunningReduces - 1);
+    // Verify total speculative tasks by jobtracker instrumentation
+    assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
+    assertEquals("Total speculative reduces", 1,
+                 fakeInst.numSpeculativeReduces);
+    LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
+    LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
     
     job.finishTask(taskAttemptID[7]);
   }
@@ -168,6 +179,12 @@ public class TestSpeculativeExecution ex
     job.finishTask(taskAttemptID[8]);
     assertEquals("Tracker "+ trackers[2] + " expected to be slow ",
         job.isSlowTracker(trackers[2]), true);
+    // Verify total speculative tasks by jobtracker instrumentation
+    assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
+    assertEquals("Total speculative reduces", 1,
+                 fakeInst.numSpeculativeReduces);
+    LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
+    LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
   }
   
   public void testTaskToSpeculate() throws IOException {
@@ -200,6 +217,12 @@ public class TestSpeculativeExecution ex
     taskAttemptID[5] = job.findReduceTask(trackers[4]);
     assertEquals(taskAttemptID[5].getTaskID().getId(),3);
     
+    // Verify total speculative tasks by jobtracker instrumentation
+    assertEquals("Total speculative maps", 1, fakeInst.numSpeculativeMaps);
+    assertEquals("Total speculative reduces", 3,
+                 fakeInst.numSpeculativeReduces);
+    LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
+    LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
   }
   
   /*
@@ -236,6 +259,12 @@ public class TestSpeculativeExecution ex
     job.progressMade(taskAttemptID[4], 0.20f);
     taskAttemptID[5] = job.findMapTask(trackers[4]);
     assertEquals(taskAttemptID[5].getTaskID().getId(),4);
+    // Verify total speculative tasks by jobtracker instrumentation
+    assertEquals("Total speculative maps", 2, fakeInst.numSpeculativeMaps);
+    assertEquals("Total speculative reduces", 3,
+                 fakeInst.numSpeculativeReduces);
+    LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
+    LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
   }
 
   /*
@@ -254,6 +283,12 @@ public class TestSpeculativeExecution ex
     assertEquals(speculativeCap(1200,1150,20), 10);
     //Tests the fact that the max tasks launched is 0.01 * #slots
     assertEquals(speculativeCap(1200,1150,4000), 20);
+    // Verify total speculative tasks by jobtracker instrumentation
+    assertEquals("Total speculative maps", 72, fakeInst.numSpeculativeMaps);
+    assertEquals("Total speculative reduces", 3,
+                 fakeInst.numSpeculativeReduces);
+    LOG.info("Total speculative maps = " + fakeInst.numSpeculativeMaps);
+    LOG.info("Total speculative reduces = " + fakeInst.numSpeculativeReduces);
   }
   
   private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)