You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:34:26 UTC

svn commit: r1077034 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ contrib/fairscheduler/ contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/m...

Author: omalley
Date: Fri Mar  4 03:34:25 2011
New Revision: 1077034

URL: http://svn.apache.org/viewvc?rev=1077034&view=rev
Log:
commit 3b6c557e3e1f3d03df503114b0123263b1a56e9e
Author: Hemanth Yamijala <yh...@apache.org>
Date:   Sat Oct 24 17:42:59 2009 +0530

    MAPREDUCE:1103 from https://issues.apache.org/jira/secure/attachment/12423030/1103_v5_yahoo_1.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1103. Added more metrics to Jobtracker. (sharad)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/ivy.xml
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar  4 03:34:25 2011
@@ -166,8 +166,9 @@ public class TestCapacityScheduler exten
     private int speculativeMapTaskCounter = 0;
     private int speculativeReduceTaskCounter = 0;
     public FakeJobInProgress(JobID jId, JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager, String user) {
-      super(jId, jobConf);
+        FakeTaskTrackerManager taskTrackerManager, String user, 
+        JobTracker jt) {
+      super(jId, jobConf, jt);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP);
@@ -310,8 +311,9 @@ public class TestCapacityScheduler exten
   static class FakeFailingJobInProgress extends FakeJobInProgress {
 
     public FakeFailingJobInProgress(JobID id, JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager, String user) {
-      super(id, jobConf, taskTrackerManager, user);
+        FakeTaskTrackerManager taskTrackerManager, String user, 
+        JobTracker jt) {
+      super(id, jobConf, taskTrackerManager, user, jt);
     }
     
     @Override
@@ -765,7 +767,7 @@ public class TestCapacityScheduler exten
     FakeJobInProgress job =
         new FakeJobInProgress(new JobID("test", ++jobCounter),
             (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
-            jobConf.getUser());
+            jobConf.getUser(), UtilsForTests.getJobTracker());
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
     return job;
@@ -2588,7 +2590,8 @@ public class TestCapacityScheduler exten
     //Submit a job whose initialization would fail always.
     FakeJobInProgress job =
       new FakeFailingJobInProgress(new JobID("test", ++jobCounter),
-          new JobConf(), taskTrackerManager,"u1");
+          new JobConf(), taskTrackerManager,"u1", 
+          UtilsForTests.getJobTracker());
     job.getStatus().setRunState(JobStatus.PREP);
     taskTrackerManager.submitJob(job);
     //check if job is present in waiting list.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/ivy.xml?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/ivy.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/ivy.xml Fri Mar  4 03:34:25 2011
@@ -38,5 +38,21 @@
       name="junit"
       rev="${junit.version}"
       conf="common->default"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty-util"
+      rev="${jetty-util.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jsp-api-2.1"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jsp-2.1"
+      rev="${jetty.version}"
+      conf="common->master"/>
   </dependencies>
 </ivy-module>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Mar  4 03:34:25 2011
@@ -51,8 +51,9 @@ public class TestFairScheduler extends T
     private FakeTaskTrackerManager taskTrackerManager;
     
     public FakeJobInProgress(JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf);
+        FakeTaskTrackerManager taskTrackerManager, 
+        JobTracker jt) throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf, jt);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus();
@@ -292,7 +293,8 @@ public class TestFairScheduler extends T
     jobConf.setNumReduceTasks(reduces);
     if (pool != null)
       jobConf.set(POOL_PROPERTY, pool);
-    JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
+    JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
+        UtilsForTests.getJobTracker());
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
     job.startTime = clock.time;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:34:25 2011
