You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/10/24 08:32:51 UTC
svn commit: r829312 - in /hadoop/mapreduce/trunk: ./
src/examples/org/apache/hadoop/examples/pi/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
src/java/org/apache/hadoop/mapreduce/protocol/
src/test/mapred/org/apache/hadoop/mapred/ src/webapps/job/
Author: sharad
Date: Sat Oct 24 06:32:51 2009
New Revision: 829312
URL: http://svn.apache.org/viewvc?rev=829312&view=rev
Log:
MAPREDUCE-1048. Add occupied/reserved slot usage summary on jobtracker UI. Contributed by Amareshwari Sriramadasu and Hemanth Yamijala.
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java
hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Oct 24 06:32:51 2009
@@ -22,6 +22,10 @@
MAPREDUCE-1103. Added more metrics to Jobtracker. (sharad)
+ MAPREDUCE-1048. Add occupied/reserved slot usage summary on jobtracker UI.
+ (Amareshwari Sriramadasu and Hemanth Yamijala via sharad)
+
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java Sat Oct 24 06:32:51 2009
@@ -38,9 +38,9 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -379,14 +379,14 @@
public static class MixMachine extends Machine {
private static final MixMachine INSTANCE = new MixMachine();
- private JobClient jobclient;
+ private Cluster cluster;
/** {@inheritDoc} */
@Override
public synchronized void init(Job job) throws IOException {
final Configuration conf = job.getConfiguration();
- if (jobclient == null)
- jobclient = new JobClient(JobTracker.getAddress(conf), conf);
+ if (cluster == null)
+ cluster = new Cluster(JobTracker.getAddress(conf), conf);
chooseMachine(conf).init(job);
}
@@ -398,9 +398,11 @@
try {
for(;; Thread.sleep(2000)) {
//get cluster status
- final ClusterStatus status = jobclient.getClusterStatus();
- final int m = status.getMaxMapTasks() - status.getMapTasks();
- final int r = status.getMaxReduceTasks() - status.getReduceTasks();
+ final ClusterMetrics status = cluster.getClusterStatus();
+ final int m =
+ status.getMapSlotCapacity() - status.getOccupiedMapSlots();
+ final int r =
+ status.getReduceSlotCapacity() - status.getOccupiedReduceSlots();
if (m >= parts || r >= parts) {
//favor ReduceSide machine
final Machine value = r >= parts?
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Oct 24 06:32:51 2009
@@ -1692,6 +1692,7 @@
else {
jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
}
+ jobtracker.incrementReservations(type, reservedSlots);
}
public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
@@ -1724,6 +1725,7 @@
jobtracker.getInstrumentation().decReservedReduceSlots(
info.getNumSlots());
}
+ jobtracker.decrementReservations(type, info.getNumSlots());
}
public int getNumReservedTaskTrackersForMaps() {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Sat Oct 24 06:32:51 2009
@@ -20,7 +20,6 @@
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
@@ -1225,6 +1224,10 @@
//
int totalMaps = 0;
int totalReduces = 0;
+ private int occupiedMapSlots = 0;
+ private int occupiedReduceSlots = 0;
+ private int reservedMapSlots = 0;
+ private int reservedReduceSlots = 0;
private HashMap<String, TaskTracker> taskTrackers =
new HashMap<String, TaskTracker>();
Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
@@ -2434,6 +2437,8 @@
if (oldStatus != null) {
totalMaps -= oldStatus.countMapTasks();
totalReduces -= oldStatus.countReduceTasks();
+ occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
+ occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
@@ -2458,6 +2463,8 @@
if (status != null) {
totalMaps += status.countMapTasks();
totalReduces += status.countReduceTasks();
+ occupiedMapSlots += status.countOccupiedMapSlots();
+ occupiedReduceSlots += status.countOccupiedReduceSlots();
getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
if (!faultyTrackers.isBlacklisted(status.getHost())) {
@@ -2527,6 +2534,25 @@
return oldStatus != null;
}
+ // Increment the number of reserved slots in the cluster.
+ // This method assumes the caller holds the JobTracker lock.
+ void incrementReservations(TaskType type, int reservedSlots) {
+ if (type.equals(TaskType.MAP)) {
+ reservedMapSlots += reservedSlots;
+ } else if (type.equals(TaskType.REDUCE)) {
+ reservedReduceSlots += reservedSlots;
+ }
+ }
+
+ // Decrement the number of reserved slots in the cluster.
+ // This method assumes the caller holds the JobTracker lock.
+ void decrementReservations(TaskType type, int reservedSlots) {
+ if (type.equals(TaskType.MAP)) {
+ reservedMapSlots -= reservedSlots;
+ } else if (type.equals(TaskType.REDUCE)) {
+ reservedReduceSlots -= reservedSlots;
+ }
+ }
private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
@@ -2965,9 +2991,12 @@
}
public synchronized ClusterMetrics getClusterMetrics() {
- return new ClusterMetrics(totalMaps, totalReduces, totalMapTaskCapacity,
- totalReduceTaskCapacity, taskTrackers.size() -
- getBlacklistedTrackerCount(),
+ return new ClusterMetrics(totalMaps,
+ totalReduces, occupiedMapSlots, occupiedReduceSlots,
+ reservedMapSlots, reservedReduceSlots,
+ totalMapTaskCapacity, totalReduceTaskCapacity,
+ totalSubmissions,
+ taskTrackers.size() - getBlacklistedTrackerCount(),
getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Sat Oct 24 06:32:51 2009
@@ -507,7 +507,8 @@
}
public ClusterMetrics getClusterMetrics() {
- return new ClusterMetrics(map_tasks, reduce_tasks, 1, 1, 1, 0, 0);
+ return new ClusterMetrics(map_tasks, reduce_tasks, map_tasks, reduce_tasks,
+ 0, 0, 1, 1, jobs.size(), 1, 0, 0);
}
public State getJobTrackerState() throws IOException, InterruptedException {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java Sat Oct 24 06:32:51 2009
@@ -35,11 +35,17 @@
* Number of blacklisted and decommissioned trackers.
* </li>
* <li>
- * Task capacity of the cluster.
+ * Slot capacity of the cluster.
+ * </li>
+ * <li>
+ * The number of currently occupied/reserved map & reduce slots.
* </li>
* <li>
* The number of currently running map & reduce tasks.
* </li>
+ * <li>
+ * The number of job submissions.
+ * </li>
* </ol></p>
*
* <p>Clients can query for the latest <code>ClusterMetrics</code>, via
@@ -48,27 +54,59 @@
* @see Cluster
*/
public class ClusterMetrics implements Writable {
- int runningMaps;
- int runningReduces;
- int mapSlots;
- int reduceSlots;
- int numTrackers;
- int numBlacklistedTrackers;
- int numDecommissionedTrackers;
+ private int runningMaps;
+ private int runningReduces;
+ private int occupiedMapSlots;
+ private int occupiedReduceSlots;
+ private int reservedMapSlots;
+ private int reservedReduceSlots;
+ private int totalMapSlots;
+ private int totalReduceSlots;
+ private int totalJobSubmissions;
+ private int numTrackers;
+ private int numBlacklistedTrackers;
+ private int numDecommissionedTrackers;
public ClusterMetrics() {
}
- public ClusterMetrics(int runningMaps, int runningReduces, int mapSlots,
- int reduceSlots, int numTrackers, int numBlacklistedTrackers,
- int numDecommisionedNodes) {
+ public ClusterMetrics(int runningMaps, int runningReduces,
+ int occupiedMapSlots, int occupiedReduceSlots,
+ int reservedMapSlots, int reservedReduceSlots,
+ int mapSlots, int reduceSlots,
+ int totalJobSubmissions,
+ int numTrackers, int numBlacklistedTrackers,
+ int numDecommissionedNodes) {
this.runningMaps = runningMaps;
this.runningReduces = runningReduces;
- this.mapSlots = mapSlots;
- this.reduceSlots = reduceSlots;
+ this.occupiedMapSlots = occupiedMapSlots;
+ this.occupiedReduceSlots = occupiedReduceSlots;
+ this.reservedMapSlots = reservedMapSlots;
+ this.reservedReduceSlots = reservedReduceSlots;
+ this.totalMapSlots = mapSlots;
+ this.totalReduceSlots = reduceSlots;
+ this.totalJobSubmissions = totalJobSubmissions;
this.numTrackers = numTrackers;
this.numBlacklistedTrackers = numBlacklistedTrackers;
- this.numDecommissionedTrackers = numDecommisionedNodes;
+ this.numDecommissionedTrackers = numDecommissionedNodes;
+ }
+
+ /**
+ * Get the number of running map tasks in the cluster.
+ *
+ * @return running maps
+ */
+ public int getRunningMaps() {
+ return runningMaps;
+ }
+
+ /**
+ * Get the number of running reduce tasks in the cluster.
+ *
+ * @return running reduces
+ */
+ public int getRunningReduces() {
+ return runningReduces;
}
/**
@@ -77,7 +115,7 @@
* @return occupied map slot count
*/
public int getOccupiedMapSlots() {
- return runningMaps;
+ return occupiedMapSlots;
}
/**
@@ -86,16 +124,34 @@
* @return occupied reduce slot count
*/
public int getOccupiedReduceSlots() {
- return runningReduces;
+ return occupiedReduceSlots;
+ }
+
+ /**
+ * Get the number of reserved map slots in the cluster.
+ *
+ * @return reserved map slot count
+ */
+ public int getReservedMapSlots() {
+ return reservedMapSlots;
}
/**
+ * Get the number of reserved reduce slots in the cluster.
+ *
+ * @return reserved reduce slot count
+ */
+ public int getReservedReduceSlots() {
+ return reservedReduceSlots;
+ }
+
+ /**
* Get the total number of map slots in the cluster.
*
* @return map slot capacity
*/
public int getMapSlotCapacity() {
- return mapSlots;
+ return totalMapSlots;
}
/**
@@ -104,7 +160,16 @@
* @return reduce slot capacity
*/
public int getReduceSlotCapacity() {
- return reduceSlots;
+ return totalReduceSlots;
+ }
+
+ /**
+ * Get the total number of job submissions in the cluster.
+ *
+ * @return total number of job submissions
+ */
+ public int getTotalJobSubmissions() {
+ return totalJobSubmissions;
}
/**
@@ -138,8 +203,13 @@
public void readFields(DataInput in) throws IOException {
runningMaps = in.readInt();
runningReduces = in.readInt();
- mapSlots = in.readInt();
- reduceSlots = in.readInt();
+ occupiedMapSlots = in.readInt();
+ occupiedReduceSlots = in.readInt();
+ reservedMapSlots = in.readInt();
+ reservedReduceSlots = in.readInt();
+ totalMapSlots = in.readInt();
+ totalReduceSlots = in.readInt();
+ totalJobSubmissions = in.readInt();
numTrackers = in.readInt();
numBlacklistedTrackers = in.readInt();
numDecommissionedTrackers = in.readInt();
@@ -149,8 +219,13 @@
public void write(DataOutput out) throws IOException {
out.writeInt(runningMaps);
out.writeInt(runningReduces);
- out.writeInt(mapSlots);
- out.writeInt(reduceSlots);
+ out.writeInt(occupiedMapSlots);
+ out.writeInt(occupiedReduceSlots);
+ out.writeInt(reservedMapSlots);
+ out.writeInt(reservedReduceSlots);
+ out.writeInt(totalMapSlots);
+ out.writeInt(totalReduceSlots);
+ out.writeInt(totalJobSubmissions);
out.writeInt(numTrackers);
out.writeInt(numBlacklistedTrackers);
out.writeInt(numDecommissionedTrackers);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Sat Oct 24 06:32:51 2009
@@ -85,8 +85,10 @@
* Version 27: Changed protocol to use new api objects. And the protocol is
* renamed from JobSubmissionProtocol to ClientProtocol.
* Version 28: Added getJobHistoryDir() as part of MAPREDUCE-975.
+ * Version 29: Added reservedSlots, runningTasks and totalJobSubmissions
+ * to ClusterMetrics as part of MAPREDUCE-1048.
*/
- public static final long versionID = 28L;
+ public static final long versionID = 29L;
/**
* Allocate a name for the job.
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java?rev=829312&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java Sat Oct 24 06:32:51 2009
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class TestClusterStatus extends TestCase {
+
+ private static String[] trackers = new String[] { "tracker_tracker1:1000",
+ "tracker_tracker2:1000", "tracker_tracker3:1000" };
+ private static JobTracker jobTracker;
+ private static int mapSlotsPerTracker = 4;
+ private static int reduceSlotsPerTracker = 2;
+ private static MiniMRCluster mr;
+ private static FakeJobInProgress fakeJob = null;
+ private static Cluster cluster;
+ // heartbeat responseId. increment this after sending a heartbeat
+ private static short responseId = 1;
+
+ public static Test suite() {
+ TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
+ protected void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setClass(JTConfig.JT_TASK_SCHEDULER, FakeTaskScheduler.class,
+ TaskScheduler.class);
+ mr = new MiniMRCluster(0, "file:///", 1, null, null, new JobConf(conf));
+ jobTracker = mr.getJobTrackerRunner().getJobTracker();
+ for (String tracker : trackers) {
+ FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
+ }
+ cluster = new Cluster(mr.createJobConf());
+ }
+
+ protected void tearDown() throws Exception {
+ cluster.close();
+ mr.shutdown();
+ }
+ };
+ return setup;
+ }
+
+ /**
+ * Fake scheduler to test reservations.
+ *
+ * The reservations are updated incrementally in each
+ * heartbeat to pass through the re-reservation logic.
+ */
+ static class FakeTaskScheduler extends JobQueueTaskScheduler {
+
+ private Map<TaskTracker, Integer> reservedCounts
+ = new HashMap<TaskTracker, Integer>();
+
+ public FakeTaskScheduler() {
+ super();
+ }
+
+ public List<Task> assignTasks(TaskTracker tt) {
+ int currCount = 1;
+ if (reservedCounts.containsKey(tt)) {
+ currCount = reservedCounts.get(tt) + 1;
+ }
+ reservedCounts.put(tt, currCount);
+ tt.reserveSlots(TaskType.MAP, fakeJob, currCount);
+ tt.reserveSlots(TaskType.REDUCE, fakeJob, currCount);
+ return new ArrayList<Task>();
+ }
+ }
+
+ private TaskTrackerStatus getTTStatus(String trackerName,
+ List<TaskStatus> taskStatuses) {
+ return new TaskTrackerStatus(trackerName,
+ JobInProgress.convertTrackerNameToHostName(trackerName), 0,
+ taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
+ }
+
+ public void testClusterMetrics() throws IOException, InterruptedException {
+ assertEquals("tasktracker count doesn't match", trackers.length,
+ cluster.getClusterStatus().getTaskTrackerCount());
+
+ List<TaskStatus> list = new ArrayList<TaskStatus>();
+
+ // create a map task status, which uses 2 slots.
+ int mapSlotsPerTask = 2;
+ addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.RUNNING);
+
+ // create a reduce task status, which uses 1 slot.
+ int reduceSlotsPerTask = 1;
+ addReduceTaskAttemptToList(list,
+ reduceSlotsPerTask, TaskStatus.State.RUNNING);
+
+ // create TaskTrackerStatus and send heartbeats
+ sendHeartbeats(list);
+
+ // assert ClusterMetrics
+ ClusterMetrics metrics = cluster.getClusterStatus();
+ assertEquals("occupied map slots do not match", mapSlotsPerTask,
+ metrics.getOccupiedMapSlots());
+ assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
+ metrics.getOccupiedReduceSlots());
+ assertEquals("map slot capacities do not match",
+ mapSlotsPerTracker * trackers.length,
+ metrics.getMapSlotCapacity());
+ assertEquals("reduce slot capacities do not match",
+ reduceSlotsPerTracker * trackers.length,
+ metrics.getReduceSlotCapacity());
+ assertEquals("running map tasks do not match", 1,
+ metrics.getRunningMaps());
+ assertEquals("running reduce tasks do not match", 1,
+ metrics.getRunningReduces());
+
+ // assert the values in ClusterStatus also
+ assertEquals("running map tasks do not match", 1,
+ jobTracker.getClusterStatus().getMapTasks());
+ assertEquals("running reduce tasks do not match", 1,
+ jobTracker.getClusterStatus().getReduceTasks());
+ assertEquals("map slot capacities do not match",
+ mapSlotsPerTracker * trackers.length,
+ jobTracker.getClusterStatus().getMaxMapTasks());
+ assertEquals("reduce slot capacities do not match",
+ reduceSlotsPerTracker * trackers.length,
+ jobTracker.getClusterStatus().getMaxReduceTasks());
+
+ // send a heartbeat finishing only a map and check
+ // counts are updated.
+ list.clear();
+ addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.SUCCEEDED);
+ addReduceTaskAttemptToList(list,
+ reduceSlotsPerTask, TaskStatus.State.RUNNING);
+ sendHeartbeats(list);
+ metrics = jobTracker.getClusterMetrics();
+ assertEquals(0, metrics.getOccupiedMapSlots());
+ assertEquals(reduceSlotsPerTask, metrics.getOccupiedReduceSlots());
+
+ // send a heartbeat finishing the reduce task also.
+ list.clear();
+ addReduceTaskAttemptToList(list,
+ reduceSlotsPerTask, TaskStatus.State.SUCCEEDED);
+ sendHeartbeats(list);
+ metrics = jobTracker.getClusterMetrics();
+ assertEquals(0, metrics.getOccupiedReduceSlots());
+ }
+
+ private void sendHeartbeats(List<TaskStatus> list) throws IOException {
+ TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
+ status[0] = getTTStatus(trackers[0], list);
+ status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
+ status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
+ for (int i = 0; i< trackers.length; i++) {
+ FakeObjectUtilities.sendHeartBeat(jobTracker, status[i], false, false,
+ trackers[i], responseId);
+ }
+ responseId++;
+ }
+
+ private void addReduceTaskAttemptToList(List<TaskStatus> list,
+ int reduceSlotsPerTask, TaskStatus.State state) {
+ TaskStatus ts = TaskStatus.createTaskStatus(false,
+ new TaskAttemptID("jt", 1, TaskType.REDUCE, 0, 0), 0.0f,
+ reduceSlotsPerTask,
+ state, "", "", trackers[0],
+ TaskStatus.Phase.REDUCE, null);
+ list.add(ts);
+ }
+
+ private void addMapTaskAttemptToList(List<TaskStatus> list,
+ int mapSlotsPerTask, TaskStatus.State state) {
+ TaskStatus ts = TaskStatus.createTaskStatus(true,
+ new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0), 0.0f, mapSlotsPerTask,
+ state, "", "", trackers[0],
+ TaskStatus.Phase.MAP, null);
+ list.add(ts);
+ }
+
+ public void testReservedSlots() throws Exception {
+ Configuration conf = mr.createJobConf();
+ conf.setInt(JobContext.NUM_MAPS, 1);
+
+ Job job = Job.getInstance(cluster, conf);
+ job.setNumReduceTasks(1);
+ job.setSpeculativeExecution(false);
+ job.setJobSetupCleanupNeeded(false);
+
+ //Set task tracker objects for reservation.
+ TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
+ TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
+ TaskTrackerStatus status1 = new TaskTrackerStatus(
+ trackers[0],JobInProgress.convertTrackerNameToHostName(
+ trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+ TaskTrackerStatus status2 = new TaskTrackerStatus(
+ trackers[1],JobInProgress.convertTrackerNameToHostName(
+ trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+ tt1.setStatus(status1);
+ tt2.setStatus(status2);
+
+ fakeJob = new FakeJobInProgress(new JobConf(job.getConfiguration()),
+ jobTracker);
+ fakeJob.setClusterSize(3);
+ fakeJob.initTasks();
+
+ FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
+ true, trackers[0], responseId);
+ FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
+ true, trackers[1], responseId);
+ responseId++;
+ ClusterMetrics metrics = cluster.getClusterStatus();
+ assertEquals("reserved map slots do not match",
+ 2, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 2, metrics.getReservedReduceSlots());
+
+ // redo to test re-reservations.
+ FakeObjectUtilities.sendHeartBeat(jobTracker, status1, false,
+ true, trackers[0], responseId);
+ FakeObjectUtilities.sendHeartBeat(jobTracker, status2, false,
+ true, trackers[1], responseId);
+ responseId++;
+ metrics = cluster.getClusterStatus();
+ assertEquals("reserved map slots do not match",
+ 4, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 4, metrics.getReservedReduceSlots());
+
+ TaskAttemptID mTid = fakeJob.findMapTask(trackers[1]);
+ TaskAttemptID rTid = fakeJob.findReduceTask(trackers[1]);
+
+ fakeJob.finishTask(mTid);
+ fakeJob.finishTask(rTid);
+
+ assertEquals("Job didnt complete successfully complete",
+ fakeJob.getStatus().getRunState(), JobStatus.SUCCEEDED);
+ metrics = cluster.getClusterStatus();
+ assertEquals("reserved map slots do not match",
+ 0, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 0, metrics.getReservedReduceSlots());
+ }
+}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java Sat Oct 24 06:32:51 2009
@@ -23,6 +23,7 @@
import javax.security.auth.login.LoginException;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -124,6 +125,11 @@
2, fjob.getNumReservedTaskTrackersForMaps());
assertEquals("Trackers not reserved for the job : reduces",
2, fjob.getNumReservedTaskTrackersForReduces());
+ ClusterMetrics metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 4, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 4, metrics.getReservedReduceSlots());
TaskAttemptID mTid = fjob.findMapTask(trackers[1]);
TaskAttemptID rTid = fjob.findReduceTask(trackers[1]);
@@ -138,6 +144,11 @@
0, fjob.getNumReservedTaskTrackersForMaps());
assertEquals("Reservation for the job not released : Reduces",
0, fjob.getNumReservedTaskTrackersForReduces());
+ metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 0, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 0, metrics.getReservedReduceSlots());
}
/**
@@ -165,6 +176,11 @@
0, job.getNumReservedTaskTrackersForMaps());
assertEquals("Reservation for the job not released : Reduces",
0, job.getNumReservedTaskTrackersForReduces());
+ ClusterMetrics metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 0, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 0, metrics.getReservedReduceSlots());
}
/**
@@ -212,6 +228,11 @@
2, job.getNumReservedTaskTrackersForMaps());
assertEquals("Trackers not reserved for the job : reduces",
2, job.getNumReservedTaskTrackersForReduces());
+ ClusterMetrics metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 4, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 4, metrics.getReservedReduceSlots());
/*
* FakeJobInProgress.findMapTask does not handle
@@ -230,6 +251,12 @@
1, job.getNumReservedTaskTrackersForMaps());
assertEquals("Extra Trackers reserved for the job : reduces",
1, job.getNumReservedTaskTrackersForReduces());
+ metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 2, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 2, metrics.getReservedReduceSlots());
+
//Finish the map task on the tracker 1. Finishing it here to work
//around bug in the FakeJobInProgress object
job.finishTask(mTid);
@@ -245,7 +272,11 @@
0, job.getNumReservedTaskTrackersForMaps());
assertEquals("Trackers not unreserved for the job : reduces",
0, job.getNumReservedTaskTrackersForReduces());
-
+ metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 0, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 0, metrics.getReservedReduceSlots());
}
}
\ No newline at end of file
Modified: hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp?rev=829312&r1=829311&r2=829312&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp Sat Oct 24 06:32:51 2009
@@ -25,6 +25,7 @@
import="java.util.*"
import="java.text.DecimalFormat"
import="org.apache.hadoop.mapred.*"
+ import="org.apache.hadoop.mapreduce.*"
import="org.apache.hadoop.util.*"
%>
<%! private static final long serialVersionUID = 1L;
@@ -32,6 +33,7 @@
<%
JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
ClusterStatus status = tracker.getClusterStatus();
+ ClusterMetrics metrics = tracker.getClusterMetrics();
String trackerName =
StringUtils.simpleHostname(tracker.getJobTrackerMachine());
JobQueueInfo[] queues = tracker.getJobQueues();
@@ -42,34 +44,41 @@
<%!
private static DecimalFormat percentFormat = new DecimalFormat("##0.00");
- public void generateSummaryTable(JspWriter out, ClusterStatus status,
+ public void generateSummaryTable(JspWriter out, ClusterMetrics metrics,
JobTracker tracker) throws IOException {
- String tasksPerNode = status.getTaskTrackers() > 0 ?
- percentFormat.format(((double)(status.getMaxMapTasks() +
- status.getMaxReduceTasks())) / status.getTaskTrackers()):
+ String tasksPerNode = metrics.getTaskTrackerCount() > 0 ?
+ percentFormat.format(((double)(metrics.getMapSlotCapacity() +
+ metrics.getReduceSlotCapacity())) / metrics.getTaskTrackerCount()):
"-";
out.print("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n"+
"<tr><th>Queues</th>" +
- "<th>Maps</th><th>Reduces</th>" +
+ "<th>Running Map Tasks</th><th>Running Reduce Tasks</th>" +
"<th>Total Submissions</th>" +
- "<th>Nodes</th><th>Map Task Capacity</th>" +
- "<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th>" +
+ "<th>Nodes</th>" +
+ "<th>Occupied Map Slots</th><th>Occupied Reduce Slots</th>" +
+ "<th>Reserved Map Slots</th><th>Reserved Reduce Slots</th>" +
+ "<th>Map Slot Capacity</th>" +
+ "<th>Reduce Slot Capacity</th><th>Avg. Slots/Node</th>" +
"<th>Blacklisted Nodes</th>" +
"<th>Excluded Nodes</th></tr>\n");
out.print("<tr><td><a href=\"queueinfo.jsp\">" +
tracker.getRootQueues().length + "</a></td><td>" +
- status.getMapTasks() + "</td><td>" +
- status.getReduceTasks() + "</td><td>" +
- tracker.getTotalSubmissions() +
+ metrics.getRunningMaps() + "</td><td>" +
+ metrics.getRunningReduces() + "</td><td>" +
+ metrics.getTotalJobSubmissions() +
"</td><td><a href=\"machines.jsp?type=active\">" +
- status.getTaskTrackers() +
- "</a></td><td>" + status.getMaxMapTasks() +
- "</td><td>" + status.getMaxReduceTasks() +
+ metrics.getTaskTrackerCount() + "</a></td><td>" +
+ metrics.getOccupiedMapSlots() + "</td><td>" +
+ metrics.getOccupiedReduceSlots() + "</td><td>" +
+ metrics.getReservedMapSlots() + "</td><td>" +
+ metrics.getReservedReduceSlots() + "</td><td>" +
+ + metrics.getMapSlotCapacity() +
+ "</td><td>" + metrics.getReduceSlotCapacity() +
"</td><td>" + tasksPerNode +
"</td><td><a href=\"machines.jsp?type=blacklisted\">" +
- status.getBlacklistedTrackers() + "</a>" +
+ metrics.getBlackListedTaskTrackerCount() + "</a>" +
"</td><td><a href=\"machines.jsp?type=excluded\">" +
- status.getNumExcludedNodes() + "</a>" +
+ metrics.getDecommissionedTaskTrackerCount() + "</a>" +
"</td></tr></table>\n");
out.print("<br>");
@@ -120,7 +129,7 @@
<hr>
<h2>Cluster Summary (Heap Size is <%= StringUtils.byteDesc(status.getUsedMemory()) %>/<%= StringUtils.byteDesc(status.getMaxMemory()) %>)</h2>
<%
- generateSummaryTable(out, status, tracker);
+ generateSummaryTable(out, metrics, tracker);
%>
<hr>
<b>Filter (Jobid, Priority, User, Name)</b> <input type="text" id="filter" onkeyup="applyfilter()"> <br>