You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/10/15 13:56:59 UTC
svn commit: r825469 - in /hadoop/mapreduce/trunk: ./
src/examples/org/apache/hadoop/examples/pi/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
src/java/org/apache/hadoop/mapreduce/protocol/
src/java/org/apache/hadoop/mapreduce/server/jobtracker/
src/test/mapred/org/apache/hadoop/mapred/ src/webapps/job/
Author: sharad
Date: Thu Oct 15 11:56:57 2009
New Revision: 825469
URL: http://svn.apache.org/viewvc?rev=825469&view=rev
Log:
MAPREDUCE-1048. Add occupied/reserved slot usage summary on jobtracker UI. Contributed by Amareshwari Sriramadasu.
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Oct 15 11:56:57 2009
@@ -10,6 +10,9 @@
MAPREDUCE-999. Improve Sqoop test speed and refactor tests.
(Aaron Kimball via tomwhite)
+
+ MAPREDUCE-1048. Add occupied/reserved slot usage summary on jobtracker UI.
+ (Amareshwari Sriramadasu via sharad)
OPTIMIZATIONS
Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java Thu Oct 15 11:56:57 2009
@@ -38,9 +38,9 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -379,14 +379,14 @@
public static class MixMachine extends Machine {
private static final MixMachine INSTANCE = new MixMachine();
- private JobClient jobclient;
+ private Cluster cluster;
/** {@inheritDoc} */
@Override
public synchronized void init(Job job) throws IOException {
final Configuration conf = job.getConfiguration();
- if (jobclient == null)
- jobclient = new JobClient(JobTracker.getAddress(conf), conf);
+ if (cluster == null)
+ cluster = new Cluster(JobTracker.getAddress(conf), conf);
chooseMachine(conf).init(job);
}
@@ -398,9 +398,11 @@
try {
for(;; Thread.sleep(2000)) {
//get cluster status
- final ClusterStatus status = jobclient.getClusterStatus();
- final int m = status.getMaxMapTasks() - status.getMapTasks();
- final int r = status.getMaxReduceTasks() - status.getReduceTasks();
+ final ClusterMetrics status = cluster.getClusterStatus();
+ final int m =
+ status.getMapSlotCapacity() - status.getOccupiedMapSlots();
+ final int r =
+ status.getReduceSlotCapacity() - status.getOccupiedReduceSlots();
if (m >= parts || r >= parts) {
//favor ReduceSide machine
final Machine value = r >= parts?
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Oct 15 11:56:57 2009
@@ -1703,6 +1703,14 @@
return trackersReservedForReduces.size();
}
+ public int getReservedMapSlots(TaskTracker taskTracker) {
+ return trackersReservedForMaps.get(taskTracker).getNumSlots();
+ }
+
+ public int getReservedReduceSlots(TaskTracker taskTracker) {
+ return trackersReservedForReduces.get(taskTracker).getNumSlots();
+ }
+
private int getTrackerTaskFailures(String trackerName) {
String trackerHostName = convertTrackerNameToHostName(trackerName);
Integer failedTasks = trackerToFailuresMap.get(trackerHostName);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Oct 15 11:56:57 2009
@@ -20,7 +20,6 @@
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
@@ -1215,6 +1214,10 @@
//
int totalMaps = 0;
int totalReduces = 0;
+ private int occupiedMapSlots = 0;
+ private int occupiedReduceSlots = 0;
+ private int reservedMapSlots = 0;
+ private int reservedReduceSlots = 0;
private HashMap<String, TaskTracker> taskTrackers =
new HashMap<String, TaskTracker>();
Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
@@ -2314,6 +2317,7 @@
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
+ removeTrackerReservations(getTaskTracker(trackerName));
// Check for new tasks to be executed on the tasktracker
if (acceptNewTasks && !isBlacklisted) {
TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
@@ -2333,6 +2337,7 @@
}
}
}
+ addTrackerReservations(getTaskTracker(trackerName));
// Check for tasks to be killed
List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
@@ -2423,6 +2428,8 @@
if (oldStatus != null) {
totalMaps -= oldStatus.countMapTasks();
totalReduces -= oldStatus.countReduceTasks();
+ occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
+ occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
int mapSlots = oldStatus.getMaxMapSlots();
totalMapTaskCapacity -= mapSlots;
@@ -2445,6 +2452,8 @@
if (status != null) {
totalMaps += status.countMapTasks();
totalReduces += status.countReduceTasks();
+ occupiedMapSlots += status.countOccupiedMapSlots();
+ occupiedReduceSlots += status.countOccupiedReduceSlots();
if (!faultyTrackers.isBlacklisted(status.getHost())) {
int mapSlots = status.getMaxMapSlots();
totalMapTaskCapacity += mapSlots;
@@ -2513,6 +2522,18 @@
}
+ // remove the tracker reservations from statistics
+ private void removeTrackerReservations(TaskTracker tt) {
+ reservedMapSlots -= tt.getReservedMapSlots();
+ reservedReduceSlots -= tt.getReservedReduceSlots();
+ }
+
+ // add the tracker reservations to statistics
+ private void addTrackerReservations(TaskTracker tt) {
+ reservedMapSlots += tt.getReservedMapSlots();
+ reservedReduceSlots += tt.getReservedReduceSlots();
+ }
+
private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
synchronized (faultyTrackers) {
@@ -2559,9 +2580,10 @@
}
}
}
-
+ removeTrackerReservations(getTaskTracker(trackerName));
updateTaskStatuses(trackerStatus);
updateNodeHealthStatus(trackerStatus);
+ addTrackerReservations(getTaskTracker(trackerName));
return true;
}
@@ -2950,8 +2972,9 @@
}
public synchronized ClusterMetrics getClusterMetrics() {
- return new ClusterMetrics(totalMaps, totalReduces, totalMapTaskCapacity,
- totalReduceTaskCapacity, taskTrackers.size() -
+ return new ClusterMetrics(occupiedMapSlots, occupiedReduceSlots,
+ reservedMapSlots, reservedReduceSlots,
+ totalMapTaskCapacity, totalReduceTaskCapacity, taskTrackers.size() -
getBlacklistedTrackerCount(),
getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
}
@@ -3724,6 +3747,7 @@
// Cleanup
taskTracker.cancelAllReservations();
+ removeTrackerReservations(taskTracker);
// Purge 'marked' tasks, needs to be done
// here to prevent hanging references!
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Oct 15 11:56:57 2009
@@ -506,7 +506,7 @@
}
public ClusterMetrics getClusterMetrics() {
- return new ClusterMetrics(map_tasks, reduce_tasks, 1, 1, 1, 0, 0);
+ return new ClusterMetrics(map_tasks, reduce_tasks, 0, 0, 1, 1, 1, 0, 0);
}
public State getJobTrackerState() throws IOException, InterruptedException {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java Thu Oct 15 11:56:57 2009
@@ -35,10 +35,10 @@
* Number of blacklisted and decommissioned trackers.
* </li>
* <li>
- * Task capacity of the cluster.
+ * Slot capacity of the cluster.
* </li>
* <li>
- * The number of currently running map & reduce tasks.
+ * The number of currently occupied/reserved map & reduce slots.
* </li>
* </ol></p>
*
@@ -48,24 +48,30 @@
* @see Cluster
*/
public class ClusterMetrics implements Writable {
- int runningMaps;
- int runningReduces;
- int mapSlots;
- int reduceSlots;
- int numTrackers;
- int numBlacklistedTrackers;
- int numDecommissionedTrackers;
+ private int occupiedMapSlots;
+ private int occupiedReduceSlots;
+ private int reservedMapSlots;
+ private int reservedReduceSlots;
+ private int totalMapSlots;
+ private int totalReduceSlots;
+ private int numTrackers;
+ private int numBlacklistedTrackers;
+ private int numDecommissionedTrackers;
public ClusterMetrics() {
}
- public ClusterMetrics(int runningMaps, int runningReduces, int mapSlots,
- int reduceSlots, int numTrackers, int numBlacklistedTrackers,
- int numDecommisionedNodes) {
- this.runningMaps = runningMaps;
- this.runningReduces = runningReduces;
- this.mapSlots = mapSlots;
- this.reduceSlots = reduceSlots;
+ public ClusterMetrics(int occupiedMapSlots, int occupiedReduceSlots,
+ int reservedMapSlots, int reservedReduceSlots,
+ int mapSlots, int reduceSlots,
+ int numTrackers, int numBlacklistedTrackers,
+ int numDecommisionedNodes) {
+ this.occupiedMapSlots = occupiedMapSlots;
+ this.occupiedReduceSlots = occupiedReduceSlots;
+ this.reservedMapSlots = reservedMapSlots;
+ this.reservedReduceSlots = reservedReduceSlots;
+ this.totalMapSlots = mapSlots;
+ this.totalReduceSlots = reduceSlots;
this.numTrackers = numTrackers;
this.numBlacklistedTrackers = numBlacklistedTrackers;
this.numDecommissionedTrackers = numDecommisionedNodes;
@@ -77,7 +83,7 @@
* @return occupied map slot count
*/
public int getOccupiedMapSlots() {
- return runningMaps;
+ return occupiedMapSlots;
}
/**
@@ -86,16 +92,34 @@
* @return occupied reduce slot count
*/
public int getOccupiedReduceSlots() {
- return runningReduces;
+ return occupiedReduceSlots;
+ }
+
+ /**
+ * Get number of reserved map slots in the cluster.
+ *
+ * @return reserved map slot count
+ */
+ public int getReservedMapSlots() {
+ return reservedMapSlots;
}
/**
+ * Get the number of reserved reduce slots in the cluster.
+ *
+ * @return reserved reduce slot count
+ */
+ public int getReservedReduceSlots() {
+ return reservedReduceSlots;
+ }
+
+ /**
* Get the total number of map slots in the cluster.
*
* @return map slot capacity
*/
public int getMapSlotCapacity() {
- return mapSlots;
+ return totalMapSlots;
}
/**
@@ -104,7 +128,7 @@
* @return reduce slot capacity
*/
public int getReduceSlotCapacity() {
- return reduceSlots;
+ return totalReduceSlots;
}
/**
@@ -136,10 +160,12 @@
@Override
public void readFields(DataInput in) throws IOException {
- runningMaps = in.readInt();
- runningReduces = in.readInt();
- mapSlots = in.readInt();
- reduceSlots = in.readInt();
+ occupiedMapSlots = in.readInt();
+ occupiedReduceSlots = in.readInt();
+ reservedMapSlots = in.readInt();
+ reservedReduceSlots = in.readInt();
+ totalMapSlots = in.readInt();
+ totalReduceSlots = in.readInt();
numTrackers = in.readInt();
numBlacklistedTrackers = in.readInt();
numDecommissionedTrackers = in.readInt();
@@ -147,10 +173,12 @@
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(runningMaps);
- out.writeInt(runningReduces);
- out.writeInt(mapSlots);
- out.writeInt(reduceSlots);
+ out.writeInt(occupiedMapSlots);
+ out.writeInt(occupiedReduceSlots);
+ out.writeInt(reservedMapSlots);
+ out.writeInt(reservedReduceSlots);
+ out.writeInt(totalMapSlots);
+ out.writeInt(totalReduceSlots);
out.writeInt(numTrackers);
out.writeInt(numBlacklistedTrackers);
out.writeInt(numDecommissionedTrackers);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Thu Oct 15 11:56:57 2009
@@ -85,8 +85,9 @@
* Version 27: Changed protocol to use new api objects. And the protocol is
* renamed from JobSubmissionProtocol to ClientProtocol.
* Version 28: Added getJobHistoryDir() as part of MAPREDUCE-975.
+ * Version 29: Added reservedSlots to ClusterMetrics.
*/
- public static final long versionID = 28L;
+ public static final long versionID = 29L;
/**
* Allocate a name for the job.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java Thu Oct 15 11:56:57 2009
@@ -100,6 +100,30 @@
}
/**
+ * Get the reserved map slots for the tracker.
+ *
+ * @return reserved map slots
+ */
+ public int getReservedMapSlots() {
+ if (jobForFallowMapSlot != null) {
+ return jobForFallowMapSlot.getReservedMapSlots(this);
+ }
+ return 0;
+ }
+
+ /**
+ * Get the reserved reduce slots for the tracker.
+ *
+ * @return reserved reduce slots
+ */
+ public int getReservedReduceSlots() {
+ if (jobForFallowReduceSlot != null) {
+ return jobForFallowReduceSlot.getReservedReduceSlots(this);
+ }
+ return 0;
+ }
+
+ /**
* Get the {@link JobInProgress} for which the fallow slot(s) are held.
* @param taskType {@link TaskType} of the task
* @return the task for which the fallow slot(s) are held,
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Thu Oct 15 11:56:57 2009
@@ -223,7 +223,7 @@
}
static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status,
- boolean initialContact,
+ boolean initialContact, boolean acceptNewTasks,
String tracker, short responseId)
throws IOException {
if (status == null) {
@@ -231,13 +231,13 @@
JobInProgress.convertTrackerNameToHostName(tracker));
}
- jt.heartbeat(status, false, initialContact, false, responseId);
+ jt.heartbeat(status, false, initialContact, acceptNewTasks, responseId);
return ++responseId ;
}
static void establishFirstContact(JobTracker jt, String tracker)
throws IOException {
- sendHeartBeat(jt, null, true, tracker, (short) 0);
+ sendHeartBeat(jt, null, true, false, tracker, (short) 0);
}
static class FakeTaskInProgress extends TaskInProgress {
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java?rev=825469&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java Thu Oct 15 11:56:57 2009
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class TestClusterStatus extends TestCase {
+
+ private static String[] trackers = new String[] { "tracker_tracker1:1000",
+ "tracker_tracker2:1000", "tracker_tracker3:1000" };
+ private static JobTracker jobTracker;
+ private static int mapSlotsPerTracker = 4;
+ private static int reduceSlotsPerTracker = 2;
+ private static MiniMRCluster mr;
+ private static FakeJobInProgress fakeJob = null;
+ private static Cluster cluster;
+ // heartbeat responseId. increment this after sending a heartbeat
+ private static short responseId = 1;
+
+ public static Test suite() {
+ TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
+ protected void setUp() throws Exception {
+
+ Configuration conf = new Configuration();
+ conf.setClass(JTConfig.JT_TASK_SCHEDULER, FakeTaskScheduler.class,
+ TaskScheduler.class);
+ mr = new MiniMRCluster(0, "file:///", 1, null, null, new JobConf(conf));
+ jobTracker = mr.getJobTrackerRunner().getJobTracker();
+ for (String tracker : trackers) {
+ FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
+ }
+ cluster = new Cluster(mr.createJobConf());
+ }
+
+ protected void tearDown() throws Exception {
+ cluster.close();
+ mr.shutdown();
+ }
+ };
+ return setup;
+ }
+
+ static class FakeTaskScheduler extends JobQueueTaskScheduler {
+ public FakeTaskScheduler() {
+ super();
+ }
+ public List<Task> assignTasks(TaskTracker tt) {
+ tt.reserveSlots(TaskType.MAP, fakeJob, 2);
+ tt.reserveSlots(TaskType.REDUCE, fakeJob, 2);
+ return new ArrayList<Task>();
+}
+ }
+
+ private TaskTrackerStatus getTTStatus(String trackerName,
+ List<TaskStatus> taskStatuses) {
+ return new TaskTrackerStatus(trackerName,
+ JobInProgress.convertTrackerNameToHostName(trackerName), 0,
+ taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
+ }
+
+ public void testClusterMetrics() throws IOException, InterruptedException {
+ assertEquals("tasktracker count doesn't match", trackers.length,
+ cluster.getClusterStatus().getTaskTrackerCount());
+
+ List<TaskStatus> list = new ArrayList<TaskStatus>();
+
+ // create a map task status, which uses 2 slots.
+ int mapSlotsPerTask = 2;
+ TaskStatus ts = TaskStatus.createTaskStatus(true,
+ new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0), 0.0f, mapSlotsPerTask,
+ TaskStatus.State.RUNNING, "", "", trackers[0],
+ TaskStatus.Phase.MAP, null);
+ list.add(ts);
+
+ // create a reduce task status, which uses 1 slot.
+ int reduceSlotsPerTask = 1;
+ ts = TaskStatus.createTaskStatus(false,
+ new TaskAttemptID("jt", 1, TaskType.REDUCE, 0, 0), 0.0f,
+ reduceSlotsPerTask,
+ TaskStatus.State.RUNNING, "", "", trackers[0],
+ TaskStatus.Phase.REDUCE, null);
+ list.add(ts);
+
+ // create TaskTrackerStatus and send heartbeats
+ TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
+ status[0] = getTTStatus(trackers[0], list);
+ status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
+ status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
+ for (int i = 0; i< trackers.length; i++) {
+ FakeObjectUtilities.sendHeartBeat(jobTracker, status[i], false, false,
+ trackers[i], responseId);
+ }
+ responseId++;
+ // assert ClusterMetrics
+ ClusterMetrics metrics = cluster.getClusterStatus();
+ assertEquals("occupied map slots do not match", mapSlotsPerTask,
+ metrics.getOccupiedMapSlots());
+ assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
+ metrics.getOccupiedReduceSlots());
+ assertEquals("map slot capacities do not match",
+ mapSlotsPerTracker * trackers.length,
+ metrics.getMapSlotCapacity());
+ assertEquals("reduce slot capacities do not match",
+ reduceSlotsPerTracker * trackers.length,
+ metrics.getReduceSlotCapacity());
+
+ // assert the values in ClusterStatus also
+ assertEquals("running map tasks do not match", 1,
+ jobTracker.getClusterStatus().getMapTasks());
+ assertEquals("running reduce tasks do not match", 1,
+ jobTracker.getClusterStatus().getReduceTasks());
+ assertEquals("map slot capacities do not match",
+ mapSlotsPerTracker * trackers.length,
+ jobTracker.getClusterStatus().getMaxMapTasks());
+ assertEquals("reduce slot capacities do not match",
+ reduceSlotsPerTracker * trackers.length,
+ jobTracker.getClusterStatus().getMaxReduceTasks());
+ cluster.close();
+ }
+
+ public void testReservedSlots() throws Exception {
+ Configuration conf = mr.createJobConf();
+ conf.setInt(JobContext.NUM_MAPS, 1);
+
+ Job job = Job.getInstance(cluster, conf);
+ job.setNumReduceTasks(1);
+ job.setSpeculativeExecution(false);
+ job.setJobSetupCleanupNeeded(false);
+
+ //Set task tracker objects for reservation.
+ TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
+ TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
+ TaskTrackerStatus status1 = new TaskTrackerStatus(
+ trackers[0],JobInProgress.convertTrackerNameToHostName(
+ trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+ TaskTrackerStatus status2 = new TaskTrackerStatus(
+ trackers[1],JobInProgress.convertTrackerNameToHostName(
+ trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+ tt1.setStatus(status1);
+ tt2.setStatus(status2);
+
+ fakeJob = new FakeJobInProgress(new JobConf(job.getConfiguration()),
+ jobTracker);
+ fakeJob.setClusterSize(3);
+ fakeJob.initTasks();
+
+ FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
+ true, trackers[0], responseId);
+ FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
+ true, trackers[1], responseId);
+ responseId++;
+ ClusterMetrics metrics = cluster.getClusterStatus();
+ assertEquals("reserved map slots do not match",
+ 4, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 4, metrics.getReservedReduceSlots());
+
+ TaskAttemptID mTid = fakeJob.findMapTask(trackers[1]);
+ TaskAttemptID rTid = fakeJob.findReduceTask(trackers[1]);
+
+ fakeJob.finishTask(mTid);
+ fakeJob.finishTask(rTid);
+
+ assertEquals("Job didnt complete successfully complete",
+ fakeJob.getStatus().getRunState(), JobStatus.SUCCEEDED);
+ }
+}
Modified: hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp?rev=825469&r1=825468&r2=825469&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp Thu Oct 15 11:56:57 2009
@@ -25,6 +25,7 @@
import="java.util.*"
import="java.text.DecimalFormat"
import="org.apache.hadoop.mapred.*"
+ import="org.apache.hadoop.mapreduce.*"
import="org.apache.hadoop.util.*"
%>
<%! private static final long serialVersionUID = 1L;
@@ -44,22 +45,29 @@
public void generateSummaryTable(JspWriter out, ClusterStatus status,
JobTracker tracker) throws IOException {
+ ClusterMetrics metrics = tracker.getClusterMetrics();
String tasksPerNode = status.getTaskTrackers() > 0 ?
percentFormat.format(((double)(status.getMaxMapTasks() +
status.getMaxReduceTasks())) / status.getTaskTrackers()):
"-";
out.print("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n"+
"<tr><th>Queues</th>" +
- "<th>Maps</th><th>Reduces</th>" +
+ "<th>Running Map Tasks</th><th>Running Reduce Tasks</th>" +
+ "<th>Occupied Map Slots</th><th>Occupied Reduce Slots</th>" +
+ "<th>Reserved Map Slots</th><th>Reserved Reduce Slots</th>" +
"<th>Total Submissions</th>" +
- "<th>Nodes</th><th>Map Task Capacity</th>" +
- "<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th>" +
+ "<th>Nodes</th><th>Map Slot Capacity</th>" +
+ "<th>Reduce Slot Capacity</th><th>Avg. Slots/Node</th>" +
"<th>Blacklisted Nodes</th>" +
"<th>Excluded Nodes</th></tr>\n");
out.print("<tr><td><a href=\"queueinfo.jsp\">" +
tracker.getRootQueues().length + "</a></td><td>" +
status.getMapTasks() + "</td><td>" +
status.getReduceTasks() + "</td><td>" +
+ metrics.getOccupiedMapSlots() + "</td><td>" +
+ metrics.getOccupiedReduceSlots() + "</td><td>" +
+ metrics.getReservedMapSlots() + "</td><td>" +
+ metrics.getReservedReduceSlots() + "</td><td>" +
tracker.getTotalSubmissions() +
"</td><td><a href=\"machines.jsp?type=active\">" +
status.getTaskTrackers() +