@@ -258,14 +258,14 @@ class JobInProgress {
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
    */
-  protected JobInProgress(JobID jobid, JobConf conf) {
+  protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
     this.conf = conf;
     this.jobId = jobid;
     this.numMapTasks = conf.getNumMapTasks();
     this.numReduceTasks = conf.getNumReduceTasks();
     this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
     this.anyCacheLevel = this.maxLevel+1;
-    this.jobtracker = null;
+    this.jobtracker = tracker;
     this.restartCount = 0;
   }
   
@@ -292,6 +292,7 @@ class JobInProgress {
         + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
     this.jobtracker = jobtracker;
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
+    this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
     this.startTime = System.currentTimeMillis();
     status.setStartTime(startTime);
     this.localFs = FileSystem.getLocal(default_conf);
@@ -1413,6 +1414,7 @@ class JobInProgress {
       name = Values.CLEANUP.name();
     } else if (tip.isMapTask()) {
       ++runningMapTasks;
+      metrics.addRunningMaps(jobId, 1);
       name = Values.MAP.name();
       counter = Counter.TOTAL_LAUNCHED_MAPS;
       splits = tip.getSplitNodes();
@@ -1421,6 +1423,7 @@ class JobInProgress {
       metrics.launchMap(id);
     } else {
       ++runningReduceTasks;
+      metrics.addRunningReduces(jobId, 1);
       name = Values.REDUCE.name();
       counter = Counter.TOTAL_LAUNCHED_REDUCES;
       if (tip.getActiveTasks().size() > 1)
@@ -1541,8 +1544,10 @@ class JobInProgress {
     long now = System.currentTimeMillis();
     
     FallowSlotInfo info = map.get(taskTracker);
+    int reservedSlots = 0;
     if (info == null) {
       info = new FallowSlotInfo(now, numSlots);
+      reservedSlots = numSlots;
     } else {
       // Increment metering info if the reservation is changing
       if (info.getNumSlots() != numSlots) {
@@ -1554,11 +1559,18 @@ class JobInProgress {
         jobCounters.incrCounter(counter, fallowSlotMillis);
         
         // Update 
+        reservedSlots = numSlots - info.getNumSlots();
         info.setTimestamp(now);
         info.setNumSlots(numSlots);
       }
     }
     map.put(taskTracker, info);
+    if (type == TaskType.MAP) {
+      jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
+    }
+    else {
+      jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
+    }
   }
   
   public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
@@ -1584,6 +1596,13 @@ class JobInProgress {
     jobCounters.incrCounter(counter, fallowSlotMillis);
 
     map.remove(taskTracker);
+    if (type == TaskType.MAP) {
+      jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
+    }
+    else {
+      jobtracker.getInstrumentation().decReservedReduceSlots(
+        info.getNumSlots());
+    }
   }
   
   public int getNumReservedTaskTrackersForMaps() {
@@ -2265,7 +2284,7 @@ class JobInProgress {
       this.status.setSetupProgress(1.0f);
       // move the job to running state if the job is in prep state
       if (this.status.getRunState() == JobStatus.PREP) {
-        this.status.setRunState(JobStatus.RUNNING);
+        changeStateTo(JobStatus.RUNNING);
         JobHistory.JobInfo.logStarted(profile.getJobID());
       }
     } else if (tip.isJobCleanupTask()) {
@@ -2294,6 +2313,7 @@ class JobInProgress {
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
     } else if (tip.isMapTask()) {
       runningMapTasks -= 1;
+      metrics.decRunningMaps(jobId, 1);
       // check if this was a sepculative task
       if (oldNumAttempts > 1) {
         speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
@@ -2307,6 +2327,7 @@ class JobInProgress {
       }
     } else {
       runningReduceTasks -= 1;
+      metrics.decRunningReduces(jobId, 1);
       if (oldNumAttempts > 1) {
         speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
       }
@@ -2323,6 +2344,31 @@ class JobInProgress {
   }
 
   /**
+   * Job state change must happen thru this call
+   */
+  private void changeStateTo(int newState) {
+    int oldState = this.status.getRunState();
+    if (oldState == newState) {
+      return; //old and new states are same
+    }
+    this.status.setRunState(newState);
+    
+    //update the metrics
+    if (oldState == JobStatus.PREP) {
+      this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
+    } else if (oldState == JobStatus.RUNNING) {
+      this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
+    }
+    
+    if (newState == JobStatus.PREP) {
+      this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+    } else if (newState == JobStatus.RUNNING) {
+      this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
+    }
+    
+  }
+
+  /**
    * The job is done since all it's component tasks are either
    * successful or have failed.
    */
@@ -2332,7 +2378,7 @@ class JobInProgress {
     // All tasks are complete, then the job is done!
     //
     if (this.status.getRunState() == JobStatus.RUNNING ) {
-      this.status.setRunState(JobStatus.SUCCEEDED);
+      changeStateTo(JobStatus.SUCCEEDED);
       this.status.setCleanupProgress(1.0f);
       if (maps.length == 0) {
         this.status.setMapProgress(1.0f);
@@ -2371,7 +2417,7 @@ class JobInProgress {
       this.status.setCleanupProgress(1.0f);
       
       if (jobTerminationState == JobStatus.FAILED) {
-        this.status.setRunState(JobStatus.FAILED);
+        changeStateTo(JobStatus.FAILED);
 
         // Log the job summary
         JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
@@ -2381,7 +2427,7 @@ class JobInProgress {
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks);
       } else {
-        this.status.setRunState(JobStatus.KILLED);
+        changeStateTo(JobStatus.KILLED);
 
         // Log the job summary
         JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
@@ -2395,6 +2441,13 @@ class JobInProgress {
       
       jobtracker.getInstrumentation().terminateJob(
           this.conf, this.status.getJobID());
+      if (jobTerminationState == JobStatus.FAILED) {
+        jobtracker.getInstrumentation().failedJob(
+            this.conf, this.status.getJobID());
+      } else {
+        jobtracker.getInstrumentation().killedJob(
+            this.conf, this.status.getJobID());
+      }
     }
   }
 
@@ -2536,6 +2589,7 @@ class JobInProgress {
         launchedSetup = false;
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
+        metrics.decRunningMaps(jobId, 1);
         metrics.failedMap(taskid);
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
@@ -2545,6 +2599,7 @@ class JobInProgress {
         }
       } else {
         runningReduceTasks -= 1;
+        metrics.decRunningReduces(jobId, 1);
         metrics.failedReduce(taskid);
         // remove from the running queue and put in the failed queue if the tip
         // is not complete

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:34:25 2011
@@ -405,10 +405,11 @@ public class JobTracker implements MRCon
                       // tracker is lost, and if it is blacklisted, remove 
                       // it from the count of blacklisted trackers in the cluster
                       if (isBlacklisted(trackerName)) {
-                        faultyTrackers.numBlacklistedTrackers -= 1;
+                    	  faultyTrackers.decrBlackListedTrackers(1);
                       }
                       updateTaskTrackerStatus(trackerName, null);
                       statistics.taskTrackerRemoved(trackerName);
+                      getInstrumentation().decTrackers(1);
                       // remove the mapping from the hosts list
                       String hostname = newProfile.getHost();
                       hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -718,6 +719,16 @@ public class JobTracker implements MRCon
       }        
     }
 
+    private void incrBlackListedTrackers(int count) {
+      numBlacklistedTrackers += count;
+      getInstrumentation().addBlackListedTrackers(count);
+    }
+
+    private void decrBlackListedTrackers(int count) {
+      numBlacklistedTrackers -= count;
+      getInstrumentation().decBlackListedTrackers(count);
+    }
+
     private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) {
       FaultInfo fi = getFaultInfo(hostName, true);
       boolean blackListed = fi.isBlacklisted();
@@ -875,7 +886,7 @@ public class JobTracker implements MRCon
           getInstrumentation().addBlackListedReduceSlots(
               reduceSlots);
         }
-        numBlacklistedTrackers += uniqueHostsMap.remove(hostName);
+        incrBlackListedTrackers(uniqueHostsMap.remove(hostName));
       }
     }
     
@@ -895,7 +906,7 @@ public class JobTracker implements MRCon
         }
         uniqueHostsMap.put(hostName,
                            numTrackersOnHost);
-        numBlacklistedTrackers -= numTrackersOnHost;
+        decrBlackListedTrackers(numTrackersOnHost);
       }
     }
 
@@ -2754,6 +2765,7 @@ public class JobTracker implements MRCon
       hostnameToTaskTracker.put(hostname, trackers);
     }
     statistics.taskTrackerAdded(status.getTrackerName());
+    getInstrumentation().addTrackers(1);
     LOG.info("Adding tracker " + status.getTrackerName() + " to host " 
              + hostname);
     trackers.add(taskTracker);
@@ -3062,6 +3074,8 @@ public class JobTracker implements MRCon
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
+      getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
+      getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
         int mapSlots = oldStatus.getMaxMapSlots();
         totalMapTaskCapacity -= mapSlots;
@@ -3084,6 +3098,8 @@ public class JobTracker implements MRCon
     if (status != null) {
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
+      getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
+      getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(status.getHost())) {
         int mapSlots = status.getMaxMapSlots();
         totalMapTaskCapacity += mapSlots;
@@ -3192,7 +3208,7 @@ public class JobTracker implements MRCon
           // if this is lost tracker that came back now, and if it blacklisted
           // increment the count of blacklisted trackers in the cluster
           if (isBlacklisted(trackerName)) {
-            faultyTrackers.numBlacklistedTrackers += 1;
+            faultyTrackers.incrBlackListedTrackers(1);
           }
           addNewTracker(taskTracker);
         }
@@ -4234,12 +4250,13 @@ public class JobTracker implements MRCon
   }
 
   // main decommission
-  private synchronized void decommissionNodes(Set<String> hosts) 
+  synchronized void decommissionNodes(Set<String> hosts) 
   throws IOException {  
     LOG.info("Decommissioning " + hosts.size() + " nodes");
     // create a list of tracker hostnames
     synchronized (taskTrackers) {
       synchronized (trackerExpiryQueue) {
+        int trackersDecommissioned = 0;
         for (String host : hosts) {
           LOG.info("Decommissioning host " + host);
           Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
@@ -4248,11 +4265,14 @@ public class JobTracker implements MRCon
               LOG.info("Decommission: Losing tracker " + tracker + 
                        " on host " + host);
               lostTaskTracker(tracker); // lose the tracker
-              updateTaskTrackerStatus(tracker.getStatus().getTrackerName(), null);
+              updateTaskTrackerStatus(
+                tracker.getStatus().getTrackerName(), null);
             }
+            trackersDecommissioned += trackers.size();
           }
           LOG.info("Host " + host + " is ready for decommissioning");
         }
+        getInstrumentation().setDecommissionedTrackers(trackersDecommissioned);
       }
     }
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Fri Mar  4 03:34:25 2011
@@ -84,4 +84,80 @@ class JobTrackerInstrumentation {
 
   public void decBlackListedReduceSlots(int slots)
   { }
+
+  public void addReservedMapSlots(int slots)
+  { }
+
+  public void decReservedMapSlots(int slots)
+  { }
+
+  public void addReservedReduceSlots(int slots)
+  { }
+
+  public void decReservedReduceSlots(int slots)
+  { }
+
+  public void addOccupiedMapSlots(int slots)
+  { }
+
+  public void decOccupiedMapSlots(int slots)
+  { }
+
+  public void addOccupiedReduceSlots(int slots)
+  { }
+
+  public void decOccupiedReduceSlots(int slots)
+  { }
+
+  public void failedJob(JobConf conf, JobID id) 
+  { }
+
+  public void killedJob(JobConf conf, JobID id) 
+  { }
+
+  public void addPrepJob(JobConf conf, JobID id) 
+  { }
+  
+  public void decPrepJob(JobConf conf, JobID id) 
+  { }
+
+  public void addRunningJob(JobConf conf, JobID id) 
+  { }
+
+  public void decRunningJob(JobConf conf, JobID id) 
+  { }
+
+  public void addRunningMaps(JobID id, int task)
+  { }
+
+  public void decRunningMaps(JobID id, int task) 
+  { }
+
+  public void addRunningReduces(JobID id, int task)
+  { }
+
+  public void decRunningReduces(JobID id, int task)
+  { }
+
+  public void killedMap(TaskAttemptID taskAttemptID)
+  { }
+
+  public void killedReduce(TaskAttemptID taskAttemptID)
+  { }
+
+  public void addTrackers(int trackers)
+  { }
+
+  public void decTrackers(int trackers)
+  { }
+
+  public void addBlackListedTrackers(int trackers)
+  { }
+
+  public void decBlackListedTrackers(int trackers)
+  { }
+
+  public void setDecommissionedTrackers(int trackers)
+  { }  
+
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Fri Mar  4 03:34:25 2011
@@ -22,8 +22,6 @@ import org.apache.hadoop.metrics.Metrics
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
-import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 
 class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater {
   private final MetricsRecord metricsRecord;
@@ -45,6 +43,27 @@ class JobTrackerMetricsInst extends JobT
   private int numBlackListedMapSlots = 0;
   private int numBlackListedReduceSlots = 0;
 
+  private int numReservedMapSlots = 0;
+  private int numReservedReduceSlots = 0;
+  private int numOccupiedMapSlots = 0;
+  private int numOccupiedReduceSlots = 0;
+  
+  private int numJobsFailed = 0;
+  private int numJobsKilled = 0;
+  
+  private int numJobsPreparing = 0;
+  private int numJobsRunning = 0;
+  
+  private int numRunningMaps = 0;
+  private int numRunningReduces = 0;
+  
+  private int numMapTasksKilled = 0;
+  private int numReduceTasksKilled = 0;
+
+  private int numTrackers = 0;
+  private int numTrackersBlackListed = 0;
+  private int numTrackersDecommissioned = 0;
+  
   public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
     super(tracker, conf);
     String sessionId = conf.getSessionId();
@@ -78,6 +97,28 @@ class JobTrackerMetricsInst extends JobT
       metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
       metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
       metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+      
+      metricsRecord.incrMetric("reserved_map_slots", numReservedMapSlots);
+      metricsRecord.incrMetric("reserved_reduce_slots", numReservedReduceSlots);
+      metricsRecord.incrMetric("occupied_map_slots", numOccupiedMapSlots);
+      metricsRecord.incrMetric("occupied_reduce_slots", numOccupiedReduceSlots);
+      
+      metricsRecord.incrMetric("jobs_failed", numJobsFailed);
+      metricsRecord.incrMetric("jobs_killed", numJobsKilled);
+      
+      metricsRecord.incrMetric("jobs_preparing", numJobsPreparing);
+      metricsRecord.incrMetric("jobs_running", numJobsRunning);
+      
+      metricsRecord.incrMetric("running_maps", numRunningMaps);
+      metricsRecord.incrMetric("running_reduces", numRunningReduces);
+      
+      metricsRecord.incrMetric("maps_killed", numMapTasksKilled);
+      metricsRecord.incrMetric("reduces_killed", numReduceTasksKilled);
+
+      metricsRecord.incrMetric("trackers", numTrackers);
+      metricsRecord.incrMetric("trackers_blacklisted", numTrackersBlackListed);
+      metricsRecord.setMetric("trackers_decommissioned", 
+          numTrackersDecommissioned);
 
       numMapTasksLaunched = 0;
       numMapTasksCompleted = 0;
@@ -91,6 +132,26 @@ class JobTrackerMetricsInst extends JobT
       numWaitingReduces = 0;
       numBlackListedMapSlots = 0;
       numBlackListedReduceSlots = 0;
+      
+      numReservedMapSlots = 0;
+      numReservedReduceSlots = 0;
+      numOccupiedMapSlots = 0;
+      numOccupiedReduceSlots = 0;
+      
+      numJobsFailed = 0;
+      numJobsKilled = 0;
+      
+      numJobsPreparing = 0;
+      numJobsRunning = 0;
+      
+      numRunningMaps = 0;
+      numRunningReduces = 0;
+      
+      numMapTasksKilled = 0;
+      numReduceTasksKilled = 0;
+
+      numTrackers = 0;
+      numTrackersBlackListed = 0;
     }
     metricsRecord.update();
 
@@ -166,12 +227,12 @@ class JobTrackerMetricsInst extends JobT
   }
 
   @Override
-  public void setMapSlots(int slots) {
+  public synchronized void setMapSlots(int slots) {
     numMapSlots = slots;
   }
 
   @Override
-  public void setReduceSlots(int slots) {
+  public synchronized void setReduceSlots(int slots) {
     numReduceSlots = slots;
   }
 
@@ -194,4 +255,154 @@ class JobTrackerMetricsInst extends JobT
   public synchronized void decBlackListedReduceSlots(int slots){
     numBlackListedReduceSlots -= slots;
   }
+
+  @Override
+  public synchronized void addReservedMapSlots(int slots)
+  { 
+    numReservedMapSlots += slots;
+  }
+
+  @Override
+  public synchronized void decReservedMapSlots(int slots)
+  {
+    numReservedMapSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addReservedReduceSlots(int slots)
+  {
+    numReservedReduceSlots += slots;
+  }
+
+  @Override
+  public synchronized void decReservedReduceSlots(int slots)
+  {
+    numReservedReduceSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addOccupiedMapSlots(int slots)
+  {
+    numOccupiedMapSlots += slots;
+  }
+
+  @Override
+  public synchronized void decOccupiedMapSlots(int slots)
+  {
+    numOccupiedMapSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addOccupiedReduceSlots(int slots)
+  {
+    numOccupiedReduceSlots += slots;
+  }
+
+  @Override
+  public synchronized void decOccupiedReduceSlots(int slots)
+  {
+    numOccupiedReduceSlots -= slots;
+  }
+
+  @Override
+  public synchronized void failedJob(JobConf conf, JobID id) 
+  {
+    numJobsFailed++;
+  }
+
+  @Override
+  public synchronized void killedJob(JobConf conf, JobID id) 
+  {
+    numJobsKilled++;
+  }
+
+  @Override
+  public synchronized void addPrepJob(JobConf conf, JobID id) 
+  {
+    numJobsPreparing++;
+  }
+
+  @Override
+  public synchronized void decPrepJob(JobConf conf, JobID id) 
+  {
+    numJobsPreparing--;
+  }
+
+  @Override
+  public synchronized void addRunningJob(JobConf conf, JobID id) 
+  {
+    numJobsRunning++;
+  }
+
+  @Override
+  public synchronized void decRunningJob(JobConf conf, JobID id) 
+  {
+    numJobsRunning--;
+  }
+
+  @Override
+  public synchronized void addRunningMaps(JobID id, int task)
+  {
+    numRunningMaps += task;
+  }
+
+  @Override
+  public synchronized void decRunningMaps(JobID id, int task) 
+  {
+    numRunningMaps -= task;
+  }
+
+  @Override
+  public synchronized void addRunningReduces(JobID id, int task)
+  {
+    numRunningReduces += task;
+  }
+
+  @Override
+  public synchronized void decRunningReduces(JobID id, int task)
+  {
+    numRunningReduces -= task;
+  }
+
+  @Override
+  public synchronized void killedMap(TaskAttemptID taskAttemptID)
+  {
+    numMapTasksKilled++;
+  }
+
+  @Override
+  public synchronized void killedReduce(TaskAttemptID taskAttemptID)
+  {
+    numReduceTasksKilled++;
+  }
+
+  @Override
+  public synchronized void addTrackers(int trackers)
+  {
+    numTrackers += trackers;
+  }
+
+  @Override
+  public synchronized void decTrackers(int trackers)
+  {
+    numTrackers -= trackers;
+  }
+
+  @Override
+  public synchronized void addBlackListedTrackers(int trackers)
+  {
+    numTrackersBlackListed += trackers;
+  }
+
+  @Override
+  public synchronized void decBlackListedTrackers(int trackers)
+  {
+    numTrackersBlackListed -= trackers;
+  }
+
+  @Override
+  public synchronized void setDecommissionedTrackers(int trackers)
+  {
+    numTrackersDecommissioned = trackers;
+  }  
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobKillAndFail.java Fri Mar  4 03:34:25 2011
@@ -37,25 +37,93 @@ public class TestJobKillAndFail extends 
   public void testJobFailAndKill() throws IOException {
     MiniMRCluster mr = null;
     try {
-      mr = new MiniMRCluster(2, "file:///", 3);
+      JobConf jtConf = new JobConf();
+      jtConf.set("mapred.jobtracker.instrumentation", 
+          JTInstrumentation.class.getName());
+      mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
+      JTInstrumentation instr = (JTInstrumentation) 
+        mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
 
       // run the TCs
       JobConf conf = mr.createJobConf();
-
+      
       Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
       Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
       RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir);
       // Checking that the Job got failed
       assertEquals(job.getJobState(), JobStatus.FAILED);
+      assertTrue(instr.verifyJob());
+      assertEquals(1, instr.failed);
+      instr.reset();
 
+      
       job = UtilsForTests.runJobKill(conf, inDir, outDir);
       // Checking that the Job got killed
       assertTrue(job.isComplete());
       assertEquals(job.getJobState(), JobStatus.KILLED);
+      assertTrue(instr.verifyJob());
+      assertEquals(1, instr.killed);
     } finally {
       if (mr != null) {
         mr.shutdown();
       }
     }
   }
+  
+  static class JTInstrumentation extends JobTrackerInstrumentation {
+    volatile int failed;
+    volatile int killed;
+    volatile int addPrep;
+    volatile int decPrep;
+    volatile int addRunning;
+    volatile int decRunning;
+
+    void reset() {
+      failed = 0;
+      killed = 0;
+      addPrep = 0;
+      decPrep = 0;
+      addRunning = 0;
+      decRunning = 0;
+    }
+
+    boolean verifyJob() {
+      return addPrep==1 && decPrep==1 && addRunning==1 && decRunning==1;
+    }
+
+    public JTInstrumentation(JobTracker jt, JobConf conf) {
+      super(jt, conf);
+    }
+
+    public synchronized void addPrepJob(JobConf conf, JobID id) 
+    {
+      addPrep++;
+    }
+    
+    public synchronized void decPrepJob(JobConf conf, JobID id) 
+    {
+      decPrep++;
+    }
+
+    public synchronized void addRunningJob(JobConf conf, JobID id) 
+    {
+      addRunning++;
+    }
+
+    public synchronized void decRunningJob(JobConf conf, JobID id) 
+    {
+      decRunning++;
+    }
+    
+    public synchronized void failedJob(JobConf conf, JobID id) 
+    {
+      failed++;
+    }
+
+    public synchronized void killedJob(JobConf conf, JobID id) 
+    {
+      killed++;
+    }
+  }
+  
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Mar  4 03:34:25 2011
@@ -44,8 +44,9 @@ public class TestJobQueueTaskScheduler e
     private FakeTaskTrackerManager taskTrackerManager;
     
     public FakeJobInProgress(JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf);
+        FakeTaskTrackerManager taskTrackerManager, JobTracker jt) 
+          throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf, jt);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
@@ -267,7 +268,8 @@ public class TestJobQueueTaskScheduler e
                          int numJobs, int state)
     throws IOException {
     for (int i = 0; i < numJobs; i++) {
-      JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
+      JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager, 
+          UtilsForTests.getJobTracker());
       job.getStatus().setRunState(state);
       taskTrackerManager.submitJob(job);
     }

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java?rev=1077034&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java Fri Mar  4 03:34:25 2011
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.examples.SleepJob;
+
+@SuppressWarnings("deprecation")
+public class TestJobTrackerInstrumentation extends TestCase {
+
+  public void testSlots() throws IOException {
+    MiniMRCluster mr = null;
+    try {
+      JobConf jtConf = new JobConf();
+      jtConf.set("mapred.jobtracker.instrumentation", 
+          MyJobTrackerMetricsInst.class.getName());
+      mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
+      MyJobTrackerMetricsInst instr = (MyJobTrackerMetricsInst) 
+        mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
+
+      JobConf conf = mr.createJobConf();
+      SleepJob job = new SleepJob();
+      job.setConf(conf);
+      int numMapTasks = 3;
+      int numReduceTasks = 2;
+      job.run(numMapTasks, numReduceTasks, 10000, 1, 10000, 1);
+      
+      synchronized (instr) {
+        //after the job completes, incr and decr should be equal
+        assertEquals(instr.incrOccupiedMapSlots, 
+            instr.decrOccupiedMapSlots);
+        assertEquals(instr.incrOccupiedReduceSlots, 
+            instr.decrOccupiedReduceSlots);
+        assertEquals(instr.incrRunningMaps,
+            instr.decrRunningMaps);
+        assertEquals(instr.incrRunningReduces,
+            instr.decrRunningReduces);
+        assertEquals(instr.incrReservedMapSlots,
+            instr.decrReservedMapSlots);
+        assertEquals(instr.incrReservedReduceSlots,
+            instr.decrReservedReduceSlots);
+        
+        //validate that atleast once the callbacks happened
+        assertTrue(instr.incrOccupiedMapSlots > 0);
+        assertTrue(instr.incrOccupiedReduceSlots > 0);
+        assertTrue(instr.incrRunningMaps > 0);
+        assertTrue(instr.incrRunningReduces > 0);
+      }
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  static class MyJobTrackerMetricsInst extends JobTrackerInstrumentation  {
+    public MyJobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
+      super(tracker, conf);
+    }
+
+    private int incrReservedMapSlots = 0;
+    private int decrReservedMapSlots = 0;
+    private int incrReservedReduceSlots = 0;
+    private int decrReservedReduceSlots = 0;
+    private int incrOccupiedMapSlots = 0;
+    private int decrOccupiedMapSlots = 0;
+    private int incrOccupiedReduceSlots = 0;
+    private int decrOccupiedReduceSlots = 0;
+    private int incrRunningMaps = 0;
+    private int decrRunningMaps = 0;
+    private int incrRunningReduces = 0;
+    private int decrRunningReduces = 0;
+
+    @Override
+    public synchronized void addReservedMapSlots(int slots)
+    { 
+      incrReservedMapSlots += slots;
+    }
+
+    @Override
+    public synchronized void decReservedMapSlots(int slots)
+    {
+      decrReservedMapSlots += slots;
+    }
+
+    @Override
+    public synchronized void addReservedReduceSlots(int slots)
+    {
+      incrReservedReduceSlots += slots;
+    }
+
+    @Override
+    public synchronized void decReservedReduceSlots(int slots)
+    {
+      decrReservedReduceSlots += slots;
+    }
+
+    @Override
+    public synchronized void addOccupiedMapSlots(int slots)
+    {
+      incrOccupiedMapSlots += slots;
+    }
+
+    @Override
+    public synchronized void decOccupiedMapSlots(int slots)
+    {
+      decrOccupiedMapSlots += slots;
+    }
+
+    @Override
+    public synchronized void addOccupiedReduceSlots(int slots)
+    {
+      incrOccupiedReduceSlots += slots;
+    }
+
+    @Override
+    public synchronized void decOccupiedReduceSlots(int slots)
+    {
+      decrOccupiedReduceSlots += slots;
+    }
+    
+    @Override
+    public synchronized void addRunningMaps(JobID id, int task)
+    {
+      incrRunningMaps += task;
+    }
+
+    @Override
+    public synchronized void decRunningMaps(JobID id, int task) 
+    {
+      decrRunningMaps += task;
+    }
+
+    @Override
+    public synchronized void addRunningReduces(JobID id, int task)
+    {
+      incrRunningReduces += task;
+    }
+
+    @Override
+    public synchronized void decRunningReduces(JobID id, int task)
+    {
+      decrRunningReduces += task;
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Fri Mar  4 03:34:25 2011
@@ -43,8 +43,10 @@ public class TestParallelInitialization 
   class FakeJobInProgress extends JobInProgress {
    
     public FakeJobInProgress(JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf);
+        FakeTaskTrackerManager taskTrackerManager,
+        JobTracker jt) throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf,
+          jt);
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
       this.status.setJobPriority(JobPriority.NORMAL);
@@ -213,7 +215,8 @@ public class TestParallelInitialization 
     // will be inited first and that will hang
     
     for (int i = 0; i < NUM_JOBS; i++) {
-      jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager);
+      jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager, 
+          UtilsForTests.getJobTracker());
       jobs[i].getStatus().setRunState(JobStatus.PREP);
       taskTrackerManager.submitJob(jobs[i]);
     }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java Fri Mar  4 03:34:25 2011
@@ -32,7 +32,8 @@ public class TestResourceEstimation exte
     jc.setNumMapTasks(maps);
     jc.setNumReduceTasks(reduces);
     
-    JobInProgress jip = new JobInProgress(jid, jc);
+    JobInProgress jip = new JobInProgress(jid, jc, 
+        UtilsForTests.getJobTracker());
     //unfortunately, we can't set job input size from here.
     ResourceEstimator re = new ResourceEstimator(jip);
     
@@ -65,7 +66,8 @@ public class TestResourceEstimation exte
     jc.setNumMapTasks(maps);
     jc.setNumReduceTasks(reduces);
     
-    JobInProgress jip = new JobInProgress(jid, jc) {
+    JobInProgress jip = new JobInProgress(jid, jc, 
+        UtilsForTests.getJobTracker()) {
       long getInputLength() {
         return singleMapInputSize*desiredMaps();
       }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077034&r1=1077033&r2=1077034&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar  4 03:34:25 2011
@@ -731,6 +731,18 @@ public class UtilsForTests {
     }
     return pid;
   }
-  
+
+  static JobTracker getJobTracker() {
+    JobConf conf = new JobConf();
+    conf.set("mapred.job.tracker", "localhost:0");
+    conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+    JobTracker jt;
+    try {
+      jt = new JobTracker(conf);
+      return jt;
+    } catch (Exception e) {
+      throw new RuntimeException("Could not start jt", e);
+    }
+  }
 }