You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:34:31 UTC
svn commit: r1077035 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/
test/org/apache/hadoop/mapred/ webapps/job/
Author: omalley
Date: Fri Mar 4 03:34:31 2011
New Revision: 1077035
URL: http://svn.apache.org/viewvc?rev=1077035&view=rev
Log:
commit 70534253bd26e8850633d1f5dcdf7f67d9a38a83
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date: Sun Oct 25 16:05:47 2009 +0530
MAPREDUCE-1048 from http://issues.apache.org/jira/secure/attachment/12423136/MAPREDUCE-1048-20.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1048. Add occupied/reserved slot usage summary on
+ jobtracker UI. (Amareshwari Sriramadasu via sharad)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077035&r1=1077034&r2=1077035&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:34:31 2011
@@ -1571,6 +1571,7 @@ class JobInProgress {
else {
jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
}
+ jobtracker.incrementReservations(type, reservedSlots);
}
public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
@@ -1603,6 +1604,7 @@ class JobInProgress {
jobtracker.getInstrumentation().decReservedReduceSlots(
info.getNumSlots());
}
+ jobtracker.decrementReservations(type, info.getNumSlots());
}
public int getNumReservedTaskTrackersForMaps() {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077035&r1=1077034&r2=1077035&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:34:31 2011
@@ -98,6 +98,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -1844,6 +1845,10 @@ public class JobTracker implements MRCon
//
int totalMaps = 0;
int totalReduces = 0;
+ private int occupiedMapSlots = 0;
+ private int occupiedReduceSlots = 0;
+ private int reservedMapSlots = 0;
+ private int reservedReduceSlots = 0;
private HashMap<String, TaskTracker> taskTrackers =
new HashMap<String, TaskTracker>();
Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
@@ -3074,6 +3079,8 @@ public class JobTracker implements MRCon
if (oldStatus != null) {
totalMaps -= oldStatus.countMapTasks();
totalReduces -= oldStatus.countReduceTasks();
+ occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
+ occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
@@ -3098,6 +3105,8 @@ public class JobTracker implements MRCon
if (status != null) {
totalMaps += status.countMapTasks();
totalReduces += status.countReduceTasks();
+ occupiedMapSlots += status.countOccupiedMapSlots();
+ occupiedReduceSlots += status.countOccupiedReduceSlots();
getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
if (!faultyTrackers.isBlacklisted(status.getHost())) {
@@ -3167,7 +3176,26 @@ public class JobTracker implements MRCon
return oldStatus != null;
}
-
+ // Increment the number of reserved slots in the cluster.
+ // This method assumes the caller has JobTracker lock.
+ void incrementReservations(TaskType type, int reservedSlots) {
+ if (type.equals(TaskType.MAP)) {
+ reservedMapSlots += reservedSlots;
+ } else if (type.equals(TaskType.REDUCE)) {
+ reservedReduceSlots += reservedSlots;
+ }
+ }
+
+ // Decrement the number of reserved slots in the cluster.
+ // This method assumes the caller has JobTracker lock.
+ void decrementReservations(TaskType type, int reservedSlots) {
+ if (type.equals(TaskType.MAP)) {
+ reservedMapSlots -= reservedSlots;
+ } else if (type.equals(TaskType.REDUCE)) {
+ reservedReduceSlots -= reservedSlots;
+ }
+ }
+
private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
synchronized (faultyTrackers) {
@@ -3622,7 +3650,17 @@ public class JobTracker implements MRCon
}
}
}
-
+
+ public synchronized ClusterMetrics getClusterMetrics() {
+ return new ClusterMetrics(totalMaps,
+ totalReduces, occupiedMapSlots, occupiedReduceSlots,
+ reservedMapSlots, reservedReduceSlots,
+ totalMapTaskCapacity, totalReduceTaskCapacity,
+ totalSubmissions,
+ taskTrackers.size() - getBlacklistedTrackerCount(),
+ getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
+ }
+
public synchronized void killJob(JobID jobid) throws IOException {
if (null == jobid) {
LOG.info("Null jobid object sent to JobTracker.killJob()");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=1077035&r1=1077034&r2=1077035&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Fri Mar 4 03:34:31 2011
@@ -200,6 +200,12 @@ public class TaskTrackerStatus implement
this.healthStatus = new TaskTrackerHealthStatus();
}
/**
 * Builds a minimal status that carries only the tracker name and host; every
 * other field keeps the value assigned by the no-argument constructor.
 * Package-private; TestClusterStatus uses it for first-contact heartbeats.
 *
 * @param trackerName name of the task tracker
 * @param host host the task tracker runs on
 */
TaskTrackerStatus(String trackerName, String host) {
  this();
  this.host = host;
  this.trackerName = trackerName;
}
+
/**
*/
public TaskTrackerStatus(String trackerName, String host,
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=1077035&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/ClusterMetrics.java Fri Mar 4 03:34:31 2011
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Status information on the current state of the Map-Reduce cluster.
+ *
+ * <p><code>ClusterMetrics</code> provides clients with information such as:
+ * <ol>
+ * <li>
+ * Size of the cluster.
+ * </li>
+ * <li>
+ * Number of blacklisted and decommissioned trackers.
+ * </li>
+ * <li>
+ * Slot capacity of the cluster.
+ * </li>
+ * <li>
+ * The number of currently occupied/reserved map & reduce slots.
+ * </li>
+ * <li>
+ * The number of currently running map & reduce tasks.
+ * </li>
+ * <li>
+ * The number of job submissions.
+ * </li>
+ * </ol></p>
+ *
+ */
+public class ClusterMetrics implements Writable {
+ private int runningMaps;
+ private int runningReduces;
+ private int occupiedMapSlots;
+ private int occupiedReduceSlots;
+ private int reservedMapSlots;
+ private int reservedReduceSlots;
+ private int totalMapSlots;
+ private int totalReduceSlots;
+ private int totalJobSubmissions;
+ private int numTrackers;
+ private int numBlacklistedTrackers;
+ private int numDecommissionedTrackers;
+
+ public ClusterMetrics() {
+ }
+
+ public ClusterMetrics(int runningMaps, int runningReduces,
+ int occupiedMapSlots, int occupiedReduceSlots,
+ int reservedMapSlots, int reservedReduceSlots,
+ int mapSlots, int reduceSlots,
+ int totalJobSubmissions,
+ int numTrackers, int numBlacklistedTrackers,
+ int numDecommissionedNodes) {
+ this.runningMaps = runningMaps;
+ this.runningReduces = runningReduces;
+ this.occupiedMapSlots = occupiedMapSlots;
+ this.occupiedReduceSlots = occupiedReduceSlots;
+ this.reservedMapSlots = reservedMapSlots;
+ this.reservedReduceSlots = reservedReduceSlots;
+ this.totalMapSlots = mapSlots;
+ this.totalReduceSlots = reduceSlots;
+ this.totalJobSubmissions = totalJobSubmissions;
+ this.numTrackers = numTrackers;
+ this.numBlacklistedTrackers = numBlacklistedTrackers;
+ this.numDecommissionedTrackers = numDecommissionedNodes;
+ }
+
+ /**
+ * Get the number of running map tasks in the cluster.
+ *
+ * @return running maps
+ */
+ public int getRunningMaps() {
+ return runningMaps;
+ }
+
+ /**
+ * Get the number of running reduce tasks in the cluster.
+ *
+ * @return running reduces
+ */
+ public int getRunningReduces() {
+ return runningReduces;
+ }
+
+ /**
+ * Get number of occupied map slots in the cluster.
+ *
+ * @return occupied map slot count
+ */
+ public int getOccupiedMapSlots() {
+ return occupiedMapSlots;
+ }
+
+ /**
+ * Get the number of occupied reduce slots in the cluster.
+ *
+ * @return occupied reduce slot count
+ */
+ public int getOccupiedReduceSlots() {
+ return occupiedReduceSlots;
+ }
+
+ /**
+ * Get number of reserved map slots in the cluster.
+ *
+ * @return reserved map slot count
+ */
+ public int getReservedMapSlots() {
+ return reservedMapSlots;
+ }
+
+ /**
+ * Get the number of reserved reduce slots in the cluster.
+ *
+ * @return reserved reduce slot count
+ */
+ public int getReservedReduceSlots() {
+ return reservedReduceSlots;
+ }
+
+ /**
+ * Get the total number of map slots in the cluster.
+ *
+ * @return map slot capacity
+ */
+ public int getMapSlotCapacity() {
+ return totalMapSlots;
+ }
+
+ /**
+ * Get the total number of reduce slots in the cluster.
+ *
+ * @return reduce slot capacity
+ */
+ public int getReduceSlotCapacity() {
+ return totalReduceSlots;
+ }
+
+ /**
+ * Get the total number of job submissions in the cluster.
+ *
+ * @return total number of job submissions
+ */
+ public int getTotalJobSubmissions() {
+ return totalJobSubmissions;
+ }
+
+ /**
+ * Get the number of active trackers in the cluster.
+ *
+ * @return active tracker count.
+ */
+ public int getTaskTrackerCount() {
+ return numTrackers;
+ }
+
+ /**
+ * Get the number of blacklisted trackers in the cluster.
+ *
+ * @return blacklisted tracker count
+ */
+ public int getBlackListedTaskTrackerCount() {
+ return numBlacklistedTrackers;
+ }
+
+ /**
+ * Get the number of decommissioned trackers in the cluster.
+ *
+ * @return decommissioned tracker count
+ */
+ public int getDecommissionedTaskTrackerCount() {
+ return numDecommissionedTrackers;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ runningMaps = in.readInt();
+ runningReduces = in.readInt();
+ occupiedMapSlots = in.readInt();
+ occupiedReduceSlots = in.readInt();
+ reservedMapSlots = in.readInt();
+ reservedReduceSlots = in.readInt();
+ totalMapSlots = in.readInt();
+ totalReduceSlots = in.readInt();
+ totalJobSubmissions = in.readInt();
+ numTrackers = in.readInt();
+ numBlacklistedTrackers = in.readInt();
+ numDecommissionedTrackers = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(runningMaps);
+ out.writeInt(runningReduces);
+ out.writeInt(occupiedMapSlots);
+ out.writeInt(occupiedReduceSlots);
+ out.writeInt(reservedMapSlots);
+ out.writeInt(reservedReduceSlots);
+ out.writeInt(totalMapSlots);
+ out.writeInt(totalReduceSlots);
+ out.writeInt(totalJobSubmissions);
+ out.writeInt(numTrackers);
+ out.writeInt(numBlacklistedTrackers);
+ out.writeInt(numDecommissionedTrackers);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java?rev=1077035&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java Fri Mar 4 03:34:31 2011
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Class to test that ClusterMetrics are being created with the right
+ * counts of occupied and reserved slots.
+ *
+ * The tests exercise code paths where the counts of slots are updated.
+ */
+public class TestClusterStatus extends TestCase {
+
+ private static String[] trackers = new String[] { "tracker_tracker1:1000",
+ "tracker_tracker2:1000", "tracker_tracker3:1000" };
+ private static JobTracker jobTracker;
+ private static int mapSlotsPerTracker = 4;
+ private static int reduceSlotsPerTracker = 2;
+ private static MiniMRCluster mr;
+ private static JobClient client;
+ // heartbeat responseId. increment this after sending a heartbeat
+ private static short responseId = 1;
+ private static FakeJobInProgress fakeJob;
+ private static FakeTaskScheduler scheduler;
+
+ public static Test suite() {
+ TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
+ protected void setUp() throws Exception {
+ JobConf conf = new JobConf();
+ conf.setClass("mapred.jobtracker.taskScheduler",
+ TestClusterStatus.FakeTaskScheduler.class,
+ TaskScheduler.class);
+ mr = new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, null, conf);
+ jobTracker = mr.getJobTrackerRunner().getJobTracker();
+ for (String tracker : trackers) {
+ establishFirstContact(jobTracker, tracker);
+ }
+ client = new JobClient(mr.createJobConf());
+ }
+
+ protected void tearDown() throws Exception {
+ client.close();
+ mr.shutdown();
+ }
+ };
+ return setup;
+ }
+
+ /**
+ * Fake scheduler to test reservations.
+ *
+ * The reservations are updated incrementally in each
+ * heartbeat to pass through the re-reservation logic,
+ * until the scheduler is asked to unreserve slots.
+ */
+ static class FakeTaskScheduler extends JobQueueTaskScheduler {
+
+ private Map<TaskTracker, Integer> reservedCounts
+ = new HashMap<TaskTracker, Integer>();
+
+ // this variable can be set to trigger unreservations.
+ private boolean unreserveSlots;
+
+ public FakeTaskScheduler() {
+ super();
+ scheduler = this;
+ }
+
+ void setUnreserveSlots(boolean shouldUnreserve) {
+ unreserveSlots = shouldUnreserve;
+ }
+
+ @Override
+ public List<Task> assignTasks(TaskTracker tt) {
+ if (unreserveSlots) {
+ tt.unreserveSlots(TaskType.MAP, fakeJob);
+ tt.unreserveSlots(TaskType.REDUCE, fakeJob);
+ } else {
+ int currCount = 1;
+ if (reservedCounts.containsKey(tt)) {
+ currCount = reservedCounts.get(tt) + 1;
+ }
+ reservedCounts.put(tt, currCount);
+ tt.reserveSlots(TaskType.MAP, fakeJob, currCount);
+ tt.reserveSlots(TaskType.REDUCE, fakeJob, currCount);
+ }
+ return new ArrayList<Task>();
+ }
+ }
+
+ /**
+ * Fake class for JobInProgress to allow testing reservation
+ * counts.
+ *
+ * This class can only be used to test functionality related to
+ * reservations, and not other aspects of the JobInProgress code
+ * because the fields may not be initialized correctly.
+ */
+ static class FakeJobInProgress extends JobInProgress {
+ public FakeJobInProgress(JobID jId, JobConf jobConf,
+ JobTracker jt) {
+ super(jId, jobConf, jt);
+ }
+ }
+
+ static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status,
+ boolean initialContact, boolean acceptNewTasks,
+ String tracker, short responseId)
+ throws IOException {
+ if (status == null) {
+ status = new TaskTrackerStatus(tracker,
+ JobInProgress.convertTrackerNameToHostName(tracker));
+ }
+ jt.heartbeat(status, false, initialContact, acceptNewTasks, responseId);
+ return ++responseId ;
+ }
+
+ static void establishFirstContact(JobTracker jt, String tracker)
+ throws IOException {
+ sendHeartBeat(jt, null, true, false, tracker, (short) 0);
+ }
+
+ private TaskTrackerStatus getTTStatus(String trackerName,
+ List<TaskStatus> taskStatuses) {
+ return new TaskTrackerStatus(trackerName,
+ JobInProgress.convertTrackerNameToHostName(trackerName), 0,
+ taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
+ }
+
+ public void testClusterMetrics() throws IOException, InterruptedException {
+ assertEquals("tasktracker count doesn't match", trackers.length,
+ client.getClusterStatus().getTaskTrackers());
+
+ List<TaskStatus> list = new ArrayList<TaskStatus>();
+
+ // create a map task status, which uses 2 slots.
+ int mapSlotsPerTask = 2;
+ addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.RUNNING);
+
+ // create a reduce task status, which uses 1 slot.
+ int reduceSlotsPerTask = 1;
+ addReduceTaskAttemptToList(list,
+ reduceSlotsPerTask, TaskStatus.State.RUNNING);
+
+ // create TaskTrackerStatus and send heartbeats
+ sendHeartbeats(list);
+
+ // assert ClusterMetrics
+ ClusterMetrics metrics = jobTracker.getClusterMetrics();
+ assertEquals("occupied map slots do not match", mapSlotsPerTask,
+ metrics.getOccupiedMapSlots());
+ assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
+ metrics.getOccupiedReduceSlots());
+ assertEquals("map slot capacities do not match",
+ mapSlotsPerTracker * trackers.length,
+ metrics.getMapSlotCapacity());
+ assertEquals("reduce slot capacities do not match",
+ reduceSlotsPerTracker * trackers.length,
+ metrics.getReduceSlotCapacity());
+ assertEquals("running map tasks do not match", 1,
+ metrics.getRunningMaps());
+ assertEquals("running reduce tasks do not match", 1,
+ metrics.getRunningReduces());
+
+ // assert the values in ClusterStatus also
+ ClusterStatus stat = client.getClusterStatus();
+ assertEquals("running map tasks do not match", 1,
+ stat.getMapTasks());
+ assertEquals("running reduce tasks do not match", 1,
+ stat.getReduceTasks());
+ assertEquals("map slot capacities do not match",
+ mapSlotsPerTracker * trackers.length,
+ stat.getMaxMapTasks());
+ assertEquals("reduce slot capacities do not match",
+ reduceSlotsPerTracker * trackers.length,
+ stat.getMaxReduceTasks());
+
+ // send a heartbeat finishing only a map and check
+ // counts are updated.
+ list.clear();
+ addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.SUCCEEDED);
+ addReduceTaskAttemptToList(list,
+ reduceSlotsPerTask, TaskStatus.State.RUNNING);
+ sendHeartbeats(list);
+ metrics = jobTracker.getClusterMetrics();
+ assertEquals(0, metrics.getOccupiedMapSlots());
+ assertEquals(reduceSlotsPerTask, metrics.getOccupiedReduceSlots());
+
+ // send a heartbeat finishing the reduce task also.
+ list.clear();
+ addReduceTaskAttemptToList(list,
+ reduceSlotsPerTask, TaskStatus.State.SUCCEEDED);
+ sendHeartbeats(list);
+ metrics = jobTracker.getClusterMetrics();
+ assertEquals(0, metrics.getOccupiedReduceSlots());
+ }
+
+ private void sendHeartbeats(List<TaskStatus> list) throws IOException {
+ TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
+ status[0] = getTTStatus(trackers[0], list);
+ status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
+ status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
+ for (int i = 0; i< trackers.length; i++) {
+ sendHeartBeat(jobTracker, status[i], false, false,
+ trackers[i], responseId);
+ }
+ responseId++;
+ }
+
+ private void addReduceTaskAttemptToList(List<TaskStatus> list,
+ int reduceSlotsPerTask, TaskStatus.State state) {
+ TaskStatus ts = TaskStatus.createTaskStatus(false,
+ new TaskAttemptID("jt", 1, false, 0, 0), 0.0f,
+ reduceSlotsPerTask,
+ state, "", "", trackers[0],
+ TaskStatus.Phase.REDUCE, null);
+ list.add(ts);
+ }
+
+ private void addMapTaskAttemptToList(List<TaskStatus> list,
+ int mapSlotsPerTask, TaskStatus.State state) {
+ TaskStatus ts = TaskStatus.createTaskStatus(true,
+ new TaskAttemptID("jt", 1, true, 0, 0), 0.0f, mapSlotsPerTask,
+ state, "", "", trackers[0],
+ TaskStatus.Phase.MAP, null);
+ list.add(ts);
+ }
+
+ public void testReservedSlots() throws IOException {
+ JobConf conf = mr.createJobConf();
+
+ conf.setNumReduceTasks(1);
+ conf.setSpeculativeExecution(false);
+
+ //Set task tracker objects for reservation.
+ TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
+ TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
+ TaskTrackerStatus status1 = new TaskTrackerStatus(
+ trackers[0],JobInProgress.convertTrackerNameToHostName(
+ trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+ TaskTrackerStatus status2 = new TaskTrackerStatus(
+ trackers[1],JobInProgress.convertTrackerNameToHostName(
+ trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+ tt1.setStatus(status1);
+ tt2.setStatus(status2);
+
+ fakeJob = new FakeJobInProgress(new JobID("jt", 1), new JobConf(conf),
+ jobTracker);
+
+ sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
+ sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
+ responseId++;
+ ClusterMetrics metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 2, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 2, metrics.getReservedReduceSlots());
+
+ // redo to test re-reservations.
+ sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
+ sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
+ responseId++;
+ metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 4, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 4, metrics.getReservedReduceSlots());
+
+ // undo reservations now.
+ scheduler.setUnreserveSlots(true);
+ sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
+ sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
+ responseId++;
+ metrics = jobTracker.getClusterMetrics();
+ assertEquals("map slots should have been unreserved",
+ 0, metrics.getReservedMapSlots());
+ assertEquals("reduce slots should have been unreserved",
+ 0, metrics.getReservedReduceSlots());
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp?rev=1077035&r1=1077034&r2=1077035&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobtracker.jsp Fri Mar 4 03:34:31 2011
@@ -6,11 +6,13 @@
import="java.util.*"
import="java.text.DecimalFormat"
import="org.apache.hadoop.mapred.*"
+ import="org.apache.hadoop.mapreduce.*"
import="org.apache.hadoop.util.*"
%>
<%
JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
ClusterStatus status = tracker.getClusterStatus();
+ ClusterMetrics metrics = tracker.getClusterMetrics();
String trackerName =
StringUtils.simpleHostname(tracker.getJobTrackerMachine());
JobQueueInfo[] queues = tracker.getQueues();
@@ -21,31 +23,38 @@
<%!
private static DecimalFormat percentFormat = new DecimalFormat("##0.00");
- public void generateSummaryTable(JspWriter out, ClusterStatus status,
+ public void generateSummaryTable(JspWriter out, ClusterMetrics metrics,
JobTracker tracker) throws IOException {
- String tasksPerNode = status.getTaskTrackers() > 0 ?
- percentFormat.format(((double)(status.getMaxMapTasks() +
- status.getMaxReduceTasks())) / status.getTaskTrackers()):
+ String tasksPerNode = metrics.getTaskTrackerCount() > 0 ?
+ percentFormat.format(((double)(metrics.getMapSlotCapacity() +
+ metrics.getReduceSlotCapacity())) / metrics.getTaskTrackerCount()):
"-";
out.print("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n"+
- "<tr><th>Maps</th><th>Reduces</th>" +
+ "<tr><th>Running Map Tasks</th><th>Running Reduce Tasks</th>" +
"<th>Total Submissions</th>" +
- "<th>Nodes</th><th>Map Task Capacity</th>" +
+ "<th>Nodes</th>" +
+ "<th>Occupied Map Slots</th><th>Occupied Reduce Slots</th>" +
+ "<th>Reserved Map Slots</th><th>Reserved Reduce Slots</th>" +
+ "<th>Map Task Capacity</th>" +
"<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th>" +
"<th>Blacklisted Nodes</th>" +
"<th>Excluded Nodes</th></tr>\n");
- out.print("<tr><td>" + status.getMapTasks() + "</td><td>" +
- status.getReduceTasks() + "</td><td>" +
- tracker.getTotalSubmissions() +
+ out.print("<tr><td>" + metrics.getRunningMaps() + "</td><td>" +
+ metrics.getRunningReduces() + "</td><td>" +
+ metrics.getTotalJobSubmissions() +
"</td><td><a href=\"machines.jsp?type=active\">" +
- status.getTaskTrackers() +
- "</a></td><td>" + status.getMaxMapTasks() +
- "</td><td>" + status.getMaxReduceTasks() +
+ metrics.getTaskTrackerCount() + "</a></td><td>" +
+ metrics.getOccupiedMapSlots() + "</td><td>" +
+ metrics.getOccupiedReduceSlots() + "</td><td>" +
+ metrics.getReservedMapSlots() + "</td><td>" +
+ metrics.getReservedReduceSlots() + "</td><td>" +
+ metrics.getMapSlotCapacity() +
+ "</td><td>" + metrics.getReduceSlotCapacity() +
"</td><td>" + tasksPerNode +
"</td><td><a href=\"machines.jsp?type=blacklisted\">" +
- status.getBlacklistedTrackers() + "</a>" +
+ metrics.getBlackListedTaskTrackerCount() + "</a>" +
"</td><td><a href=\"machines.jsp?type=excluded\">" +
- status.getNumExcludedNodes() + "</a>" +
+ metrics.getDecommissionedTaskTrackerCount() + "</a>" +
"</td></tr></table>\n");
out.print("<br>");
@@ -95,7 +104,7 @@
<hr>
<h2>Cluster Summary (Heap Size is <%= StringUtils.byteDesc(status.getUsedMemory()) %>/<%= StringUtils.byteDesc(status.getMaxMemory()) %>)</h2>
<%
- generateSummaryTable(out, status, tracker);
+ generateSummaryTable(out, metrics, tracker);
%>
<hr>
<h2 id="scheduling_info">Scheduling Information</h2>