You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:34:31 UTC

svn commit: r1077035 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/ test/org/apache/hadoop/mapred/ webapps/job/

Author: omalley
Date: Fri Mar  4 03:34:31 2011
New Revision: 1077035

URL: http://svn.apache.org/viewvc?rev=1077035&view=rev
Log:
commit 70534253bd26e8850633d1f5dcdf7f67d9a38a83
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date:   Sun Oct 25 16:05:47 2009 +0530

    MAPREDUCE:1048 from http://issues.apache.org/jira/secure/attachment/12423136/MAPREDUCE-1048-20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1048. Add occupied/reserved slot usage summary on
    +    jobtracker UI. (Amareshwari Sriramadasu via sharad)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077035&r1=1077034&r2=1077035&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:34:31 2011
@@ -1571,6 +1571,7 @@ class JobInProgress {
     else {
       jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
     }
+    jobtracker.incrementReservations(type, reservedSlots);
   }
   
   public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
@@ -1603,6 +1604,7 @@ class JobInProgress {
       jobtracker.getInstrumentation().decReservedReduceSlots(
         info.getNumSlots());
     }
+    jobtracker.decrementReservations(type, info.getNumSlots());
   }
   
   public int getNumReservedTaskTrackersForMaps() {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077035&r1=1077034&r2=1077035&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:34:31 2011
@@ -98,6 +98,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 
+import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
@@ -1844,6 +1845,10 @@ public class JobTracker implements MRCon
   //
   int totalMaps = 0;
   int totalReduces = 0;
+  private int occupiedMapSlots = 0;
+  private int occupiedReduceSlots = 0;
+  private int reservedMapSlots = 0;
+  private int reservedReduceSlots = 0;
   private HashMap<String, TaskTracker> taskTrackers =
     new HashMap<String, TaskTracker>();
   Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
@@ -3074,6 +3079,8 @@ public class JobTracker implements MRCon
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
+      occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
+      occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
       getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
       getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
@@ -3098,6 +3105,8 @@ public class JobTracker implements MRCon
     if (status != null) {
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
+      occupiedMapSlots += status.countOccupiedMapSlots();
+      occupiedReduceSlots += status.countOccupiedReduceSlots();
       getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
       getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(status.getHost())) {
@@ -3167,7 +3176,26 @@ public class JobTracker implements MRCon
     return oldStatus != null;
   }
   
-  
+  // Increment the number of reserved slots in the cluster.
+  // This method assumes the caller has JobTracker lock.
+  void incrementReservations(TaskType type, int reservedSlots) {
+    if (type.equals(TaskType.MAP)) {
+      reservedMapSlots += reservedSlots;
+    } else if (type.equals(TaskType.REDUCE)) {
+      reservedReduceSlots += reservedSlots;
+    }
+  }
+
+  // Decrement the number of reserved slots in the cluster.
+  // This method assumes the caller has JobTracker lock.
+  void decrementReservations(TaskType type, int reservedSlots) {
+    if (type.equals(TaskType.MAP)) {
+      reservedMapSlots -= reservedSlots;
+    } else if (type.equals(TaskType.REDUCE)) {
+      reservedReduceSlots -= reservedSlots;
+    }
+  }
+
   private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
     TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
     synchronized (faultyTrackers) {
@@ -3622,7 +3650,17 @@ public class JobTracker implements MRCon
       }
     }
   }
