You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/05/19 17:22:09 UTC
svn commit: r776352 - in /hadoop/core/trunk: CHANGES.txt
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Author: yhemanth
Date: Tue May 19 15:21:53 2009
New Revision: 776352
URL: http://svn.apache.org/viewvc?rev=776352&view=rev
Log:
HADOOP-4981. Fix capacity scheduler to schedule speculative tasks correctly in the presence of High RAM jobs. Contributed by Sreekanth Ramakrishnan.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=776352&r1=776351&r2=776352&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 19 15:21:53 2009
@@ -690,6 +690,10 @@
HADOOP-5828. Use absolute path for mapred.local.dir of JobTracker in
MiniMRCluster. (yhemanth)
+ HADOOP-4981. Fix capacity scheduler to schedule speculative tasks
+ correctly in the presence of High RAM jobs.
+ (Sreekanth Ramakrishnan via yhemanth)
+
Release 0.20.0 - 2009-04-15
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=776352&r1=776351&r2=776352&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue May 19 15:21:53 2009
@@ -288,6 +288,15 @@
JobInProgress job) throws IOException;
abstract int getPendingTasks(JobInProgress job);
abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
+ /**
+ * To check if job has a speculative task on the particular tracker.
+ *
+ * @param job job to check for speculative tasks.
+ * @param tts task tracker on which speculative task would run.
+ * @return true if there is a speculative task to run on the tracker.
+ */
+ abstract boolean hasSpeculativeTask(JobInProgress job,
+ TaskTrackerStatus tts);
/**
* List of QSIs for assigning tasks.
@@ -400,29 +409,32 @@
// check if the job's user is over limit
if (isUserOverLimit(j.getProfile().getUser(), qsi)) {
continue;
- }
- if (getPendingTasks(j) != 0) {
- // Not accurate TODO:
- // check if the job's memory requirements are met
- if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
- // We found a suitable job. Get task from it.
- Task t = obtainNewTask(taskTracker, j);
- if (t != null) {
- // we're successful in getting a task
- return TaskLookupResult.getTaskFoundResult(t);
- }
+ }
+ //If this job meets memory requirements. Ask the JobInProgress for
+ //a task to be scheduled on the task tracker.
+ //if we find a job then we pass it on.
+ if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+ // We found a suitable job. Get task from it.
+ Task t = obtainNewTask(taskTracker, j);
+ //if there is a task return it immediately.
+ if (t != null) {
+ // we're successful in getting a task
+ return TaskLookupResult.getTaskFoundResult(t);
+ } else {
+ //skip to the next job in the queue.
+ continue;
}
- else {
- // mem requirements not met or could not be computed for this TT
- // Rather than look at the next job,
- // we return nothing to the TT, with the hope that we improve
- // chances of finding a suitable TT for this job. This lets us
- // avoid starving jobs with high mem requirements.
+ } else {
+ //if memory requirements don't match then we check if the
+ //job has either pending or speculative task. If the job
+ //has pending or speculative task we block till this job
+ //tasks get scheduled. So that high memory jobs are not starved
+ if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
return TaskLookupResult.getMemFailedResult();
- }
- }
+ }
+ }//end of memory check block
// if we're here, this job has no task to run. Look at the next job.
- }
+ }//end of for loop
// if we're here, we haven't found any task to run among all jobs in
// the queue. This could be because there is nothing to run, or that
@@ -444,24 +456,28 @@
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
- if (getPendingTasks(j) != 0) {
- // Not accurate TODO:
- // check if the job's memory requirements are met
- if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
- // We found a suitable job. Get task from it.
- Task t = obtainNewTask(taskTracker, j);
- if (t != null) {
- // we're successful in getting a task
- return TaskLookupResult.getTaskFoundResult(t);
- }
+ if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+ // We found a suitable job. Get task from it.
+ Task t = obtainNewTask(taskTracker, j);
+ //if there is a task return it immediately.
+ if (t != null) {
+ // we're successful in getting a task
+ return TaskLookupResult.getTaskFoundResult(t);
+ } else {
+ //skip to the next job in the queue.
+ continue;
}
- else {
- // mem requirements not met.
+ } else {
+ //if memory requirements don't match then we check if the
+ //job has either pending or speculative task. If the job
+ //has pending or speculative task we block till this job
+ //tasks get scheduled, so that high memory jobs are not
+ //starved
+ if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
return TaskLookupResult.getMemFailedResult();
- }
- }
- // if we're here, this job has no task to run. Look at the next job.
- }
+ }
+ }//end of memory check block
+ }//end of for loop
// found nothing for this queue, look at the next one.
String msg = "Found no task from the queue " + qsi.queueName;
@@ -516,6 +532,27 @@
LOG.debug(s);
}
+ /**
+ * Check if one of the tasks has a speculative task to execute on the
+ * particular task tracker.
+ *
+ * @param tips tasks of a job
+ * @param progress percentage progress of the job
+ * @param tts task tracker status for which we are asking speculative tip
+ * @return true if job has a speculative task to run on particular TT.
+ */
+ boolean hasSpeculativeTask(TaskInProgress[] tips, float progress,
+ TaskTrackerStatus tts) {
+ long currentTime = System.currentTimeMillis();
+ for(TaskInProgress tip : tips) {
+ if(tip.isRunning()
+ && !(tip.hasRunOnMachine(tts.getHost(), tts.getTrackerName()))
+ && tip.hasSpeculativeTask(currentTime, progress)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
/**
@@ -549,6 +586,15 @@
return qsi.mapTSI;
}
+ @Override
+ boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
+ //Check if job supports speculative map execution first then
+ //check if job has speculative maps.
+ return (job.getJobConf().getMapSpeculativeExecution())&& (
+ hasSpeculativeTask(job.getMapTasks(),
+ job.getStatus().mapProgress(), tts));
+ }
+
}
/**
@@ -581,6 +627,16 @@
TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
return qsi.reduceTSI;
}
+
+ @Override
+ boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
+ //check if the job supports reduce speculative execution first then
+ //check if the job has speculative tasks.
+ return (job.getJobConf().getReduceSpeculativeExecution()) && (
+ hasSpeculativeTask(job.getReduceTasks(),
+ job.getStatus().reduceProgress(), tts));
+ }
+
}
/** the scheduling mgrs for Map and Reduce tasks */
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=776352&r1=776351&r2=776352&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue May 19 15:21:53 2009
@@ -40,6 +40,7 @@
import org.apache.hadoop.conf.Configuration;
+
public class TestCapacityScheduler extends TestCase {
static final Log LOG =
@@ -145,17 +146,24 @@
}
private ControlledInitializationPoller controlledInitializationPoller;
-
+ /*
+ * Fake job in progress object used for testing the schedulers scheduling
+ * decisions. The JobInProgress object returns FakeTaskInProgress
+ * objects when assignTasks is called. If speculative maps and reduces
+ * are configured then JobInProgress returns exactly one Speculative
+ * map and reduce task.
+ */
static class FakeJobInProgress extends JobInProgress {
- private FakeTaskTrackerManager taskTrackerManager;
+ protected FakeTaskTrackerManager taskTrackerManager;
private int mapTaskCtr;
private int redTaskCtr;
private Set<TaskInProgress> mapTips =
new HashSet<TaskInProgress>();
private Set<TaskInProgress> reduceTips =
new HashSet<TaskInProgress>();
-
+ private int speculativeMapTaskCounter = 0;
+ private int speculativeReduceTaskCounter = 0;
public FakeJobInProgress(JobID jId, JobConf jobConf,
FakeTaskTrackerManager taskTrackerManager, String user) {
super(jId, jobConf);
@@ -186,8 +194,14 @@
@Override
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
int ignored) throws IOException {
- if (mapTaskCtr == numMapTasks) return null;
- TaskAttemptID attemptId = getTaskAttemptID(true);
+ boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
+ if (areAllMapsRunning){
+ if(!getJobConf().getMapSpeculativeExecution() ||
+ speculativeMapTasks > 0) {
+ return null;
+ }
+ }
+ TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
@Override
public String toString() {
@@ -197,16 +211,39 @@
taskTrackerManager.startTask(tts.getTrackerName(), task);
runningMapTasks++;
// create a fake TIP and keep track of it
- mapTips.add(new FakeTaskInProgress(getJobID(),
- getJobConf(), task, true, this));
+ FakeTaskInProgress mapTip = new FakeTaskInProgress(getJobID(),
+ getJobConf(), task, true, this);
+ mapTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
+ if(areAllMapsRunning) {
+ speculativeMapTasks++;
+ //you have scheduled a speculative map. Now set all tips in the
+ //map tips not to have speculative task.
+ for(TaskInProgress t : mapTips) {
+ if (t instanceof FakeTaskInProgress) {
+ FakeTaskInProgress mt = (FakeTaskInProgress) t;
+ mt.hasSpeculativeMap = false;
+ }
+ }
+ } else {
+ //add only non-speculative tips.
+ mapTips.add(mapTip);
+ //add the tips to the JobInProgress TIPS
+ maps = mapTips.toArray(new TaskInProgress[mapTips.size()]);
+ }
return task;
}
@Override
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
int clusterSize, int ignored) throws IOException {
- if (redTaskCtr == numReduceTasks) return null;
- TaskAttemptID attemptId = getTaskAttemptID(false);
+ boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
+ if (areAllReducesRunning){
+ if(!getJobConf().getReduceSpeculativeExecution() ||
+ speculativeReduceTasks > 0) {
+ return null;
+ }
+ }
+ TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
Task task = new ReduceTask("", attemptId, 0, 10) {
@Override
public String toString() {
@@ -216,8 +253,25 @@
taskTrackerManager.startTask(tts.getTrackerName(), task);
runningReduceTasks++;
// create a fake TIP and keep track of it
- reduceTips.add(new FakeTaskInProgress(getJobID(),
- getJobConf(), task, false, this));
+ FakeTaskInProgress reduceTip = new FakeTaskInProgress(getJobID(),
+ getJobConf(), task, false, this);
+ reduceTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
+ if(areAllReducesRunning) {
+ speculativeReduceTasks++;
+ //you have scheduled a speculative reduce. Now set all tips in the
+ //reduce tips not to have a speculative task.
+ for(TaskInProgress t : reduceTips) {
+ if (t instanceof FakeTaskInProgress) {
+ FakeTaskInProgress rt = (FakeTaskInProgress) t;
+ rt.hasSpeculativeReduce = false;
+ }
+ }
+ } else {
+ //add only non-speculative tips.
+ reduceTips.add(reduceTip);
+ //add the tips to the JobInProgress TIPS
+ reduces = reduceTips.toArray(new TaskInProgress[reduceTips.size()]);
+ }
return task;
}
@@ -231,14 +285,19 @@
finishedReduceTasks++;
}
- private TaskAttemptID getTaskAttemptID(boolean isMap) {
+ private TaskAttemptID getTaskAttemptID(boolean isMap, boolean isSpeculative) {
JobID jobId = getJobID();
TaskType t = TaskType.REDUCE;
if (isMap) {
t = TaskType.MAP;
}
- return new TaskAttemptID(jobId.getJtIdentifier(),
- jobId.getId(), t, (isMap)?++mapTaskCtr: ++redTaskCtr, 0);
+ if (!isSpeculative) {
+ return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
+ (isMap) ? ++mapTaskCtr : ++redTaskCtr, 0);
+ } else {
+ return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
+ (isMap) ? mapTaskCtr : redTaskCtr, 1);
+ }
}
@Override
@@ -269,12 +328,15 @@
this.status.setRunState(JobStatus.FAILED);
}
}
-
+
static class FakeTaskInProgress extends TaskInProgress {
private boolean isMap;
private FakeJobInProgress fakeJob;
private TreeMap<TaskAttemptID, String> activeTasks;
private TaskStatus taskStatus;
+ boolean hasSpeculativeMap;
+ boolean hasSpeculativeReduce;
+
FakeTaskInProgress(JobID jId, JobConf jobConf, Task t,
boolean isMap, FakeJobInProgress job) {
super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
@@ -286,6 +348,16 @@
this.taskStatus = TaskStatus.createTaskStatus(isMap);
taskStatus.setProgress(0.5f);
taskStatus.setRunState(TaskStatus.State.RUNNING);
+ if (jobConf.getMapSpeculativeExecution()) {
+ //resetting of the hasSpeculativeMap is done
+ //when speculative map is scheduled by the job.
+ hasSpeculativeMap = true;
+ }
+ if (jobConf.getReduceSpeculativeExecution()) {
+ //resetting of the hasSpeculativeReduce is done
+ //when speculative reduce is scheduled by the job.
+ hasSpeculativeReduce = true;
+ }
}
@Override
@@ -307,6 +379,27 @@
}
return true;
}
+
+ @Override
+ /*
+ *hasSpeculativeMap and hasSpeculativeReduce are reset by FakeJobInProgress
+ *after the speculative tip has been scheduled.
+ */
+ boolean hasSpeculativeTask(long currentTime, double averageProgress) {
+ if(isMap && hasSpeculativeMap) {
+ return fakeJob.getJobConf().getMapSpeculativeExecution();
+ }
+ if (!isMap && hasSpeculativeReduce) {
+ return fakeJob.getJobConf().getReduceSpeculativeExecution();
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return !activeTasks.isEmpty();
+ }
+
}
static class FakeQueueManager extends QueueManager {
@@ -625,6 +718,9 @@
resConf.getQueues());
scheduler.setInitializationPoller(controlledInitializationPoller);
scheduler.setConf(conf);
+ //by default disable speculative execution.
+ conf.setMapSpeculativeExecution(false);
+ conf.setReduceSpeculativeExecution(false);
}
@Override
@@ -1533,6 +1629,8 @@
jConf.setNumReduceTasks(1);
jConf.setQueueName("default");
jConf.setUser("u1");
+ jConf.setMapSpeculativeExecution(false);
+ jConf.setReduceSpeculativeExecution(false);
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
@@ -1551,6 +1649,8 @@
jConf.setNumReduceTasks(0);
jConf.setQueueName("default");
jConf.setUser("u1");
+ jConf.setMapSpeculativeExecution(false);
+ jConf.setReduceSpeculativeExecution(false);
submitJobAndInit(JobStatus.PREP, jConf); // job2
// This job shouldn't run the TT now because of lack of pmem
@@ -2147,6 +2247,269 @@
assertFalse("Waiting job contains submitted job",
mgr.getRunningJobQueue("default").contains(job));
}
+
+ /**
+ * Test case deals with normal jobs which have speculative maps and reduce.
+ * Following is test executed
+ * <ol>
+ * <li>Submit one job with speculative maps and reduce.</li>
+ * <li>Submit another job with no speculative execution.</li>
+ * <li>Observe that all tasks from first job get scheduled, speculative
+ * and normal tasks</li>
+ * <li>Finish all of the first job's tasks; the second job's tasks then get scheduled.</li>
+ * </ol>
+ * @throws IOException
+ */
+ public void testSpeculativeTaskScheduling() throws IOException {
+ String[] qs = {"default"};
+ taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
+ JobConf conf = new JobConf();
+ conf.setNumMapTasks(1);
+ conf.setNumReduceTasks(1);
+ conf.setMapSpeculativeExecution(true);
+ conf.setReduceSpeculativeExecution(true);
+ //Submit a job which would have one speculative map and one speculative
+ //reduce.
+ FakeJobInProgress fjob1 = submitJob(JobStatus.PREP, conf);
+
+ conf = new JobConf();
+ conf.setNumMapTasks(1);
+ conf.setNumReduceTasks(1);
+ //Submit a job which has no speculative map or reduce.
+ FakeJobInProgress fjob2 = submitJob(JobStatus.PREP, conf);
+
+ //Ask the poller to initalize all the submitted job and raise status
+ //change event.
+ controlledInitializationPoller.selectJobsToInitialize();
+ raiseStatusChangeEvents(mgr);
+
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ assertTrue("Pending maps of job1 greater than zero",
+ (fjob1.pendingMaps() == 0));
+ checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+ checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ assertTrue("Pending reduces of job1 greater than zero",
+ (fjob1.pendingReduces() == 0));
+ checkAssignment("tt2", "attempt_test_0001_r_000001_1 on tt2");
+
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", fjob1);
+ taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1", fjob1);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", fjob1);
+ taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_1", fjob1);
+ taskTrackerManager.finalizeJob(fjob1);
+
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", fjob2);
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", fjob2);
+ taskTrackerManager.finalizeJob(fjob2);
+
+ }
+ /**
+ * Test case to test scheduling of
+ * <ol>
+ * <li>High ram job with speculative map execution.
+ * <ul>
+ * <li>Submit one high ram job which has speculative map.</li>
+ * <li>Submit a normal job which has no speculative map.</li>
+ * <li>Scheduler should schedule first all map tasks from first job and block
+ * the cluster till both maps from first job get completed.
+ * </ul>
+ * </li>
+ * <li>High ram job with speculative reduce execution.
+ * <ul>
+ * <li>Submit one high ram job which has speculative reduce.</li>
+ * <li>Submit a normal job which has no speculative reduce.</li>
+ * <li>Scheduler should schedule first all reduce tasks from first job and block
+ * the cluster till both reduces are completed.</li>
+ * </ul>
+ * </li>
+ * </ol>
+ * @throws IOException
+ */
+ public void testHighRamJobWithSpeculativeExecution() throws IOException {
+ // 2 map and 2 reduce slots
+ taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
+
+ //task tracker memory configurations.
+ TaskTrackerStatus.ResourceStatus ttStatus =
+ taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+ ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
+ ttStatus.setReservedVirtualMemory(0);
+ ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
+ ttStatus.setReservedPhysicalMemory(0);
+ ttStatus = taskTrackerManager.getTaskTracker("tt2").getResourceStatus();
+ ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
+ ttStatus.setReservedVirtualMemory(0);
+ ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
+ ttStatus.setReservedPhysicalMemory(0);
+
+
+ taskTrackerManager.addQueues(new String[] { "default" });
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
+ resConf.setFakeQueues(queues);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ // enabled memory-based scheduling
+ scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+ 1 * 1024 * 1024 * 1024L);
+ scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+ 3 * 1024 * 1024 * 1024L);
+ resConf.setDefaultPercentOfPmemInVmem(33.3f);
+ resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+
+ JobConf jConf = new JobConf();
+ jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
+ jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(0);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ jConf.setMapSpeculativeExecution(true);
+ jConf.setReduceSpeculativeExecution(false);
+ FakeJobInProgress job1 = new FakeJobInProgress(new JobID("test", ++jobCounter),
+ jConf, taskTrackerManager,"u1");
+
+ //Submit a high memory job with speculative tasks.
+ taskTrackerManager.submitJob(job1);
+
+ jConf = new JobConf();
+ jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem
+ jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(0);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ jConf.setMapSpeculativeExecution(false);
+ jConf.setReduceSpeculativeExecution(false);
+ //Submit normal job
+ FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf);
+
+ controlledInitializationPoller.selectJobsToInitialize();
+ raiseStatusChangeEvents(scheduler.jobQueuesManager);
+
+ // first, a map from j1 will run
+ // at this point, there is a speculative task for the same job to be
+ //scheduled. This task would be scheduled. Till the tasks from job1 gets
+ //complete none of the tasks from other jobs would be scheduled.
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
+ //make same tracker get back, check if you are blocking. Your job
+ //has speculative map task so tracker should be blocked even tho' it
+ //can run job2's map.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ //TT2 now gets speculative map of the job1
+ checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+
+ // Now since the first job has no more speculative maps, it can schedule
+ // the second job.
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+
+ //finish everything
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0",
+ job1);
+ taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1",
+ job1);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0",
+ job2);
+ taskTrackerManager.finalizeJob(job1);
+ taskTrackerManager.finalizeJob(job2);
+
+ //Now submit high ram job with speculative reduce and check.
+ jConf = new JobConf();
+ jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
+ jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(1);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ jConf.setMapSpeculativeExecution(false);
+ jConf.setReduceSpeculativeExecution(true);
+ FakeJobInProgress job3 = new FakeJobInProgress(new JobID("test", ++jobCounter),
+ jConf, taskTrackerManager,"u1");
+
+ //Submit a high memory job with speculative reduce tasks.
+ taskTrackerManager.submitJob(job3);
+
+ jConf = new JobConf();
+ jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem
+ jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(1);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ jConf.setMapSpeculativeExecution(false);
+ jConf.setReduceSpeculativeExecution(false);
+ //Submit normal job
+ FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf);
+
+ controlledInitializationPoller.selectJobsToInitialize();
+ raiseStatusChangeEvents(scheduler.jobQueuesManager);
+ //all maps of jobs get assigned to same task tracker as
+ //job does not have speculative map and same tracker sends two heart
+ //beat back to back.
+ checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+ //first map slot gets attention on this tracker.
+ checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
+ //now first reduce of the job3 would be scheduled on tt2 since it has
+ //memory.
+ //assigntasks() would check for free reduce slot is greater than
+ //map slots. Seeing there is more free reduce slot it would try scheduling
+ //reduce of job1 but would block as in it is a high memory task.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ //TT2 would get the reduce task from high memory job as the tt is running
+ //normal jobs map. which is low mem.
+ checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
+ // now if either TT comes back, it will block because all maps
+ // are done, and the first jobs reduce has a speculative task.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+ //finish maps.
+ taskTrackerManager.finishTask("tt1", "attempt_test_0003_m_000001_0",
+ job3);
+ taskTrackerManager.finishTask("tt2", "attempt_test_0004_m_000001_0",
+ job4);
+ //check speculative reduce code path is covered.
+ assertEquals("Pending reduces not zero for high " +
+ "ram job with speculative reduce.", 0, job3.pendingReduces());
+ //if tt2 returns back it is not given any task even if it can schedule
+ //job2 reduce.
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+ //speculative reduce of the job3 would be scheduled.
+ checkAssignment("tt1", "attempt_test_0003_r_000001_1 on tt1");
+ //now both speculative and actual task have been scheduled for job3.
+ //Normal task of Job4 would now be scheduled on TT1 as it has free space
+ //to run.
+ checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
+ //No more tasks.
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+
+ //finish all the reduces.
+ taskTrackerManager.finishTask("tt1", "attempt_test_0003_r_000001_1",
+ job3);
+ taskTrackerManager.finishTask("tt2", "attempt_test_0003_r_000001_0",
+ job3);
+ //finish the job
+ taskTrackerManager.finalizeJob(job3);
+ //finish the task and the job.
+ taskTrackerManager.finishTask("tt1", "attempt_test_0004_r_000001_0",
+ job4);
+ taskTrackerManager.finalizeJob(job4);
+
+ }
private void checkRunningJobMovementAndCompletion() throws IOException {