You are viewing a plain-text version of this content. The hyperlink to its canonical version was lost in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:25:11 UTC
svn commit: r1076949 [1/3] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
contrib/fairscheduler/src/java/org/apa...
Author: omalley
Date: Fri Mar 4 03:25:10 2011
New Revision: 1076949
URL: http://svn.apache.org/viewvc?rev=1076949&view=rev
Log:
commit 15c352629793f64a0fc8ab2ad24a6c081f731687
Author: Lee Tucker <lt...@yahoo-inc.com>
Date: Thu Jul 30 17:40:34 2009 -0700
Applying patch 2836271.mr516.patch
Added:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/TaskType.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/jobtracker/
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobfailures.jsp
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/taskdetails.jsp
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar 4 03:25:10 2011
@@ -32,6 +32,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
/**
* A {@link TaskScheduler} that implements the requirements in HADOOP-3421
@@ -290,13 +293,14 @@ class CapacityTaskScheduler extends Task
/** our TaskScheduler object */
protected CapacityTaskScheduler scheduler;
- protected CapacityTaskScheduler.TYPE type = null;
+ protected TaskType type = null;
abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
JobInProgress job) throws IOException;
int getSlotsOccupied(JobInProgress job) {
- return getRunningTasks(job) * getSlotsPerTask(job);
+ return (getNumReservedTaskTrackers(job) + getRunningTasks(job)) *
+ getSlotsPerTask(job);
}
abstract int getClusterCapacity();
@@ -304,6 +308,8 @@ class CapacityTaskScheduler extends Task
abstract int getRunningTasks(JobInProgress job);
abstract int getPendingTasks(JobInProgress job);
abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
+ abstract int getNumReservedTaskTrackers(JobInProgress job);
+
/**
* To check if job has a speculative task on the particular tracker.
*
@@ -315,6 +321,18 @@ class CapacityTaskScheduler extends Task
TaskTrackerStatus tts);
/**
+ * Check if the given job has sufficient reserved tasktrackers for all its
+ * pending tasks.
+ *
+ * @param job job to check for sufficient reserved tasktrackers
+ * @return <code>true</code> if the job has reserved tasktrackers,
+ * else <code>false</code>
+ */
+ boolean hasSufficientReservedTaskTrackers(JobInProgress job) {
+ return getNumReservedTaskTrackers(job) >= getPendingTasks(job);
+ }
+
+ /**
* List of QSIs for assigning tasks.
* Queues are ordered by a ratio of (# of running tasks)/capacity, which
* indicates how much 'free space' the queue has, or how much it is over
@@ -419,10 +437,10 @@ class CapacityTaskScheduler extends Task
* It tries to get a task from jobs in a single queue.
* Always return a TaskLookupResult object. Don't return null.
*/
- private TaskLookupResult getTaskFromQueue(TaskTrackerStatus taskTracker,
- QueueSchedulingInfo qsi)
- throws IOException {
-
+ private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker,
+ QueueSchedulingInfo qsi)
+ throws IOException {
+ TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
// we only look at jobs in the running queues, as these are the ones
// who have been potentially initialized
@@ -442,9 +460,9 @@ class CapacityTaskScheduler extends Task
//a task to be scheduled on the task tracker.
//if we find a job then we pass it on.
if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
- taskTracker)) {
+ taskTrackerStatus)) {
// We found a suitable job. Get task from it.
- Task t = obtainNewTask(taskTracker, j);
+ Task t = obtainNewTask(taskTrackerStatus, j);
//if there is a task return it immediately.
if (t != null) {
// we're successful in getting a task
@@ -457,10 +475,18 @@ class CapacityTaskScheduler extends Task
}
} else {
//if memory requirements don't match then we check if the
- //job has either pending or speculative task. If the job
- //has pending or speculative task we block till this job
- //tasks get scheduled. So that high memory jobs are not starved
- if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
+ //job has either pending or speculative task or has insufficient number
+ //of 'reserved' tasktrackers to cover all pending tasks. If so
+ //we reserve the current tasktracker for this job so that
+ //high memory jobs are not starved
+ if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus) ||
+ !hasSufficientReservedTaskTrackers(j)) {
+ // Reserve all available slots on this tasktracker
+ LOG.info(j.getJobID() + ": Reserving " + taskTracker.getTrackerName() +
+ " since memory-requirements don't match");
+ taskTracker.reserveSlots(type, j, taskTracker.getAvailableSlots(type));
+
+ // Block
return TaskLookupResult.getMemFailedResult();
}
}//end of memory check block
@@ -472,7 +498,9 @@ class CapacityTaskScheduler extends Task
// the user limit for some user is too strict, i.e., there's at least
// one user who doesn't have enough tasks to satisfy his limit. If
// it's the latter case, re-look at jobs without considering user
- // limits, and get a task from the first eligible job
+ // limits, and get a task from the first eligible job; however
+ // we do not 'reserve' slots on tasktrackers anymore since the user is
+ // already over the limit
// Note: some of the code from above is repeated here. This is on
// purpose as it improves overall readability.
// Note: we walk through jobs again. Some of these jobs, which weren't
@@ -488,9 +516,9 @@ class CapacityTaskScheduler extends Task
continue;
}
if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
- taskTracker)) {
+ taskTrackerStatus)) {
// We found a suitable job. Get task from it.
- Task t = obtainNewTask(taskTracker, j);
+ Task t = obtainNewTask(taskTrackerStatus, j);
//if there is a task return it immediately.
if (t != null) {
// we're successful in getting a task
@@ -505,7 +533,7 @@ class CapacityTaskScheduler extends Task
//has pending or speculative task we block till this job
//tasks get scheduled, so that high memory jobs are not
//starved
- if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
+ if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus)) {
return TaskLookupResult.getMemFailedResult();
}
}//end of memory check block
@@ -520,10 +548,52 @@ class CapacityTaskScheduler extends Task
// Always return a TaskLookupResult object. Don't return null.
// The caller is responsible for ensuring that the QSI objects and the
// collections are up-to-date.
- private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
+ private TaskLookupResult assignTasks(TaskTracker taskTracker)
+ throws IOException {
+ TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
printQSIs();
+ // Check if this tasktracker has been reserved for a job...
+ JobInProgress job = taskTracker.getJobForFallowSlot(type);
+ if (job != null) {
+ int availableSlots = taskTracker.getAvailableSlots(type);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(job.getJobID() + ": Checking 'reserved' tasktracker " +
+ taskTracker.getTrackerName() + " with " + availableSlots +
+ " '" + type + "' slots");
+ }
+
+ if (availableSlots >= job.getNumSlotsPerTask(type)) {
+ // Unreserve
+ taskTracker.unreserveSlots(type, job);
+
+ // We found a suitable job. Get task from it.
+ Task t = obtainNewTask(taskTrackerStatus, job);
+ //if there is a task return it immediately.
+ if (t != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info(job.getJobID() + ": Got " + t.getTaskID() +
+ " for reserved tasktracker " +
+ taskTracker.getTrackerName());
+ }
+ // we're successful in getting a task
+ return TaskLookupResult.getTaskFoundResult(t);
+ }
+ } else {
+ // Re-reserve the current tasktracker
+ taskTracker.reserveSlots(type, job, availableSlots);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(job.getJobID() + ": Re-reserving " +
+ taskTracker.getTrackerName());
+ }
+
+ return TaskLookupResult.getMemFailedResult();
+ }
+ }
+
+
for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
// we may have queues with capacity=0. We shouldn't look at jobs from
// these queues
@@ -600,7 +670,7 @@ class CapacityTaskScheduler extends Task
MapSchedulingMgr(CapacityTaskScheduler schedulr) {
super(schedulr);
- type = CapacityTaskScheduler.TYPE.MAP;
+ type = TaskType.MAP;
queueComparator = mapComparator;
}
@@ -631,9 +701,8 @@ class CapacityTaskScheduler extends Task
@Override
int getSlotsPerTask(JobInProgress job) {
- long myVmem = job.getJobConf().getMemoryForMapTask();
- return (int) (Math.ceil((float) myVmem
- / (float) scheduler.getMemSizeForMapSlot()));
+ return
+ job.getJobConf().computeNumSlotsPerMap(scheduler.getMemSizeForMapSlot());
}
@Override
@@ -641,6 +710,10 @@ class CapacityTaskScheduler extends Task
return qsi.mapTSI;
}
+ int getNumReservedTaskTrackers(JobInProgress job) {
+ return job.getNumReservedTaskTrackersForMaps();
+ }
+
@Override
boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
//Check if job supports speculative map execution first then
@@ -659,7 +732,7 @@ class CapacityTaskScheduler extends Task
ReduceSchedulingMgr(CapacityTaskScheduler schedulr) {
super(schedulr);
- type = CapacityTaskScheduler.TYPE.REDUCE;
+ type = TaskType.REDUCE;
queueComparator = reduceComparator;
}
@@ -691,9 +764,8 @@ class CapacityTaskScheduler extends Task
@Override
int getSlotsPerTask(JobInProgress job) {
- long myVmem = job.getJobConf().getMemoryForReduceTask();
- return (int) (Math.ceil((float) myVmem
- / (float) scheduler.getMemSizeForReduceSlot()));
+ return
+ job.getJobConf().computeNumSlotsPerReduce(scheduler.getMemSizeForReduceSlot());
}
@Override
@@ -701,6 +773,10 @@ class CapacityTaskScheduler extends Task
return qsi.reduceTSI;
}
+ int getNumReservedTaskTrackers(JobInProgress job) {
+ return job.getNumReservedTaskTrackersForReduces();
+ }
+
@Override
boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
//check if the job supports reduce speculative execution first then
@@ -729,9 +805,10 @@ class CapacityTaskScheduler extends Task
/** whether scheduler has started or not */
private boolean started = false;
- static String JOB_SCHEDULING_INFO_FORMAT_STRING =
- "%s running map tasks using %d map slots,"
- + " %s running reduce tasks using %d reduce slots.";
+ final static String JOB_SCHEDULING_INFO_FORMAT_STRING =
+ "%s running map tasks using %d map slots. %d additional slots reserved." +
+ " %s running reduce tasks using %d reduce slots." +
+ " %d additional slots reserved.";
/**
* A clock class - can be mocked out for testing.
*/
@@ -741,11 +818,6 @@ class CapacityTaskScheduler extends Task
}
}
- // can be replaced with a global type, if we have one
- protected static enum TYPE {
- MAP, REDUCE
- }
-
private Clock clock;
private JobInitializationPoller initializationPoller;
@@ -859,10 +931,10 @@ class CapacityTaskScheduler extends Task
return limitMaxMemForReduceTasks;
}
- String[] getOrderedQueues(CapacityTaskScheduler.TYPE type) {
- if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+ String[] getOrderedQueues(TaskType type) {
+ if (type == TaskType.MAP) {
return mapScheduler.getOrderedQueues();
- } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+ } else if (type == TaskType.REDUCE) {
return reduceScheduler.getOrderedQueues();
}
return null;
@@ -1015,13 +1087,26 @@ class CapacityTaskScheduler extends Task
int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j);
int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j);
+ int numRunningMapSlots =
+ numMapsRunningForThisJob * mapScheduler.getSlotsPerTask(j);
+ int numRunningReduceSlots =
+ numReducesRunningForThisJob * reduceScheduler.getSlotsPerTask(j);
int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
- j.setSchedulingInfo(String.format(JOB_SCHEDULING_INFO_FORMAT_STRING,
- Integer.valueOf(numMapsRunningForThisJob), Integer
- .valueOf(numMapSlotsForThisJob), Integer
- .valueOf(numReducesRunningForThisJob), Integer
- .valueOf(numReduceSlotsForThisJob)));
+ int numReservedMapSlotsForThisJob =
+ (mapScheduler.getNumReservedTaskTrackers(j) *
+ mapScheduler.getSlotsPerTask(j));
+ int numReservedReduceSlotsForThisJob =
+ (reduceScheduler.getNumReservedTaskTrackers(j) *
+ reduceScheduler.getSlotsPerTask(j));
+ j.setSchedulingInfo(
+ String.format(JOB_SCHEDULING_INFO_FORMAT_STRING,
+ Integer.valueOf(numMapsRunningForThisJob),
+ Integer.valueOf(numRunningMapSlots),
+ Integer.valueOf(numReservedMapSlotsForThisJob),
+ Integer.valueOf(numReducesRunningForThisJob),
+ Integer.valueOf(numRunningReduceSlots),
+ Integer.valueOf(numReservedReduceSlotsForThisJob)));
qsi.mapTSI.numRunningTasks += numMapsRunningForThisJob;
qsi.reduceTSI.numRunningTasks += numReducesRunningForThisJob;
qsi.mapTSI.numSlotsOccupied += numMapSlotsForThisJob;
@@ -1077,10 +1162,12 @@ class CapacityTaskScheduler extends Task
*
*/
@Override
- public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
- throws IOException {
+ public synchronized List<Task> assignTasks(TaskTracker taskTracker)
+ throws IOException {
TaskLookupResult tlr;
+ TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
+
/*
* If TT has Map and Reduce slot free, we need to figure out whether to
* give it a Map or Reduce task.
@@ -1090,14 +1177,14 @@ class CapacityTaskScheduler extends Task
ClusterStatus c = taskTrackerManager.getClusterStatus();
int mapClusterCapacity = c.getMaxMapTasks();
int reduceClusterCapacity = c.getMaxReduceTasks();
- int maxMapTasks = taskTracker.getMaxMapTasks();
- int currentMapTasks = taskTracker.countMapTasks();
- int maxReduceTasks = taskTracker.getMaxReduceTasks();
- int currentReduceTasks = taskTracker.countReduceTasks();
- LOG.debug("TT asking for task, max maps=" + taskTracker.getMaxMapTasks() +
- ", run maps=" + taskTracker.countMapTasks() + ", max reds=" +
- taskTracker.getMaxReduceTasks() + ", run reds=" +
- taskTracker.countReduceTasks() + ", map cap=" +
+ int maxMapSlots = taskTrackerStatus.getMaxMapSlots();
+ int currentMapSlots = taskTrackerStatus.countOccupiedMapSlots();
+ int maxReduceSlots = taskTrackerStatus.getMaxReduceSlots();
+ int currentReduceSlots = taskTrackerStatus.countOccupiedReduceSlots();
+ LOG.debug("TT asking for task, max maps=" + taskTrackerStatus.getMaxMapSlots() +
+ ", run maps=" + taskTrackerStatus.countMapTasks() + ", max reds=" +
+ taskTrackerStatus.getMaxReduceSlots() + ", run reds=" +
+ taskTrackerStatus.countReduceTasks() + ", map cap=" +
mapClusterCapacity + ", red cap = " +
reduceClusterCapacity);
@@ -1111,8 +1198,8 @@ class CapacityTaskScheduler extends Task
// make sure we get our map or reduce scheduling object to update its
// collection of QSI objects too.
- if ((maxReduceTasks - currentReduceTasks) >
- (maxMapTasks - currentMapTasks)) {
+ if ((maxReduceSlots - currentReduceSlots) >
+ (maxMapSlots - currentMapSlots)) {
// get a reduce task first
reduceScheduler.updateCollectionOfQSIs();
tlr = reduceScheduler.assignTasks(taskTracker);
@@ -1126,7 +1213,7 @@ class CapacityTaskScheduler extends Task
== tlr.getLookUpStatus() ||
TaskLookupResult.LookUpStatus.NO_TASK_FOUND
== tlr.getLookUpStatus())
- && (maxMapTasks > currentMapTasks)) {
+ && (maxMapSlots > currentMapSlots)) {
mapScheduler.updateCollectionOfQSIs();
tlr = mapScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
@@ -1149,7 +1236,7 @@ class CapacityTaskScheduler extends Task
== tlr.getLookUpStatus()
|| TaskLookupResult.LookUpStatus.NO_TASK_FOUND
== tlr.getLookUpStatus())
- && (maxReduceTasks > currentReduceTasks)) {
+ && (maxReduceSlots > currentReduceSlots)) {
reduceScheduler.updateCollectionOfQSIs();
tlr = reduceScheduler.assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
@@ -1181,10 +1268,33 @@ class CapacityTaskScheduler extends Task
i++;
}
qsi.numJobsByUser.put(job.getProfile().getUser(), i);
+
+ // setup scheduler specific job information
+ preInitializeJob(job);
+
LOG.debug("Job " + job.getJobID().toString() + " is added under user "
+ job.getProfile().getUser() + ", user now has " + i + " jobs");
}
+ /**
+ * Setup {@link CapacityTaskScheduler} specific information prior to
+ * job initialization.
+ */
+ void preInitializeJob(JobInProgress job) {
+ JobConf jobConf = job.getJobConf();
+
+ // Compute number of slots required to run a single map/reduce task
+ int slotsPerMap = 1;
+ int slotsPerReduce = 1;
+ if (memoryMatcher.isSchedulingBasedOnMemEnabled()) {
+ slotsPerMap = jobConf.computeNumSlotsPerMap(getMemSizeForMapSlot());
+ slotsPerReduce =
+ jobConf.computeNumSlotsPerReduce(getMemSizeForReduceSlot());
+ }
+ job.setNumSlotsPerMap(slotsPerMap);
+ job.setNumSlotsPerReduce(slotsPerReduce);
+ }
+
// called when a job completes
synchronized void jobCompleted(JobInProgress job) {
QueueSchedulingInfo qsi =
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Fri Mar 4 03:25:10 2011
@@ -263,7 +263,8 @@ public class JobInitializationPoller ext
* poller
*/
- void init(Set<String> queues, CapacitySchedulerConf capacityConf) {
+ void init(Set<String> queues,
+ CapacitySchedulerConf capacityConf) {
for (String queue : queues) {
int userlimit = capacityConf.getMinimumUserLimitPercent(queue);
int maxUsersToInitialize = ((100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Fri Mar 4 03:25:10 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.TaskType;
class MemoryMatcher {
@@ -54,55 +55,26 @@ class MemoryMatcher {
* null if memory cannot be computed for some reason.
*/
synchronized Long getMemReservedForTasks(
- TaskTrackerStatus taskTracker, CapacityTaskScheduler.TYPE taskType) {
+ TaskTrackerStatus taskTracker, TaskType taskType) {
long vmem = 0;
for (TaskStatus task : taskTracker.getTaskReports()) {
// the following task states are one in which the slot is
// still occupied and hence memory of the task should be
// accounted in used memory.
- if ((task.getRunState() == TaskStatus.State.RUNNING)
- || (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
- JobInProgress job =
- scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID());
- if (job == null) {
- // This scenario can happen if a job was completed/killed
- // and retired from JT's memory. In this state, we can ignore
- // the running task status and compute memory for the rest of
- // the tasks. However, any scheduling done with this computation
- // could result in over-subscribing of memory for tasks on this
- // TT (as the unaccounted for task is still running).
- // So, it is safer to not schedule anything for this TT
- // One of the ways of doing that is to return null from here
- // and check for null in the calling method.
- LOG.info("Task tracker: " + taskTracker.getHost() + " is reporting "
- + "a running / commit pending task: " + task.getTaskID()
- + " but no corresponding job was found. "
- + "Maybe job was retired. Not computing "
- + "memory values for this TT.");
- return null;
- }
-
- JobConf jConf = job.getJobConf();
-
- // Get the memory "allotted" for this task by rounding off the job's
- // tasks' memory limits to the nearest multiple of the slot-memory-size
- // set on JT. This essentially translates to tasks of a high memory job
- // using multiple slots.
+ if ((task.getRunState() == TaskStatus.State.RUNNING) ||
+ (task.getRunState() == TaskStatus.State.UNASSIGNED) ||
+ (task.inTaskCleanupPhase())) {
+ // Get the memory "allotted" for this task based on number of slots
long myVmem = 0;
- if (task.getIsMap() && taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
- myVmem = jConf.getMemoryForMapTask();
- myVmem =
- (long) (scheduler.getMemSizeForMapSlot() * Math
- .ceil((float) myVmem
- / (float) scheduler.getMemSizeForMapSlot()));
+ if (task.getIsMap() && taskType == TaskType.MAP) {
+ long memSizePerMapSlot = scheduler.getMemSizeForMapSlot();
+ myVmem =
+ memSizePerMapSlot * task.getNumSlots();
} else if (!task.getIsMap()
- && taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
- myVmem = jConf.getMemoryForReduceTask();
- myVmem =
- (long) (scheduler.getMemSizeForReduceSlot() * Math
- .ceil((float) myVmem
- / (float) scheduler.getMemSizeForReduceSlot()));
+ && taskType == TaskType.REDUCE) {
+ long memSizePerReduceSlot = scheduler.getMemSizeForReduceSlot();
+ myVmem = memSizePerReduceSlot * task.getNumSlots();
}
vmem += myVmem;
}
@@ -118,8 +90,8 @@ class MemoryMatcher {
* @param taskTracker
* @return true if this TT has enough memory for this job. False otherwise.
*/
- boolean matchesMemoryRequirements(JobInProgress job,
- CapacityTaskScheduler.TYPE taskType, TaskTrackerStatus taskTracker) {
+ boolean matchesMemoryRequirements(JobInProgress job,TaskType taskType,
+ TaskTrackerStatus taskTracker) {
LOG.debug("Matching memory requirements of " + job.getJobID().toString()
+ " for scheduling on " + taskTracker.trackerName);
@@ -131,44 +103,36 @@ class MemoryMatcher {
}
Long memUsedOnTT = getMemReservedForTasks(taskTracker, taskType);
- if (memUsedOnTT == null) {
- // For some reason, maybe because we could not find the job
- // corresponding to a running task (as can happen if the job
- // is retired in between), we could not compute the memory state
- // on this TT. Treat this as an error, and fail memory
- // requirements.
- LOG.info("Could not compute memory for taskTracker: "
- + taskTracker.getHost() + ". Failing memory requirements.");
- return false;
- }
-
long totalMemUsableOnTT = 0;
-
long memForThisTask = 0;
- if (taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+ if (taskType == TaskType.MAP) {
memForThisTask = job.getJobConf().getMemoryForMapTask();
totalMemUsableOnTT =
- scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapTasks();
- } else if (taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+ scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapSlots();
+ } else if (taskType == TaskType.REDUCE) {
memForThisTask = job.getJobConf().getMemoryForReduceTask();
totalMemUsableOnTT =
scheduler.getMemSizeForReduceSlot()
- * taskTracker.getMaxReduceTasks();
+ * taskTracker.getMaxReduceSlots();
}
long freeMemOnTT = totalMemUsableOnTT - memUsedOnTT.longValue();
if (memForThisTask > freeMemOnTT) {
- LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("
- + freeMemOnTT + "). A " + taskType + " task from "
- + job.getJobID().toString() + " cannot be scheduled on TT "
- + taskTracker.trackerName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("
+ + freeMemOnTT + "). A " + taskType + " task from "
+ + job.getJobID().toString() + " cannot be scheduled on TT "
+ + taskTracker.trackerName);
+ }
return false;
}
- LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = "
- + freeMemOnTT + ". A " + taskType.toString() + " task from "
- + job.getJobID().toString() + " matches memory requirements on TT "
- + taskTracker.trackerName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = "
+ + freeMemOnTT + ". A " + taskType.toString() + " task from "
+ + job.getJobID().toString() + " matches memory requirements "
+ + "on TT "+ taskTracker.trackerName);
+ }
return true;
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar 4 03:25:10 2011
@@ -40,6 +40,9 @@ import org.apache.hadoop.mapred.JobStatu
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
public class TestCapacityScheduler extends TestCase {
static final Log LOG =
@@ -200,7 +203,8 @@ public class TestCapacityScheduler exten
}
}
TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
- Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), getJobConf().getUser()) {
+ Task task = new MapTask("", attemptId, 0, "", new BytesWritable(),
+ super.numSlotsPerMap, getJobConf().getUser()) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -242,7 +246,7 @@ public class TestCapacityScheduler exten
}
}
TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
- Task task = new ReduceTask("", attemptId, 0, 10, getJobConf().getUser()) {
+ Task task = new ReduceTask("", attemptId, 0, 10, super.numSlotsPerReduce, getJobConf().getUser()) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -333,7 +337,7 @@ public class TestCapacityScheduler exten
FakeTaskInProgress(JobID jId, JobConf jobConf, Task t,
boolean isMap, FakeJobInProgress job) {
- super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
+ super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0, 1);
this.isMap = isMap;
this.fakeJob = job;
activeTasks = new TreeMap<TaskAttemptID, String>();
@@ -418,8 +422,8 @@ public class TestCapacityScheduler exten
new ArrayList<JobInProgressListener>();
FakeQueueManager qm = new FakeQueueManager();
- private Map<String, TaskTrackerStatus> trackers =
- new HashMap<String, TaskTrackerStatus>();
+ private Map<String, TaskTracker> trackers =
+ new HashMap<String, TaskTracker>();
private Map<String, TaskStatus> taskStatuses =
new HashMap<String, TaskStatus>();
private Map<JobID, JobInProgress> jobs =
@@ -435,16 +439,22 @@ public class TestCapacityScheduler exten
this.maxReduceTasksPerTracker = maxReduceTasksPerTracker;
for (int i = 1; i < numTaskTrackers + 1; i++) {
String ttName = "tt" + i;
- trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", i,
- new ArrayList<TaskStatus>(), 0, maxMapTasksPerTracker,
- maxReduceTasksPerTracker));
+ TaskTracker tt = new TaskTracker(ttName);
+ tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", i,
+ new ArrayList<TaskStatus>(), 0,
+ maxMapTasksPerTracker,
+ maxReduceTasksPerTracker));
+ trackers.put(ttName, tt);
}
}
public void addTaskTracker(String ttName) {
- trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", 1,
- new ArrayList<TaskStatus>(), 0,
- maxMapTasksPerTracker, maxReduceTasksPerTracker));
+ TaskTracker tt = new TaskTracker(ttName);
+ tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", 1,
+ new ArrayList<TaskStatus>(), 0,
+ maxMapTasksPerTracker,
+ maxReduceTasksPerTracker));
+ trackers.put(ttName, tt);
}
public ClusterStatus getClusterStatus() {
@@ -505,7 +515,11 @@ public class TestCapacityScheduler exten
}
public Collection<TaskTrackerStatus> taskTrackers() {
- return trackers.values();
+ List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
+ for (TaskTracker tt : trackers.values()) {
+ statuses.add(tt.getStatus());
+ }
+ return statuses;
}
@@ -524,7 +538,7 @@ public class TestCapacityScheduler exten
}
}
- public TaskTrackerStatus getTaskTracker(String trackerID) {
+ public TaskTracker getTaskTracker(String trackerID) {
return trackers.get(trackerID);
}
@@ -544,10 +558,15 @@ public class TestCapacityScheduler exten
public boolean getIsMap() {
return t.isMapTask();
}
+
+ @Override
+ public int getNumSlots() {
+ return t.getNumSlotsRequired();
+ }
};
taskStatuses.put(t.getTaskID().toString(), status);
status.setRunState(TaskStatus.State.RUNNING);
- trackers.get(taskTrackerName).getTaskReports().add(status);
+ trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
}
public void finishTask(String taskTrackerName, String tipId,
@@ -1698,14 +1717,14 @@ public class TestCapacityScheduler exten
// first, a map from j1 will run
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// Total 2 map slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 100.0f);
+ checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
// at this point, the scheduler tries to schedule another map from j1.
// there isn't enough space. The second job's reduce should be scheduled.
checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
// Total 1 reduce slot should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
100.0f);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
}
@@ -1754,17 +1773,19 @@ public class TestCapacityScheduler exten
// Fill the second tt with this job.
checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
// Total 1 map slot should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 25.0f);
+ checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
assertEquals(String.format(
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 0, 0),
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 1, 1, 0, 0, 0, 0),
(String) job1.getSchedulingInfo());
checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
// Total 1 map slot should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
25.0f);
assertEquals(String.format(
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 1, 1),
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 1, 1, 0, 1, 1, 0),
(String) job1.getSchedulingInfo());
checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
@@ -1781,18 +1802,20 @@ public class TestCapacityScheduler exten
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
// Total 3 map slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+ checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
assertEquals(String.format(
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 0, 0),
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 1, 2, 0, 0, 0, 0),
(String) job2.getSchedulingInfo());
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
// Total 3 reduce slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 3,
75.0f);
assertEquals(String.format(
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 1, 2, 0, 1, 2, 0),
(String) job2.getSchedulingInfo());
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
@@ -1812,16 +1835,24 @@ public class TestCapacityScheduler exten
assertNull(scheduler.assignTasks(tracker("tt2")));
assertNull(scheduler.assignTasks(tracker("tt1")));
assertNull(scheduler.assignTasks(tracker("tt2")));
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
- 75.0f);
+ // reserved tasktrackers contribute to occupied slots
+ // for maps, both tasktrackers are reserved.
+ checkOccupiedSlots("default", TaskType.MAP, 1, 7, 175.0f);
+ // for reduces, only one tasktracker is reserved, because
+ // the reduce scheduler is not visited for tt1 (as it has
+ // 0 slots free).
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 5,
+ 125.0f);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+ LOG.info(job2.getSchedulingInfo());
assertEquals(String.format(
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 1, 2, 4, 1, 2, 2),
(String) job2.getSchedulingInfo());
assertEquals(String.format(
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 0, 0),
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 0, 0, 0, 0, 0, 0),
(String) job3.getSchedulingInfo());
}
@@ -1872,62 +1903,65 @@ public class TestCapacityScheduler exten
// 1st cycle - 1 map gets assigned.
Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// Total 1 map slot should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
+ checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50.0f);
checkMemReservedForTasksOnTT("tt1", 512L, 0L);
// 1st cycle of reduces - 1 reduce gets assigned.
Task t1 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
// Total 1 reduce slot should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
50.0f);
checkMemReservedForTasksOnTT("tt1", 512L, 512L);
// kill this job !
taskTrackerManager.killJob(job1.getJobID());
// No more map/reduce slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 0, 0, 0.0f);
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 0, 0,
+ checkOccupiedSlots("default", TaskType.MAP, 0, 0, 0.0f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 0, 0,
0.0f);
// retire the job
taskTrackerManager.removeJob(job1.getJobID());
// submit another job.
- LOG.debug("Submitting another normal job with 1 map and 1 reduce");
+ LOG.debug("Submitting another normal job with 2 maps and 2 reduces");
jConf = new JobConf();
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(1);
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(2);
jConf.setMemoryForMapTask(512);
jConf.setMemoryForReduceTask(512);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
- // 2nd cycle - nothing should get assigned. Memory matching code
- // will see the job is missing and fail memory requirements.
+ // since with HADOOP-5964, we don't rely on a job conf to get
+ // the memory occupied, scheduling should be able to work correctly.
+ t1 = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
+ checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
+
+ // assign a reduce now.
+ t1 = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50);
+ checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
+
+ // now, no more can be assigned because all the slots are blocked.
assertNull(scheduler.assignTasks(tracker("tt1")));
- checkMemReservedForTasksOnTT("tt1", null, null);
- // calling again should not make a difference, as the task is still running
- assertNull(scheduler.assignTasks(tracker("tt1")));
- checkMemReservedForTasksOnTT("tt1", null, null);
-
// finish the tasks on the tracker.
taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1);
-
+
// now a new task can be assigned.
- t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- // Total 1 map slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
- checkMemReservedForTasksOnTT("tt1", 512L, 0L);
-
+ t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+ checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
+ // memory used will change because of the finished task above.
+ checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
+
// reduce can be assigned.
- t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
- // Total 1 reduce slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
- 50.0f);
- checkMemReservedForTasksOnTT("tt1", 512L, 512L);
+ t = checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
+ checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
}
/*
@@ -2322,25 +2356,13 @@ public class TestCapacityScheduler exten
}
/**
- * Test case to test scheduling of
- * <ol>
- * <li>High ram job with speculative map execution.
- * <ul>
- * <li>Submit one high ram job which has speculative map.</li>
- * <li>Submit a normal job which has no speculative map.</li>
- * <li>Scheduler should schedule first all map tasks from first job and block
- * the cluster till both maps from first job get completed.
- * </ul>
- * </li>
- * <li>High ram job with speculative reduce execution.
- * <ul>
- * <li>Submit one high ram job which has speculative reduce.</li>
- * <li>Submit a normal job which has no speculative reduce.</li>
- * <li>Scheduler should schedule first all reduce tasks from first job and
- * block the cluster till both reduces are completed.</li>
- * </ul>
- * </li>
- * </ol>
+ * Test case to test scheduling of jobs with speculative execution
+ * in the face of high RAM jobs.
+ *
+ * Essentially, the test verifies that if a high RAM job has speculative
+ * tasks that cannot run because of memory requirements, we block
+ * that node and do not return any tasks to it.
+ *
* @throws IOException
*/
public void testHighRamJobWithSpeculativeExecution() throws IOException {
@@ -2367,8 +2389,18 @@ public class TestCapacityScheduler exten
scheduler.setResourceManagerConf(resConf);
scheduler.start();
+ // Submit a normal job that should occupy a node
+ JobConf jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(0);
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(0);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job1 = submitJob(JobStatus.PREP, jConf);
+
//Submit a high memory job with speculative tasks.
- JobConf jConf = new JobConf();
+ jConf = new JobConf();
jConf.setMemoryForMapTask(2 * 1024);
jConf.setMemoryForReduceTask(0);
jConf.setNumMapTasks(1);
@@ -2377,13 +2409,13 @@ public class TestCapacityScheduler exten
jConf.setUser("u1");
jConf.setMapSpeculativeExecution(true);
jConf.setReduceSpeculativeExecution(false);
- FakeJobInProgress job1 =
+ FakeJobInProgress job2 =
new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
taskTrackerManager, "u1");
- taskTrackerManager.submitJob(job1);
+ taskTrackerManager.submitJob(job2);
//Submit normal job
- jConf = new JobConf();
+ jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(0);
jConf.setNumMapTasks(1);
@@ -2392,127 +2424,46 @@ public class TestCapacityScheduler exten
jConf.setUser("u1");
jConf.setMapSpeculativeExecution(false);
jConf.setReduceSpeculativeExecution(false);
- FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf);
+ FakeJobInProgress job3 = submitJob(JobStatus.PREP, jConf);
controlledInitializationPoller.selectJobsToInitialize();
raiseStatusChangeEvents(scheduler.jobQueuesManager);
- // first, a map from j1 will run
- // at this point, there is a speculative task for the same job to be
- //scheduled. This task would be scheduled. Till the tasks from job1 gets
- //complete none of the tasks from other jobs would be scheduled.
+ // Have one node on which all tasks of job1 are scheduled.
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
- assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
- // Total 2 map slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+ checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
- //make same tracker get back, check if you are blocking. Your job
- //has speculative map task so tracker should be blocked even tho' it
- //can run job2's map.
- assertNull(scheduler.assignTasks(tracker("tt1")));
- // Total 2 map slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+ // raise events to initialize the 3rd job
+ controlledInitializationPoller.selectJobsToInitialize();
+ raiseStatusChangeEvents(scheduler.jobQueuesManager);
- //TT2 now gets speculative map of the job1
- checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
- // Total 4 map slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 4, 66.7f);
+ // On the second node, one task of the high RAM job can be scheduled.
+ checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
-
- // Now since the first job has no more speculative maps, it can schedule
- // the second job.
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- // Total 5 map slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 5, 83.3f);
- checkMemReservedForTasksOnTT("tt1", 3 * 1024L, 0L);
-
- //finish everything
+ assertEquals("pending maps greater than zero " , job2.pendingMaps(), 0);
+ // Total 4 map slots should be accounted for.
+ checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
+
+ // now when the first node gets back, it cannot run any task
+ // because job2 has a speculative task that can run on this node.
+ // This is even though job3's tasks can run on this node.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ // Reservation will count for 2 more slots.
+ checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
+
+ // finish one task from tt1.
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0",
job1);
- taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1",
- job1);
- taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0",
- job2);
- taskTrackerManager.finalizeJob(job1);
- taskTrackerManager.finalizeJob(job2);
- //Now submit high ram job with speculative reduce and check.
- jConf = new JobConf();
- jConf.setMemoryForMapTask(2 * 1024);
- jConf.setMemoryForReduceTask(2 * 1024L);
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(1);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- jConf.setMapSpeculativeExecution(false);
- jConf.setReduceSpeculativeExecution(true);
- FakeJobInProgress job3 =
- new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
- taskTrackerManager, "u1");
- taskTrackerManager.submitJob(job3);
-
- //Submit normal job w.r.t reduces
- jConf = new JobConf();
- jConf.setMemoryForMapTask(1 * 1024L);
- jConf.setMemoryForReduceTask(1 * 1024L);
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(1);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- jConf.setMapSpeculativeExecution(false);
- jConf.setReduceSpeculativeExecution(false);
- FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf);
+ // now, we can schedule the speculative task on tt1
+ checkAssignment("tt1", "attempt_test_0002_m_000001_1 on tt1");
- controlledInitializationPoller.selectJobsToInitialize();
- raiseStatusChangeEvents(scheduler.jobQueuesManager);
-
- // Finish up the map scheduler
+ // finish one more task from tt1.
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0",
+ job1);
+
+ // now the new job's tasks can be scheduled.
checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
- // Total 2 map slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
-
- checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
- // Total 3 map slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 50.0f);
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
-
- // first, a reduce from j3 will run
- // at this point, there is a speculative task for the same job to be
- //scheduled. This task would be scheduled. Till the tasks from job3 gets
- //complete none of the tasks from other jobs would be scheduled.
- checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
- assertEquals("pending reduces greater than zero ", job3.pendingReduces(),
- 0);
- // Total 2 reduce slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
- 33.3f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2*1024L);
-
- //make same tracker get back, check if you are blocking. Your job
- //has speculative reduce task so tracker should be blocked even tho' it
- //can run job4's reduce.
- assertNull(scheduler.assignTasks(tracker("tt1")));
- // Total 2 reduce slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
- 33.3f);
-
- //TT2 now gets speculative reduce of the job3
- checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
- // Total 4 reduce slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 4,
- 66.7f);
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 2 * 1024L);
-
- // Now since j3 has no more speculative reduces, it can schedule
- // the j4.
- checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
- // Total 5 reduce slots should be accounted for.
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 5,
- 83.3f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 3 * 1024L);
}
/**
@@ -2570,32 +2521,32 @@ public class TestCapacityScheduler exten
// Map 1 of high memory job
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
checkQueuesOrder(qs, scheduler
- .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+ .getOrderedQueues(TaskType.MAP));
// Reduce 1 of high memory job
checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
checkQueuesOrder(qs, scheduler
- .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+ .getOrderedQueues(TaskType.REDUCE));
// Map 1 of normal job
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
checkQueuesOrder(reversedQs, scheduler
- .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+ .getOrderedQueues(TaskType.MAP));
// Reduce 1 of normal job
checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
checkQueuesOrder(reversedQs, scheduler
- .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+ .getOrderedQueues(TaskType.REDUCE));
// Map 2 of normal job
checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
checkQueuesOrder(reversedQs, scheduler
- .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+ .getOrderedQueues(TaskType.MAP));
// Reduce 2 of normal job
checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
checkQueuesOrder(reversedQs, scheduler
- .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+ .getOrderedQueues(TaskType.REDUCE));
// Now both the queues are equally served. But the comparator doesn't change
// the order if queues are equally served.
@@ -2603,32 +2554,32 @@ public class TestCapacityScheduler exten
// Map 3 of normal job
checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
checkQueuesOrder(reversedQs, scheduler
- .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+ .getOrderedQueues(TaskType.MAP));
// Reduce 3 of normal job
checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
checkQueuesOrder(reversedQs, scheduler
- .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+ .getOrderedQueues(TaskType.REDUCE));
// Map 2 of high memory job
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
checkQueuesOrder(qs, scheduler
- .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+ .getOrderedQueues(TaskType.MAP));
// Reduce 2 of high memory job
checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
checkQueuesOrder(qs, scheduler
- .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+ .getOrderedQueues(TaskType.REDUCE));
// Map 4 of normal job
checkAssignment("tt2", "attempt_test_0002_m_000004_0 on tt2");
checkQueuesOrder(reversedQs, scheduler
- .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+ .getOrderedQueues(TaskType.MAP));
// Reduce 4 of normal job
checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
checkQueuesOrder(reversedQs, scheduler
- .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+ .getOrderedQueues(TaskType.REDUCE));
}
private void checkFailedInitializedJobMovement() throws IOException {
@@ -2713,7 +2664,7 @@ public class TestCapacityScheduler exten
}
- protected TaskTrackerStatus tracker(String taskTrackerName) {
+ protected TaskTracker tracker(String taskTrackerName) {
return taskTrackerManager.getTaskTracker(taskTrackerName);
}
@@ -2737,11 +2688,11 @@ public class TestCapacityScheduler exten
private void checkMemReservedForTasksOnTT(String taskTracker,
Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
Long observedMemForMapsOnTT =
- scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
- CapacityTaskScheduler.TYPE.MAP);
+ scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
+ TaskType.MAP);
Long observedMemForReducesOnTT =
- scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
- CapacityTaskScheduler.TYPE.REDUCE);
+ scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
+ TaskType.REDUCE);
if (expectedMemForMapsOnTT == null) {
assertTrue(observedMemForMapsOnTT == null);
} else {
@@ -2765,7 +2716,7 @@ public class TestCapacityScheduler exten
* @return
*/
private void checkOccupiedSlots(String queue,
- CapacityTaskScheduler.TYPE type, int numActiveUsers,
+ TaskType type, int numActiveUsers,
int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
scheduler.updateQSIInfoForTests();
QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
@@ -2773,9 +2724,9 @@ public class TestCapacityScheduler exten
queueManager.getJobQueueInfo(queue).getSchedulingInfo();
String[] infoStrings = schedulingInfo.split("\n");
int index = -1;
- if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+ if (type.equals(TaskType.MAP)) {
index = 7;
- } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+ } else if (type.equals(TaskType.REDUCE)) {
index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
}
LOG.info(infoStrings[index]);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java Fri Mar 4 03:25:10 2011
@@ -40,13 +40,13 @@ public class CapBasedLoadManager extends
public boolean canAssignMap(TaskTrackerStatus tracker,
int totalRunnableMaps, int totalMapSlots) {
return tracker.countMapTasks() < getCap(totalRunnableMaps,
- tracker.getMaxMapTasks(), totalMapSlots);
+ tracker.getMaxMapSlots(), totalMapSlots);
}
@Override
public boolean canAssignReduce(TaskTrackerStatus tracker,
int totalRunnableReduces, int totalReduceSlots) {
return tracker.countReduceTasks() < getCap(totalRunnableReduces,
- tracker.getMaxReduceTasks(), totalReduceSlots);
+ tracker.getMaxReduceSlots(), totalReduceSlots);
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Mar 4 03:25:10 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
/**
* A {@link TaskScheduler} that implements fair sharing.
@@ -218,7 +219,7 @@ public class FairScheduler extends TaskS
}
@Override
- public synchronized List<Task> assignTasks(TaskTrackerStatus tracker)
+ public synchronized List<Task> assignTasks(TaskTracker tracker)
throws IOException {
if (!initialized) // Don't try to assign tasks if we haven't yet started up
return null;
@@ -244,10 +245,11 @@ public class FairScheduler extends TaskS
// Scan to see whether any job needs to run a map, then a reduce
ArrayList<Task> tasks = new ArrayList<Task>();
TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
+ TaskTrackerStatus trackerStatus = tracker.getStatus();
for (TaskType taskType: types) {
boolean canAssign = (taskType == TaskType.MAP) ?
- loadMgr.canAssignMap(tracker, runnableMaps, totalMapSlots) :
- loadMgr.canAssignReduce(tracker, runnableReduces, totalReduceSlots);
+ loadMgr.canAssignMap(trackerStatus, runnableMaps, totalMapSlots) :
+ loadMgr.canAssignReduce(trackerStatus, runnableReduces, totalReduceSlots);
if (canAssign) {
// Figure out the jobs that need this type of task
List<JobInProgress> candidates = new ArrayList<JobInProgress>();
@@ -263,8 +265,8 @@ public class FairScheduler extends TaskS
Collections.sort(candidates, comparator);
for (JobInProgress job: candidates) {
Task task = (taskType == TaskType.MAP ?
- taskSelector.obtainNewMapTask(tracker, job) :
- taskSelector.obtainNewReduceTask(tracker, job));
+ taskSelector.obtainNewMapTask(trackerStatus, job) :
+ taskSelector.obtainNewReduceTask(trackerStatus, job));
if (task != null) {
// Update the JobInfo for this job so we account for the launched
// tasks during this update interval and don't try to launch more
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Mar 4 03:25:10 2011
@@ -33,6 +33,7 @@ import junit.framework.TestCase;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
public class TestFairScheduler extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -67,7 +68,7 @@ public class TestFairScheduler extends T
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
int ignored) throws IOException {
TaskAttemptID attemptId = getTaskAttemptID(true);
- Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), getJobConf().getUser()) {
+ Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1, getJobConf().getUser()) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -82,7 +83,7 @@ public class TestFairScheduler extends T
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
int clusterSize, int ignored) throws IOException {
TaskAttemptID attemptId = getTaskAttemptID(false);
- Task task = new ReduceTask("", attemptId, 0, 10, getJobConf().getUser()) {
+ Task task = new ReduceTask("", attemptId, 0, 10, 1, getJobConf().getUser()) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -108,18 +109,26 @@ public class TestFairScheduler extends T
List<JobInProgressListener> listeners =
new ArrayList<JobInProgressListener>();
- private Map<String, TaskTrackerStatus> trackers =
- new HashMap<String, TaskTrackerStatus>();
+ private Map<String, TaskTracker> trackers =
+ new HashMap<String, TaskTracker>();
private Map<String, TaskStatus> taskStatuses =
new HashMap<String, TaskStatus>();
public FakeTaskTrackerManager() {
- trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
- new ArrayList<TaskStatus>(), 0,
- maxMapTasksPerTracker, maxReduceTasksPerTracker));
- trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
- new ArrayList<TaskStatus>(), 0,
- maxMapTasksPerTracker, maxReduceTasksPerTracker));
+ TaskTracker tt1 = new TaskTracker("tt1");
+ tt1.setStatus(new TaskTrackerStatus("tt1", "tt1.host", 1,
+ new ArrayList<TaskStatus>(), 0,
+ maxMapTasksPerTracker,
+ maxReduceTasksPerTracker));
+ trackers.put("tt1", tt1);
+
+ TaskTracker tt2 = new TaskTracker("tt2");
+ tt2.setStatus(new TaskTrackerStatus("tt2", "tt2.host", 2,
+ new ArrayList<TaskStatus>(), 0,
+ maxMapTasksPerTracker,
+ maxReduceTasksPerTracker));
+ trackers.put("tt2", tt2);
+
}
@Override
@@ -143,7 +152,11 @@ public class TestFairScheduler extends T
@Override
public Collection<TaskTrackerStatus> taskTrackers() {
- return trackers.values();
+ List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
+ for (TaskTracker tt : trackers.values()) {
+ statuses.add(tt.getStatus());
+ }
+ return statuses;
}
@@ -188,7 +201,7 @@ public class TestFairScheduler extends T
}
}
- public TaskTrackerStatus getTaskTracker(String trackerID) {
+ public TaskTracker getTaskTracker(String trackerID) {
return trackers.get(trackerID);
}
@@ -206,7 +219,7 @@ public class TestFairScheduler extends T
};
taskStatuses.put(t.getTaskID().toString(), status);
status.setRunState(TaskStatus.State.RUNNING);
- trackers.get(taskTrackerName).getTaskReports().add(status);
+ trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
}
public void finishTask(String taskTrackerName, String tipId) {
@@ -1227,7 +1240,7 @@ public class TestFairScheduler extends T
scheduler.update();
}
- protected TaskTrackerStatus tracker(String taskTrackerName) {
+ protected TaskTracker tracker(String taskTrackerName) {
return taskTrackerManager.getTaskTracker(taskTrackerName);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Mar 4 03:25:10 2011
@@ -61,8 +61,9 @@ interface InterTrackerProtocol extends V
* (HADOOP-4869)
* Version 24: Changed format of Task and TaskStatus for HADOOP-4759
* Version 25: JobIDs are passed in response to JobTracker restart
+ * Version 26: Added numRequiredSlots to TaskStatus for MAPREDUCE-516
*/
- public static final long versionID = 25L;
+ public static final long versionID = 26L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Fri Mar 4 03:25:10 2011
@@ -206,11 +206,13 @@ public class IsolationRunner {
BytesWritable split = new BytesWritable();
split.readFields(splitFile);
splitFile.close();
- task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split, conf.getUser());
+ task = new MapTask(jobFilename.toString(), taskId, partition,
+ splitClass, split, 1, conf.getUser());
} else {
int numMaps = conf.getNumMapTasks();
fillInMissingMapOutputs(local, taskId, numMaps, conf);
- task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps, conf.getUser());
+ task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps,
+ 1, conf.getUser());
}
task.setConf(conf);
task.run(conf, new FakeUmbilical());
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Mar 4 03:25:10 2011
@@ -1491,6 +1491,39 @@ public class JobConf extends Configurati
return val;
}
+ /**
+ * Compute the number of slots needed by one map task-attempt of this job,
+ * i.e. ceil(map-task memory / slotSizePerMap) in exact integer arithmetic.
+ * @param slotSizePerMap cluster-wide amount of memory required to run a map-task
+ * @return slots per map task-attempt, or 1 if memory limits are disabled/invalid
+ */
+ int computeNumSlotsPerMap(long slotSizePerMap) {
+ long taskMemory = getMemoryForMapTask();
+ if (slotSizePerMap == DISABLED_MEMORY_LIMIT || taskMemory == DISABLED_MEMORY_LIMIT
+ || slotSizePerMap <= 0 || taskMemory <= 0) { // also guards against division by zero
+ return 1;
+ }
+ return (int) Math.min(Integer.MAX_VALUE, // exact integer ceiling (float is exact only up to 2^24), clamped to int
+ taskMemory / slotSizePerMap + (taskMemory % slotSizePerMap == 0 ? 0 : 1));
+ }
+
+ /**
+ * Compute the number of slots needed by one reduce task-attempt of this job,
+ * i.e. ceil(reduce-task memory / slotSizePerReduce) in exact integer arithmetic.
+ * @param slotSizePerReduce cluster-wide amount of memory required to run a
+ * reduce-task
+ * @return slots per reduce task-attempt, or 1 if memory limits are disabled/invalid
+ */
+ int computeNumSlotsPerReduce(long slotSizePerReduce) {
+ long taskMemory = getMemoryForReduceTask();
+ if (slotSizePerReduce == DISABLED_MEMORY_LIMIT || taskMemory == DISABLED_MEMORY_LIMIT
+ || slotSizePerReduce <= 0 || taskMemory <= 0) { // also guards against division by zero
+ return 1;
+ }
+ return (int) Math.min(Integer.MAX_VALUE, // exact integer ceiling (float is exact only up to 2^24), clamped to int
+ taskMemory / slotSizePerReduce + (taskMemory % slotSizePerReduce == 0 ? 0 : 1));
+ }
+
/**
* Find a jar that contains a class of the same name, if any.
* It will return a jar file, even if that is not the first thing