-    
+
+  public synchronized ClusterMetrics getClusterMetrics() {
+    return new ClusterMetrics(totalMaps,
+      totalReduces, occupiedMapSlots, occupiedReduceSlots,
+      reservedMapSlots, reservedReduceSlots,
+      totalMapTaskCapacity, totalReduceTaskCapacity,
+      totalSubmissions,
+      taskTrackers.size() - getBlacklistedTrackerCount(), 
+      getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
+  }
+
   public synchronized void killJob(JobID jobid) throws IOException {
     if (null == jobid) {
       LOG.info("Null jobid object sent to JobTracker.killJob()");

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=1077035&r1=1077034&r2=1077035&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Fri Mar  4 03:34:31 2011
@@ -200,6 +200,12 @@ public class TaskTrackerStatus implement
     this.healthStatus = new TaskTrackerHealthStatus();
   }
 
+  TaskTrackerStatus(String trackerName, String host) {
+    this();
+    this.trackerName = trackerName;
+    this.host = host;
+  }
+
   /**
    */
   public TaskTrackerStatus(String trackerName, String host, 

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=1077035&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java Fri Mar  4 03:34:31 2011
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Status information on the current state of the Map-Reduce cluster.
+ * 
+ * <p><code>ClusterMetrics</code> provides clients with information such as:
+ * <ol>
+ *   <li>
+ *   Size of the cluster.  
+ *   </li>
+ *   <li>
+ *   Number of blacklisted and decommissioned trackers.  
+ *   </li>
+ *   <li>
+ *   Slot capacity of the cluster. 
+ *   </li>
+ *   <li>
+ *   The number of currently occupied/reserved map & reduce slots.
+ *   </li>
+ *   <li>
+ *   The number of currently running map & reduce tasks.
+ *   </li>
+ *   <li>
+ *   The number of job submissions.
+ *   </li>
+ * </ol></p>
+ * 
+ */
+public class ClusterMetrics implements Writable {
+  private int runningMaps;
+  private int runningReduces;
+  private int occupiedMapSlots;
+  private int occupiedReduceSlots;
+  private int reservedMapSlots;
+  private int reservedReduceSlots;
+  private int totalMapSlots;
+  private int totalReduceSlots;
+  private int totalJobSubmissions;
+  private int numTrackers;
+  private int numBlacklistedTrackers;
+  private int numDecommissionedTrackers;
+
+  public ClusterMetrics() {
+  }
+  
+  public ClusterMetrics(int runningMaps, int runningReduces,
+      int occupiedMapSlots, int occupiedReduceSlots,
+      int reservedMapSlots, int reservedReduceSlots,
+      int mapSlots, int reduceSlots, 
+      int totalJobSubmissions,
+      int numTrackers, int numBlacklistedTrackers,
+      int numDecommissionedNodes) {
+    this.runningMaps = runningMaps;
+    this.runningReduces = runningReduces;
+    this.occupiedMapSlots = occupiedMapSlots;
+    this.occupiedReduceSlots = occupiedReduceSlots;
+    this.reservedMapSlots = reservedMapSlots;
+    this.reservedReduceSlots = reservedReduceSlots;
+    this.totalMapSlots = mapSlots;
+    this.totalReduceSlots = reduceSlots;
+    this.totalJobSubmissions = totalJobSubmissions;
+    this.numTrackers = numTrackers;
+    this.numBlacklistedTrackers = numBlacklistedTrackers;
+    this.numDecommissionedTrackers = numDecommissionedNodes;
+  }
+
+  /**
+   * Get the number of running map tasks in the cluster.
+   * 
+   * @return running maps
+   */
+  public int getRunningMaps() {
+    return runningMaps;
+  }
+  
+  /**
+   * Get the number of running reduce tasks in the cluster.
+   * 
+   * @return running reduces
+   */
+  public int getRunningReduces() {
+    return runningReduces;
+  }
+  
+  /**
+   * Get number of occupied map slots in the cluster.
+   * 
+   * @return occupied map slot count
+   */
+  public int getOccupiedMapSlots() { 
+    return occupiedMapSlots;
+  }
+  
+  /**
+   * Get the number of occupied reduce slots in the cluster.
+   * 
+   * @return occupied reduce slot count
+   */
+  public int getOccupiedReduceSlots() { 
+    return occupiedReduceSlots; 
+  }
+
+  /**
+   * Get number of reserved map slots in the cluster.
+   * 
+   * @return reserved map slot count
+   */
+  public int getReservedMapSlots() { 
+    return reservedMapSlots;
+  }
+  
+  /**
+   * Get the number of reserved reduce slots in the cluster.
+   * 
+   * @return reserved reduce slot count
+   */
+  public int getReservedReduceSlots() { 
+    return reservedReduceSlots; 
+  }
+
+  /**
+   * Get the total number of map slots in the cluster.
+   * 
+   * @return map slot capacity
+   */
+  public int getMapSlotCapacity() {
+    return totalMapSlots;
+  }
+  
+  /**
+   * Get the total number of reduce slots in the cluster.
+   * 
+   * @return reduce slot capacity
+   */
+  public int getReduceSlotCapacity() {
+    return totalReduceSlots;
+  }
+  
+  /**
+   * Get the total number of job submissions in the cluster.
+   * 
+   * @return total number of job submissions
+   */
+  public int getTotalJobSubmissions() {
+    return totalJobSubmissions;
+  }
+  
+  /**
+   * Get the number of active trackers in the cluster.
+   * 
+   * @return active tracker count.
+   */
+  public int getTaskTrackerCount() {
+    return numTrackers;
+  }
+  
+  /**
+   * Get the number of blacklisted trackers in the cluster.
+   * 
+   * @return blacklisted tracker count
+   */
+  public int getBlackListedTaskTrackerCount() {
+    return numBlacklistedTrackers;
+  }
+  
+  /**
+   * Get the number of decommissioned trackers in the cluster.
+   * 
+   * @return decommissioned tracker count
+   */
+  public int getDecommissionedTaskTrackerCount() {
+    return numDecommissionedTrackers;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    runningMaps = in.readInt();
+    runningReduces = in.readInt();
+    occupiedMapSlots = in.readInt();
+    occupiedReduceSlots = in.readInt();
+    reservedMapSlots = in.readInt();
+    reservedReduceSlots = in.readInt();
+    totalMapSlots = in.readInt();
+    totalReduceSlots = in.readInt();
+    totalJobSubmissions = in.readInt();
+    numTrackers = in.readInt();
+    numBlacklistedTrackers = in.readInt();
+    numDecommissionedTrackers = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(runningMaps);
+    out.writeInt(runningReduces);
+    out.writeInt(occupiedMapSlots);
+    out.writeInt(occupiedReduceSlots);
+    out.writeInt(reservedMapSlots);
+    out.writeInt(reservedReduceSlots);
+    out.writeInt(totalMapSlots);
+    out.writeInt(totalReduceSlots);
+    out.writeInt(totalJobSubmissions);
+    out.writeInt(numTrackers);
+    out.writeInt(numBlacklistedTrackers);
+    out.writeInt(numDecommissionedTrackers);
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java?rev=1077035&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java Fri Mar  4 03:34:31 2011
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Class to test that ClusterMetrics are being created with the right
+ * counts of occupied and reserved slots.
+ * 
+ * The tests exercise code paths where the counts of slots are updated.
+ */
+public class TestClusterStatus extends TestCase {
+
+  private static String[] trackers = new String[] { "tracker_tracker1:1000",
+      "tracker_tracker2:1000", "tracker_tracker3:1000" };
+  private static JobTracker jobTracker;
+  private static int mapSlotsPerTracker = 4;
+  private static int reduceSlotsPerTracker = 2;
+  private static MiniMRCluster mr;
+  private static JobClient client;
+  // heartbeat responseId. increment this after sending a heartbeat
+  private static short responseId = 1;
+  private static FakeJobInProgress fakeJob;
+  private static FakeTaskScheduler scheduler;
+  
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
+      protected void setUp() throws Exception {
+        JobConf conf = new JobConf();
+        conf.setClass("mapred.jobtracker.taskScheduler", 
+            TestClusterStatus.FakeTaskScheduler.class,
+                  TaskScheduler.class);
+        mr = new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, null, conf);
+        jobTracker = mr.getJobTrackerRunner().getJobTracker();
+        for (String tracker : trackers) {
+          establishFirstContact(jobTracker, tracker);
+        }
+        client = new JobClient(mr.createJobConf());
+      }
+
+      protected void tearDown() throws Exception {
+        client.close();
+        mr.shutdown();
+      }
+    };
+    return setup;
+  }
+
+  /**
+   * Fake scheduler to test reservations.
+   * 
+   * The reservations are updated incrementally in each
+   * heartbeat to pass through the re-reservation logic,
+   * until the scheduler is asked to unreserve slots.
+   */
+  static class FakeTaskScheduler extends JobQueueTaskScheduler {
+    
+    private Map<TaskTracker, Integer> reservedCounts 
+      = new HashMap<TaskTracker, Integer>();
+  
+    // this variable can be set to trigger unreservations.
+    private boolean unreserveSlots;
+    
+    public FakeTaskScheduler() {
+      super();
+      scheduler = this;
+    }
+
+    void setUnreserveSlots(boolean shouldUnreserve) {
+      unreserveSlots = shouldUnreserve;
+    }
+    
+    @Override
+    public List<Task> assignTasks(TaskTracker tt) {
+      if (unreserveSlots) {
+        tt.unreserveSlots(TaskType.MAP, fakeJob);
+        tt.unreserveSlots(TaskType.REDUCE, fakeJob);
+      } else {
+        int currCount = 1;
+        if (reservedCounts.containsKey(tt)) {
+          currCount = reservedCounts.get(tt) + 1;
+        }
+        reservedCounts.put(tt, currCount);
+        tt.reserveSlots(TaskType.MAP, fakeJob, currCount);
+        tt.reserveSlots(TaskType.REDUCE, fakeJob, currCount);
+      }
+      return new ArrayList<Task>();  
+    }
+  }
+
+  /**
+   * Fake class for JobInProgress to allow testing reservation
+   * counts.
+   * 
+   * This class can only be used to test functionality related to
+   * reservations, and not other aspects of the JobInProgress code
+   * because the fields may not be initialized correctly.
+   */
+  static class FakeJobInProgress extends JobInProgress {
+    public FakeJobInProgress(JobID jId, JobConf jobConf,
+                JobTracker jt) {
+      super(jId, jobConf, jt);
+    }
+  }
+  
+  static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status, 
+      boolean initialContact, boolean acceptNewTasks, 
+      String tracker, short responseId) 
+      throws IOException {
+    if (status == null) {
+      status = new TaskTrackerStatus(tracker, 
+      JobInProgress.convertTrackerNameToHostName(tracker));
+    }
+    jt.heartbeat(status, false, initialContact, acceptNewTasks, responseId);
+    return ++responseId ;
+  }
+
+  static void establishFirstContact(JobTracker jt, String tracker) 
+      throws IOException {
+    sendHeartBeat(jt, null, true, false, tracker, (short) 0);
+  }
+
+  private TaskTrackerStatus getTTStatus(String trackerName,
+      List<TaskStatus> taskStatuses) {
+    return new TaskTrackerStatus(trackerName, 
+      JobInProgress.convertTrackerNameToHostName(trackerName), 0,
+      taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
+  }
+  
+  public void testClusterMetrics() throws IOException, InterruptedException {
+    assertEquals("tasktracker count doesn't match", trackers.length,
+      client.getClusterStatus().getTaskTrackers());
+    
+    List<TaskStatus> list = new ArrayList<TaskStatus>();
+
+    // create a map task status, which uses 2 slots. 
+    int mapSlotsPerTask = 2;
+    addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.RUNNING);
+    
+    // create a reduce task status, which uses 1 slot.
+    int reduceSlotsPerTask = 1;
+    addReduceTaskAttemptToList(list, 
+        reduceSlotsPerTask, TaskStatus.State.RUNNING);
+    
+    // create TaskTrackerStatus and send heartbeats
+    sendHeartbeats(list);
+
+    // assert ClusterMetrics
+    ClusterMetrics metrics = jobTracker.getClusterMetrics();
+    assertEquals("occupied map slots do not match", mapSlotsPerTask,
+      metrics.getOccupiedMapSlots());
+    assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
+      metrics.getOccupiedReduceSlots());
+    assertEquals("map slot capacities do not match",
+      mapSlotsPerTracker * trackers.length,
+      metrics.getMapSlotCapacity());
+    assertEquals("reduce slot capacities do not match",
+      reduceSlotsPerTracker * trackers.length,
+      metrics.getReduceSlotCapacity());
+    assertEquals("running map tasks do not match", 1,
+      metrics.getRunningMaps());
+    assertEquals("running reduce tasks do not match", 1,
+      metrics.getRunningReduces());
+    
+    // assert the values in ClusterStatus also
+    ClusterStatus stat = client.getClusterStatus();
+    assertEquals("running map tasks do not match", 1,
+      stat.getMapTasks());
+    assertEquals("running reduce tasks do not match", 1,
+      stat.getReduceTasks());
+    assertEquals("map slot capacities do not match",
+      mapSlotsPerTracker * trackers.length,
+      stat.getMaxMapTasks());
+    assertEquals("reduce slot capacities do not match",
+      reduceSlotsPerTracker * trackers.length,
+      stat.getMaxReduceTasks());
+    
+    // send a heartbeat finishing only a map and check
+    // counts are updated.
+    list.clear();
+    addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.SUCCEEDED);
+    addReduceTaskAttemptToList(list, 
+        reduceSlotsPerTask, TaskStatus.State.RUNNING);
+    sendHeartbeats(list);
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals(0, metrics.getOccupiedMapSlots());
+    assertEquals(reduceSlotsPerTask, metrics.getOccupiedReduceSlots());
+    
+    // send a heartbeat finishing the reduce task also.
+    list.clear();
+    addReduceTaskAttemptToList(list, 
+        reduceSlotsPerTask, TaskStatus.State.SUCCEEDED);
+    sendHeartbeats(list);
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals(0, metrics.getOccupiedReduceSlots());
+  }
+  
+  private void sendHeartbeats(List<TaskStatus> list) throws IOException {
+    TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
+    status[0] = getTTStatus(trackers[0], list);
+    status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
+    status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
+    for (int i = 0; i< trackers.length; i++) {
+      sendHeartBeat(jobTracker, status[i], false, false, 
+          trackers[i], responseId);
+    }
+    responseId++;
+  }
+
+  private void addReduceTaskAttemptToList(List<TaskStatus> list, 
+      int reduceSlotsPerTask, TaskStatus.State state) {
+    TaskStatus ts = TaskStatus.createTaskStatus(false, 
+      new TaskAttemptID("jt", 1, false, 0, 0), 0.0f,
+      reduceSlotsPerTask,
+      state, "", "", trackers[0], 
+      TaskStatus.Phase.REDUCE, null);
+    list.add(ts);
+  }
+
+  private void addMapTaskAttemptToList(List<TaskStatus> list, 
+      int mapSlotsPerTask, TaskStatus.State state) {
+    TaskStatus ts = TaskStatus.createTaskStatus(true, 
+      new TaskAttemptID("jt", 1, true, 0, 0), 0.0f, mapSlotsPerTask,
+      state, "", "", trackers[0], 
+      TaskStatus.Phase.MAP, null);
+    list.add(ts);
+  }
+
+  public void testReservedSlots() throws IOException {
+    JobConf conf = mr.createJobConf();
+
+    conf.setNumReduceTasks(1);
+    conf.setSpeculativeExecution(false);
+    
+    //Set task tracker objects for reservation.
+    TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
+    TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
+    TaskTrackerStatus status1 = new TaskTrackerStatus(
+        trackers[0],JobInProgress.convertTrackerNameToHostName(
+            trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+    TaskTrackerStatus status2 = new TaskTrackerStatus(
+        trackers[1],JobInProgress.convertTrackerNameToHostName(
+            trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+    tt1.setStatus(status1);
+    tt2.setStatus(status2);
+    
+    fakeJob = new FakeJobInProgress(new JobID("jt", 1), new JobConf(conf),
+                    jobTracker);
+    
+    sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
+    sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
+    responseId++; 
+    ClusterMetrics metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match", 
+      2, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match", 
+      2, metrics.getReservedReduceSlots());
+
+    // redo to test re-reservations.
+    sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
+    sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
+    responseId++; 
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals("reserved map slots do not match", 
+        4, metrics.getReservedMapSlots());
+    assertEquals("reserved reduce slots do not match", 
+        4, metrics.getReservedReduceSlots());
+
+    // undo reservations now.
+    scheduler.setUnreserveSlots(true);
+    sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
+    sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
+    responseId++;
+    metrics = jobTracker.getClusterMetrics();
+    assertEquals("map slots should have been unreserved",
+        0, metrics.getReservedMapSlots());
+    assertEquals("reduce slots should have been unreserved",
+        0, metrics.getReservedReduceSlots());
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp?rev=1077035&r1=1077034&r2=1077035&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp Fri Mar  4 03:34:31 2011
@@ -6,11 +6,13 @@
   import="java.util.*"
   import="java.text.DecimalFormat"
   import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapreduce.*"
   import="org.apache.hadoop.util.*"
 %>
 <%
   JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
   ClusterStatus status = tracker.getClusterStatus();
+  ClusterMetrics metrics = tracker.getClusterMetrics();
   String trackerName = 
            StringUtils.simpleHostname(tracker.getJobTrackerMachine());
   JobQueueInfo[] queues = tracker.getQueues();
@@ -21,31 +23,38 @@
 <%!
   private static DecimalFormat percentFormat = new DecimalFormat("##0.00");
   
-  public void generateSummaryTable(JspWriter out, ClusterStatus status,
+  public void generateSummaryTable(JspWriter out, ClusterMetrics metrics,
                                    JobTracker tracker) throws IOException {
-    String tasksPerNode = status.getTaskTrackers() > 0 ?
-      percentFormat.format(((double)(status.getMaxMapTasks() +
-                      status.getMaxReduceTasks())) / status.getTaskTrackers()):
+    String tasksPerNode = metrics.getTaskTrackerCount() > 0 ?
+      percentFormat.format(((double)(metrics.getMapSlotCapacity() +
+      metrics.getReduceSlotCapacity())) / metrics.getTaskTrackerCount()):
       "-";
     out.print("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n"+
-              "<tr><th>Maps</th><th>Reduces</th>" + 
+              "<tr><th>Running Map Tasks</th><th>Running Reduce Tasks</th>" + 
               "<th>Total Submissions</th>" +
-              "<th>Nodes</th><th>Map Task Capacity</th>" +
+              "<th>Nodes</th>" + 
+              "<th>Occupied Map Slots</th><th>Occupied Reduce Slots</th>" + 
+              "<th>Reserved Map Slots</th><th>Reserved Reduce Slots</th>" + 
+              "<th>Map Task Capacity</th>" +
               "<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th>" + 
               "<th>Blacklisted Nodes</th>" +
               "<th>Excluded Nodes</th></tr>\n");
-    out.print("<tr><td>" + status.getMapTasks() + "</td><td>" +
-              status.getReduceTasks() + "</td><td>" + 
-              tracker.getTotalSubmissions() +
+    out.print("<tr><td>" + metrics.getRunningMaps() + "</td><td>" +
+              metrics.getRunningReduces() + "</td><td>" + 
+              metrics.getTotalJobSubmissions() +
               "</td><td><a href=\"machines.jsp?type=active\">" +
-              status.getTaskTrackers() +
-              "</a></td><td>" + status.getMaxMapTasks() +
-              "</td><td>" + status.getMaxReduceTasks() +
+              metrics.getTaskTrackerCount() + "</a></td><td>" + 
+              metrics.getOccupiedMapSlots() + "</td><td>" +
+              metrics.getOccupiedReduceSlots() + "</td><td>" + 
+              metrics.getReservedMapSlots() + "</td><td>" +
+              metrics.getReservedReduceSlots() + "</td><td>" + 
+              metrics.getMapSlotCapacity() +
+              "</td><td>" + metrics.getReduceSlotCapacity() +
               "</td><td>" + tasksPerNode +
               "</td><td><a href=\"machines.jsp?type=blacklisted\">" +
-              status.getBlacklistedTrackers() + "</a>" +
+              metrics.getBlackListedTaskTrackerCount() + "</a>" +
               "</td><td><a href=\"machines.jsp?type=excluded\">" +
-              status.getNumExcludedNodes() + "</a>" +
+              metrics.getDecommissionedTaskTrackerCount() + "</a>" +
               "</td></tr></table>\n");
 
     out.print("<br>");
@@ -95,7 +104,7 @@
 <hr>
 <h2>Cluster Summary (Heap Size is <%= StringUtils.byteDesc(status.getUsedMemory()) %>/<%= StringUtils.byteDesc(status.getMaxMemory()) %>)</h2>
 <% 
- generateSummaryTable(out, status, tracker); 
+ generateSummaryTable(out, metrics, tracker); 
 %>
 <hr>
 <h2 id="scheduling_info">Scheduling Information</h2>