You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/08/27 09:49:01 UTC
svn commit: r808308 [4/5] - in /hadoop/mapreduce/trunk: ./ conf/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=808308&r1=808307&r2=808308&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Aug 27 07:49:00 2009
@@ -18,755 +18,27 @@
package org.apache.hadoop.mapred;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
import junit.framework.TestCase;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobInProgress;
-import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil.AccessControlList;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import static org.apache.hadoop.mapred.CapacityTestUtils.*;
+
+import java.io.IOException;
+import java.util.*;
public class TestCapacityScheduler extends TestCase {
static final Log LOG =
- LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
+ LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
private static int jobCounter;
- /**
- * Test class that removes the asynchronous nature of job initialization.
- *
- * The run method is a dummy which just waits for completion. It is
- * expected that test code calls the main method, initializeJobs, directly
- * to trigger initialization.
- */
- class ControlledJobInitializer extends
- JobInitializationPoller.JobInitializationThread {
-
- boolean stopRunning;
-
- public ControlledJobInitializer(JobInitializationPoller p) {
- p.super();
- }
-
- @Override
- public void run() {
- while (!stopRunning) {
- try {
- synchronized(this) {
- this.wait();
- }
- } catch (InterruptedException ie) {
- break;
- }
- }
- }
-
- void stopRunning() {
- stopRunning = true;
- }
- }
-
- /**
- * Test class that removes the asynchronous nature of job initialization.
- *
- * The run method is a dummy which just waits for completion. It is
- * expected that test code calls the main method, selectJobsToInitialize,
- * directly to trigger initialization.
- *
- * The class also creates the test worker thread objects of type
- * ControlledJobInitializer instead of the objects of the actual class
- */
- class ControlledInitializationPoller extends JobInitializationPoller {
-
- private boolean stopRunning;
- private ArrayList<ControlledJobInitializer> workers;
-
- public ControlledInitializationPoller(JobQueuesManager mgr,
- CapacitySchedulerConf rmConf,
- Set<String> queues,
- TaskTrackerManager ttm) {
- super(mgr, rmConf, queues, ttm);
- }
-
- @Override
- public void run() {
- // don't do anything here.
- while (!stopRunning) {
- try {
- synchronized (this) {
- this.wait();
- }
- } catch (InterruptedException ie) {
- break;
- }
- }
- }
-
- @Override
- JobInitializationThread createJobInitializationThread() {
- ControlledJobInitializer t = new ControlledJobInitializer(this);
- if (workers == null) {
- workers = new ArrayList<ControlledJobInitializer>();
- }
- workers.add(t);
- return t;
- }
-
- @Override
- void selectJobsToInitialize() {
- super.cleanUpInitializedJobsList();
- super.selectJobsToInitialize();
- for (ControlledJobInitializer t : workers) {
- t.initializeJobs();
- }
- }
-
- void stopRunning() {
- stopRunning = true;
- for (ControlledJobInitializer t : workers) {
- t.stopRunning();
- t.interrupt();
- }
- }
- }
-
private ControlledInitializationPoller controlledInitializationPoller;
- /*
- * Fake job in progress object used for testing the schedulers scheduling
- * decisions. The JobInProgress objects returns out FakeTaskInProgress
- * objects when assignTasks is called. If speculative maps and reduces
- * are configured then JobInProgress returns exactly one Speculative
- * map and reduce task.
- */
- static class FakeJobInProgress extends JobInProgress {
-
- protected FakeTaskTrackerManager taskTrackerManager;
- private int mapTaskCtr;
- private int redTaskCtr;
- private Set<TaskInProgress> mapTips =
- new HashSet<TaskInProgress>();
- private Set<TaskInProgress> reduceTips =
- new HashSet<TaskInProgress>();
- private int speculativeMapTaskCounter = 0;
- private int speculativeReduceTaskCounter = 0;
- public FakeJobInProgress(JobID jId, JobConf jobConf,
- FakeTaskTrackerManager taskTrackerManager, String user) {
- super(jId, jobConf, null);
- this.taskTrackerManager = taskTrackerManager;
- this.startTime = System.currentTimeMillis();
- this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP,
- jobConf.getUser(),
- jobConf.getJobName(), "", "");
- this.status.setJobPriority(JobPriority.NORMAL);
- this.status.setStartTime(startTime);
- if (null == jobConf.getQueueName()) {
- this.profile = new JobProfile(user, jId,
- null, null, null);
- }
- else {
- this.profile = new JobProfile(user, jId,
- null, null, null, jobConf.getQueueName());
- }
- mapTaskCtr = 0;
- redTaskCtr = 0;
- }
-
- @Override
- public synchronized void initTasks() throws IOException {
- getStatus().setRunState(JobStatus.RUNNING);
- }
-
- @Override
- public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
- int ignored) throws IOException {
- boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
- if (areAllMapsRunning){
- if(!getJobConf().getMapSpeculativeExecution() ||
- speculativeMapTasks > 0) {
- return null;
- }
- }
- TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
- Task task = new MapTask("", attemptId, 0, "", new BytesWritable(),
- super.numSlotsPerMap) {
- @Override
- public String toString() {
- return String.format("%s on %s", getTaskID(), tts.getTrackerName());
- }
- };
- taskTrackerManager.startTask(tts.getTrackerName(), task);
- runningMapTasks++;
- // create a fake TIP and keep track of it
- FakeTaskInProgress mapTip = new FakeTaskInProgress(getJobID(),
- getJobConf(), task, true, this);
- mapTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
- if(areAllMapsRunning) {
- speculativeMapTasks++;
- //you have scheduled a speculative map. Now set all tips in the
- //map tips not to have speculative task.
- for(TaskInProgress t : mapTips) {
- if (t instanceof FakeTaskInProgress) {
- FakeTaskInProgress mt = (FakeTaskInProgress) t;
- mt.hasSpeculativeMap = false;
- }
- }
- } else {
- //add only non-speculative tips.
- mapTips.add(mapTip);
- //add the tips to the JobInProgress TIPS
- maps = mapTips.toArray(new TaskInProgress[mapTips.size()]);
- }
- return task;
- }
-
- @Override
- public Task obtainNewReduceTask(final TaskTrackerStatus tts,
- int clusterSize, int ignored) throws IOException {
- boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
- if (areAllReducesRunning){
- if(!getJobConf().getReduceSpeculativeExecution() ||
- speculativeReduceTasks > 0) {
- return null;
- }
- }
- TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
- Task task = new ReduceTask("", attemptId, 0, 10,
- super.numSlotsPerReduce) {
- @Override
- public String toString() {
- return String.format("%s on %s", getTaskID(), tts.getTrackerName());
- }
- };
- taskTrackerManager.startTask(tts.getTrackerName(), task);
- runningReduceTasks++;
- // create a fake TIP and keep track of it
- FakeTaskInProgress reduceTip = new FakeTaskInProgress(getJobID(),
- getJobConf(), task, false, this);
- reduceTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
- if(areAllReducesRunning) {
- speculativeReduceTasks++;
- //you have scheduled a speculative map. Now set all tips in the
- //map tips not to have speculative task.
- for(TaskInProgress t : reduceTips) {
- if (t instanceof FakeTaskInProgress) {
- FakeTaskInProgress rt = (FakeTaskInProgress) t;
- rt.hasSpeculativeReduce = false;
- }
- }
- } else {
- //add only non-speculative tips.
- reduceTips.add(reduceTip);
- //add the tips to the JobInProgress TIPS
- reduces = reduceTips.toArray(new TaskInProgress[reduceTips.size()]);
- }
- return task;
- }
-
- public void mapTaskFinished() {
- runningMapTasks--;
- finishedMapTasks++;
- }
-
- public void reduceTaskFinished() {
- runningReduceTasks--;
- finishedReduceTasks++;
- }
-
- private TaskAttemptID getTaskAttemptID(boolean isMap, boolean isSpeculative) {
- JobID jobId = getJobID();
- TaskType t = TaskType.REDUCE;
- if (isMap) {
- t = TaskType.MAP;
- }
- if (!isSpeculative) {
- return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
- (isMap) ? ++mapTaskCtr : ++redTaskCtr, 0);
- } else {
- return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
- (isMap) ? mapTaskCtr : redTaskCtr, 1);
- }
- }
-
- @Override
- Set<TaskInProgress> getNonLocalRunningMaps() {
- return (Set<TaskInProgress>)mapTips;
- }
- @Override
- Set<TaskInProgress> getRunningReduces() {
- return (Set<TaskInProgress>)reduceTips;
- }
-
- }
-
- static class FakeFailingJobInProgress extends FakeJobInProgress {
-
- public FakeFailingJobInProgress(JobID id, JobConf jobConf,
- FakeTaskTrackerManager taskTrackerManager, String user) {
- super(id, jobConf, taskTrackerManager, user);
- }
-
- @Override
- public synchronized void initTasks() throws IOException {
- throw new IOException("Failed Initalization");
- }
-
- @Override
- synchronized void fail() {
- this.status.setRunState(JobStatus.FAILED);
- }
- }
-
- static class FakeTaskInProgress extends TaskInProgress {
- private boolean isMap;
- private FakeJobInProgress fakeJob;
- private TreeMap<TaskAttemptID, String> activeTasks;
- private TaskStatus taskStatus;
- boolean hasSpeculativeMap;
- boolean hasSpeculativeReduce;
-
- FakeTaskInProgress(JobID jId, JobConf jobConf, Task t,
- boolean isMap, FakeJobInProgress job) {
- super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0, 1);
- this.isMap = isMap;
- this.fakeJob = job;
- activeTasks = new TreeMap<TaskAttemptID, String>();
- activeTasks.put(t.getTaskID(), "tt");
- // create a fake status for a task that is running for a bit
- this.taskStatus = TaskStatus.createTaskStatus(isMap);
- taskStatus.setProgress(0.5f);
- taskStatus.setRunState(TaskStatus.State.RUNNING);
- if (jobConf.getMapSpeculativeExecution()) {
- //resetting of the hasSpeculativeMap is done
- //when speculative map is scheduled by the job.
- hasSpeculativeMap = true;
- }
- if (jobConf.getReduceSpeculativeExecution()) {
- //resetting of the hasSpeculativeReduce is done
- //when speculative reduce is scheduled by the job.
- hasSpeculativeReduce = true;
- }
- }
-
- @Override
- TreeMap<TaskAttemptID, String> getActiveTasks() {
- return activeTasks;
- }
- @Override
- public TaskStatus getTaskStatus(TaskAttemptID taskid) {
- // return a status for a task that has run a bit
- return taskStatus;
- }
- @Override
- boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
- if (isMap) {
- fakeJob.mapTaskFinished();
- }
- else {
- fakeJob.reduceTaskFinished();
- }
- return true;
- }
-
- @Override
- /*
- *hasSpeculativeMap and hasSpeculativeReduce is reset by FakeJobInProgress
- *after the speculative tip has been scheduled.
- */
- boolean canBeSpeculated(long currentTime) {
- if(isMap && hasSpeculativeMap) {
- return fakeJob.getJobConf().getMapSpeculativeExecution();
- }
- if (!isMap && hasSpeculativeReduce) {
- return fakeJob.getJobConf().getReduceSpeculativeExecution();
- }
- return false;
- }
-
- @Override
- public boolean isRunning() {
- return !activeTasks.isEmpty();
- }
-
- }
-
- static class FakeQueueManager extends QueueManager {
- private Set<String> queueNames = null;
- private static final AccessControlList allEnabledAcl = new AccessControlList("*");
-
- FakeQueueManager() {
- super(new Configuration());
- }
-
- void setQueues(Set<String> queueNames) {
- this.queueNames = queueNames;
-
- // sync up queues with the parent class.
- Queue[] queues = new Queue[queueNames.size()];
- int i = 0;
- for (String queueName : queueNames) {
- HashMap<String, AccessControlList> aclsMap
- = new HashMap<String, AccessControlList>();
- for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
- String key = QueueManager.toFullPropertyName(queueName,
- oper.getAclName());
- aclsMap.put(key, allEnabledAcl);
- }
- queues[i++] = new Queue(queueName, aclsMap, Queue.QueueState.RUNNING);
- }
- super.setQueues(queues);
- }
-
- public synchronized Set<String> getQueues() {
- return queueNames;
- }
- }
-
- static class FakeTaskTrackerManager implements TaskTrackerManager {
- int maps = 0;
- int reduces = 0;
- int maxMapTasksPerTracker = 2;
- int maxReduceTasksPerTracker = 1;
- long ttExpiryInterval = 10 * 60 * 1000L; // default interval
- List<JobInProgressListener> listeners =
- new ArrayList<JobInProgressListener>();
- FakeQueueManager qm = new FakeQueueManager();
-
- private Map<String, TaskTracker> trackers =
- new HashMap<String, TaskTracker>();
- private Map<String, TaskStatus> taskStatuses =
- new HashMap<String, TaskStatus>();
- private Map<JobID, JobInProgress> jobs =
- new HashMap<JobID, JobInProgress>();
-
- public FakeTaskTrackerManager() {
- this(2, 2, 1);
- }
-
- public FakeTaskTrackerManager(int numTaskTrackers,
- int maxMapTasksPerTracker, int maxReduceTasksPerTracker) {
- this.maxMapTasksPerTracker = maxMapTasksPerTracker;
- this.maxReduceTasksPerTracker = maxReduceTasksPerTracker;
- for (int i = 1; i < numTaskTrackers + 1; i++) {
- String ttName = "tt" + i;
- TaskTracker tt = new TaskTracker(ttName);
- tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", i,
- new ArrayList<TaskStatus>(), 0,
- maxMapTasksPerTracker,
- maxReduceTasksPerTracker));
- trackers.put(ttName, tt);
- }
- }
-
- public void addTaskTracker(String ttName) {
- TaskTracker tt = new TaskTracker(ttName);
- tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", 1,
- new ArrayList<TaskStatus>(), 0,
- maxMapTasksPerTracker,
- maxReduceTasksPerTracker));
- trackers.put(ttName, tt);
- }
-
- public ClusterStatus getClusterStatus() {
- int numTrackers = trackers.size();
- return new ClusterStatus(numTrackers, 0,
- ttExpiryInterval, maps, reduces,
- numTrackers * maxMapTasksPerTracker,
- numTrackers * maxReduceTasksPerTracker,
- JobTracker.State.RUNNING);
- }
-
- public int getNumberOfUniqueHosts() {
- return 0;
- }
-
- public int getNextHeartbeatInterval() {
- return MRConstants.HEARTBEAT_INTERVAL_MIN;
- }
-
- @Override
- public void killJob(JobID jobid) throws IOException {
- JobInProgress job = jobs.get(jobid);
- finalizeJob(job, JobStatus.KILLED);
- job.kill();
- }
-
- public void initJob(JobInProgress jip) {
- try {
- JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
- jip.initTasks();
- if (jip.isJobEmpty()) {
- completeEmptyJob(jip);
- } else if (!jip.isSetupCleanupRequired()) {
- jip.completeSetup();
- }
- JobStatus newStatus = (JobStatus)jip.getStatus().clone();
- JobStatusChangeEvent event = new JobStatusChangeEvent(jip,
- EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
- for (JobInProgressListener listener : listeners) {
- listener.jobUpdated(event);
- }
- } catch (Exception ioe) {
- failJob(jip);
- }
- }
-
- private synchronized void completeEmptyJob(JobInProgress jip) {
- jip.completeEmptyJob();
- }
-
- public synchronized void failJob(JobInProgress jip) {
- JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
- jip.fail();
- JobStatus newStatus = (JobStatus)jip.getStatus().clone();
- JobStatusChangeEvent event = new JobStatusChangeEvent(jip,
- EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
- for (JobInProgressListener listener : listeners) {
- listener.jobUpdated(event);
- }
- }
-
- public void removeJob(JobID jobid) {
- jobs.remove(jobid);
- }
-
- @Override
- public JobInProgress getJob(JobID jobid) {
- return jobs.get(jobid);
- }
-
- Collection<JobInProgress> getJobs() {
- return jobs.values();
- }
-
- public Collection<TaskTrackerStatus> taskTrackers() {
- List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
- for (TaskTracker tt : trackers.values()) {
- statuses.add(tt.getStatus());
- }
- return statuses;
- }
-
-
- public void addJobInProgressListener(JobInProgressListener listener) {
- listeners.add(listener);
- }
-
- public void removeJobInProgressListener(JobInProgressListener listener) {
- listeners.remove(listener);
- }
-
- public void submitJob(JobInProgress job) throws IOException {
- jobs.put(job.getJobID(), job);
- for (JobInProgressListener listener : listeners) {
- listener.jobAdded(job);
- }
- }
-
- public TaskTracker getTaskTracker(String trackerID) {
- return trackers.get(trackerID);
- }
-
- public void startTask(String taskTrackerName, final Task t) {
- if (t.isMapTask()) {
- maps++;
- } else {
- reduces++;
- }
- TaskStatus status = new TaskStatus() {
- @Override
- public TaskAttemptID getTaskID() {
- return t.getTaskID();
- }
-
- @Override
- public boolean getIsMap() {
- return t.isMapTask();
- }
-
- @Override
- public int getNumSlots() {
- return t.getNumSlotsRequired();
- }
- };
- taskStatuses.put(t.getTaskID().toString(), status);
- status.setRunState(TaskStatus.State.RUNNING);
- trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
- }
-
- public void finishTask(String taskTrackerName, String tipId,
- FakeJobInProgress j) {
- TaskStatus status = taskStatuses.get(tipId);
- if (status.getIsMap()) {
- maps--;
- j.mapTaskFinished();
- } else {
- reduces--;
- j.reduceTaskFinished();
- }
- status.setRunState(TaskStatus.State.SUCCEEDED);
- }
-
- void finalizeJob(FakeJobInProgress fjob) {
- finalizeJob(fjob, JobStatus.SUCCEEDED);
- }
-
- void finalizeJob(JobInProgress fjob, int state) {
- // take a snapshot of the status before changing it
- JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
- fjob.getStatus().setRunState(state);
- JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
- JobStatusChangeEvent event =
- new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus,
- newStatus);
- for (JobInProgressListener listener : listeners) {
- listener.jobUpdated(event);
- }
- }
-
- public void setPriority(FakeJobInProgress fjob, JobPriority priority) {
- // take a snapshot of the status before changing it
- JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
- fjob.setPriority(priority);
- JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
- JobStatusChangeEvent event =
- new JobStatusChangeEvent (fjob, EventType.PRIORITY_CHANGED, oldStatus,
- newStatus);
- for (JobInProgressListener listener : listeners) {
- listener.jobUpdated(event);
- }
- }
-
- public void setStartTime(FakeJobInProgress fjob, long start) {
- // take a snapshot of the status before changing it
- JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
-
- fjob.startTime = start; // change the start time of the job
- fjob.status.setStartTime(start); // change the start time of the jobstatus
-
- JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
-
- JobStatusChangeEvent event =
- new JobStatusChangeEvent (fjob, EventType.START_TIME_CHANGED, oldStatus,
- newStatus);
- for (JobInProgressListener listener : listeners) {
- listener.jobUpdated(event);
- }
- }
-
- void addQueues(String[] arr) {
- Set<String> queues = new HashSet<String>();
- for (String s: arr) {
- queues.add(s);
- }
- qm.setQueues(queues);
- }
-
- public QueueManager getQueueManager() {
- return qm;
- }
-
- @Override
- public boolean killTask(TaskAttemptID taskid, boolean shouldFail) {
- return true;
- }
- }
-
- // represents a fake queue configuration info
- static class FakeQueueInfo {
- String queueName;
- float capacity;
- boolean supportsPrio;
- int ulMin;
-
- public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) {
- this.queueName = queueName;
- this.capacity = capacity;
- this.supportsPrio = supportsPrio;
- this.ulMin = ulMin;
- }
- }
-
- static class FakeResourceManagerConf extends CapacitySchedulerConf {
-
- // map of queue names to queue info
- private Map<String, FakeQueueInfo> queueMap =
- new LinkedHashMap<String, FakeQueueInfo>();
- String firstQueue;
-
-
- void setFakeQueues(List<FakeQueueInfo> queues) {
- for (FakeQueueInfo q: queues) {
- queueMap.put(q.queueName, q);
- }
- firstQueue = new String(queues.get(0).queueName);
- }
-
- public synchronized Set<String> getQueues() {
- return queueMap.keySet();
- }
-
- /*public synchronized String getFirstQueue() {
- return firstQueue;
- }*/
-
- public float getCapacity(String queue) {
- if(queueMap.get(queue).capacity == -1) {
- return super.getCapacity(queue);
- }
- return queueMap.get(queue).capacity;
- }
-
- public int getMinimumUserLimitPercent(String queue) {
- return queueMap.get(queue).ulMin;
- }
-
- public boolean isPrioritySupported(String queue) {
- return queueMap.get(queue).supportsPrio;
- }
-
- @Override
- public long getSleepInterval() {
- return 1;
- }
-
- @Override
- public int getMaxWorkerThreads() {
- return 1;
- }
- }
-
- protected class FakeClock extends CapacityTaskScheduler.Clock {
- private long time = 0;
-
- public void advance(long millis) {
- time += millis;
- }
- @Override
- long getTime() {
- return time;
- }
- }
-
protected JobConf conf;
protected CapacityTaskScheduler scheduler;
private FakeTaskTrackerManager taskTrackerManager;
@@ -778,12 +50,14 @@
setUp(2, 2, 1);
}
- private void setUp(int numTaskTrackers, int numMapTasksPerTracker,
- int numReduceTasksPerTracker) {
+ private void setUp(
+ int numTaskTrackers, int numMapTasksPerTracker,
+ int numReduceTasksPerTracker) {
jobCounter = 0;
taskTrackerManager =
- new FakeTaskTrackerManager(numTaskTrackers, numMapTasksPerTracker,
- numReduceTasksPerTracker);
+ new FakeTaskTrackerManager(
+ numTaskTrackers, numMapTasksPerTracker,
+ numReduceTasksPerTracker);
clock = new FakeClock();
scheduler = new CapacityTaskScheduler(clock);
scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -792,16 +66,16 @@
// Don't let the JobInitializationPoller come in our way.
resConf = new FakeResourceManagerConf();
controlledInitializationPoller = new ControlledInitializationPoller(
- scheduler.jobQueuesManager,
- resConf,
- resConf.getQueues(), taskTrackerManager);
+ scheduler.jobQueuesManager,
+ resConf,
+ resConf.getQueues(), taskTrackerManager);
scheduler.setInitializationPoller(controlledInitializationPoller);
scheduler.setConf(conf);
//by default disable speculative execution.
conf.setMapSpeculativeExecution(false);
conf.setReduceSpeculativeExecution(false);
}
-
+
@Override
protected void tearDown() throws Exception {
if (scheduler != null) {
@@ -809,38 +83,43 @@
}
}
- private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException {
+ private FakeJobInProgress submitJob(int state, JobConf jobConf)
+ throws IOException {
FakeJobInProgress job =
- new FakeJobInProgress(new JobID("test", ++jobCounter),
- (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
- jobConf.getUser());
+ new FakeJobInProgress(
+ new JobID("test", ++jobCounter),
+ (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
+ jobConf.getUser());
job.getStatus().setRunState(state);
taskTrackerManager.submitJob(job);
return job;
}
private FakeJobInProgress submitJobAndInit(int state, JobConf jobConf)
- throws IOException {
+ throws IOException {
FakeJobInProgress j = submitJob(state, jobConf);
taskTrackerManager.initJob(j);
return j;
}
- private FakeJobInProgress submitJob(int state, int maps, int reduces,
- String queue, String user) throws IOException {
+ private FakeJobInProgress submitJob(
+ int state, int maps, int reduces,
+ String queue, String user) throws IOException {
JobConf jobConf = new JobConf(conf);
jobConf.setNumMapTasks(maps);
jobConf.setNumReduceTasks(reduces);
- if (queue != null)
+ if (queue != null) {
jobConf.setQueueName(queue);
+ }
jobConf.setUser(user);
return submitJob(state, jobConf);
}
-
+
// Submit a job and update the listeners
- private FakeJobInProgress submitJobAndInit(int state, int maps, int reduces,
- String queue, String user)
- throws IOException {
+ private FakeJobInProgress submitJobAndInit(
+ int state, int maps, int reduces,
+ String queue, String user)
+ throws IOException {
FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
taskTrackerManager.initJob(j);
return j;
@@ -848,22 +127,23 @@
/**
* Test the max map limit.
+ *
* @throws IOException
*/
public void testMaxMapCap() throws IOException {
- this.setUp(4,1,1);
- taskTrackerManager.addQueues(new String[] {"default"});
+ this.setUp(4, 1, 1);
+ taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
resConf.setFakeQueues(queues);
- resConf.setMaxMapCap("default",2);
- resConf.setMaxReduceCap("default",-1);
+ resConf.setMaxMapCap("default", 2);
+ resConf.setMaxReduceCap("default", -1);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
//submit the Job
FakeJobInProgress fjob1 =
- submitJob(JobStatus.PREP,3,1,"default","user");
+ submitJob(JobStatus.PREP, 3, 1, "default", "user");
taskTrackerManager.initJob(fjob1);
@@ -871,21 +151,27 @@
List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
//Once the 2 tasks are running the third assigment should be reduce.
- checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt3",
+ "attempt_test_0001_r_000001_0 on tt3");
//This should fail.
List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
assertNull(task4);
//Now complete the task 1.
- // complete the job
- taskTrackerManager.finishTask("tt1", task1.get(0).getTaskID().toString(),
- fjob1);
+ // complete the job
+ taskTrackerManager.finishTask(
+ "tt1", task1.get(0).getTaskID().toString(),
+ fjob1);
//We have completed the tt1 task which was a map task so we expect one map
//task to be picked up
- checkAssignment("tt4","attempt_test_0001_m_000003_0 on tt4");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ "attempt_test_0001_m_000003_0 on tt4");
}
/**
* Test max reduce limit
+ *
* @throws IOException
*/
public void testMaxReduceCap() throws IOException {
@@ -927,137 +213,152 @@
"tt2", task2.get(0).getTaskID().toString(), fjob1);
//One reduce is done hence assign the new reduce.
- checkAssignment("tt4","attempt_test_0001_r_000003_0 on tt4");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ "attempt_test_0001_r_000003_0 on tt4");
}
-
+
// test job run-state change
public void testJobRunStateChange() throws IOException {
// start the scheduler
- taskTrackerManager.addQueues(new String[] {"default"});
+ taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 1));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
-
+
// submit the job
- FakeJobInProgress fjob1 =
+ FakeJobInProgress fjob1 =
submitJob(JobStatus.PREP, 1, 0, "default", "user");
-
- FakeJobInProgress fjob2 =
+
+ FakeJobInProgress fjob2 =
submitJob(JobStatus.PREP, 1, 0, "default", "user");
-
+
// test if changing the job priority/start-time works as expected in the
// waiting queue
testJobOrderChange(fjob1, fjob2, true);
-
+
// Init the jobs
// simulate the case where the job with a lower priority becomes running
// first (may be because of the setup tasks).
-
+
// init the lower ranked job first
taskTrackerManager.initJob(fjob2);
-
+
// init the higher ordered job later
taskTrackerManager.initJob(fjob1);
-
+
// check if the jobs are missing from the waiting queue
// The jobs are not removed from waiting queue until they are scheduled
- assertEquals("Waiting queue is garbled on job init", 2,
- scheduler.jobQueuesManager.getWaitingJobs("default")
- .size());
-
+ assertEquals(
+ "Waiting queue is garbled on job init", 2,
+ scheduler.jobQueuesManager.getJobQueue("default").getWaitingJobs()
+ .size());
+
// test if changing the job priority/start-time works as expected in the
// running queue
testJobOrderChange(fjob1, fjob2, false);
-
+
// schedule a task
List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
-
+
// complete the job
- taskTrackerManager.finishTask("tt1", tasks.get(0).getTaskID().toString(),
- fjob1);
-
+ taskTrackerManager.finishTask(
+ "tt1", tasks.get(0).getTaskID().toString(),
+ fjob1);
+
// mark the job as complete
taskTrackerManager.finalizeJob(fjob1);
-
- Collection<JobInProgress> rqueue =
- scheduler.jobQueuesManager.getRunningJobQueue("default");
-
+
+ Collection<JobInProgress> rqueue =
+ scheduler.jobQueuesManager.getJobQueue("default").getRunningJobs();
+
// check if the job is removed from the scheduler
- assertFalse("Scheduler contains completed job",
- rqueue.contains(fjob1));
-
+ assertFalse(
+ "Scheduler contains completed job",
+ rqueue.contains(fjob1));
+
// check if the running queue size is correct
- assertEquals("Job finish garbles the queue",
- 1, rqueue.size());
+ assertEquals(
+ "Job finish garbles the queue",
+ 1, rqueue.size());
}
-
+
// test if the queue reflects the changes
- private void testJobOrderChange(FakeJobInProgress fjob1,
- FakeJobInProgress fjob2,
- boolean waiting) {
+ private void testJobOrderChange(
+ FakeJobInProgress fjob1,
+ FakeJobInProgress fjob2,
+ boolean waiting) {
String queueName = waiting ? "waiting" : "running";
-
+
// check if the jobs in the queue are the right order
JobInProgress[] jobs = getJobsInQueue(waiting);
- assertTrue(queueName + " queue doesnt contain job #1 in right order",
- jobs[0].getJobID().equals(fjob1.getJobID()));
- assertTrue(queueName + " queue doesnt contain job #2 in right order",
- jobs[1].getJobID().equals(fjob2.getJobID()));
-
+ assertTrue(
+ queueName + " queue doesnt contain job #1 in right order",
+ jobs[0].getJobID().equals(fjob1.getJobID()));
+ assertTrue(
+ queueName + " queue doesnt contain job #2 in right order",
+ jobs[1].getJobID().equals(fjob2.getJobID()));
+
// I. Check the start-time change
// Change job2 start-time and check if job2 bumps up in the queue
taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
-
+
jobs = getJobsInQueue(waiting);
- assertTrue("Start time change didnt not work as expected for job #2 in "
- + queueName + " queue",
- jobs[0].getJobID().equals(fjob2.getJobID()));
- assertTrue("Start time change didnt not work as expected for job #1 in"
- + queueName + " queue",
- jobs[1].getJobID().equals(fjob1.getJobID()));
-
+ assertTrue(
+ "Start time change didnt not work as expected for job #2 in "
+ + queueName + " queue",
+ jobs[0].getJobID().equals(fjob2.getJobID()));
+ assertTrue(
+ "Start time change didnt not work as expected for job #1 in"
+ + queueName + " queue",
+ jobs[1].getJobID().equals(fjob1.getJobID()));
+
// check if the queue is fine
- assertEquals("Start-time change garbled the " + queueName + " queue",
- 2, jobs.length);
-
+ assertEquals(
+ "Start-time change garbled the " + queueName + " queue",
+ 2, jobs.length);
+
// II. Change job priority change
// Bump up job1's priority and make sure job1 bumps up in the queue
taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
-
+
// Check if the priority changes are reflected
jobs = getJobsInQueue(waiting);
- assertTrue("Priority change didnt not work as expected for job #1 in "
- + queueName + " queue",
- jobs[0].getJobID().equals(fjob1.getJobID()));
- assertTrue("Priority change didnt not work as expected for job #2 in "
- + queueName + " queue",
- jobs[1].getJobID().equals(fjob2.getJobID()));
-
+ assertTrue(
+ "Priority change didnt not work as expected for job #1 in "
+ + queueName + " queue",
+ jobs[0].getJobID().equals(fjob1.getJobID()));
+ assertTrue(
+ "Priority change didnt not work as expected for job #2 in "
+ + queueName + " queue",
+ jobs[1].getJobID().equals(fjob2.getJobID()));
+
// check if the queue is fine
- assertEquals("Priority change has garbled the " + queueName + " queue",
- 2, jobs.length);
-
+ assertEquals(
+ "Priority change has garbled the " + queueName + " queue",
+ 2, jobs.length);
+
// reset the queue state back to normal
taskTrackerManager.setStartTime(fjob1, fjob2.startTime - 1);
taskTrackerManager.setPriority(fjob1, JobPriority.NORMAL);
}
-
+
private JobInProgress[] getJobsInQueue(boolean waiting) {
- Collection<JobInProgress> queue =
- waiting
- ? scheduler.jobQueuesManager.getWaitingJobs("default")
- : scheduler.jobQueuesManager.getRunningJobQueue("default");
+ Collection<JobInProgress> queue =
+ waiting
+ ? scheduler.jobQueuesManager.getJobQueue("default").getWaitingJobs()
+ : scheduler.jobQueuesManager.getJobQueue("default").getRunningJobs();
return queue.toArray(new JobInProgress[0]);
}
-
+
// tests if tasks can be assinged when there are multiple jobs from a same
// user
public void testJobFinished() throws Exception {
- taskTrackerManager.addQueues(new String[] {"default"});
-
+ taskTrackerManager.addQueues(new String[]{"default"});
+
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
resConf.setFakeQueues(queues);
@@ -1065,49 +366,63 @@
scheduler.start();
// submit 2 jobs
- FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
- FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
-
+ FakeJobInProgress j1 = submitJobAndInit(
+ JobStatus.PREP, 3, 0, "default", "u1");
+ FakeJobInProgress j2 = submitJobAndInit(
+ JobStatus.PREP, 3, 0, "default", "u1");
+
// I. Check multiple assignments with running tasks within job
// ask for a task from first job
- Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ Task t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
// ask for another task from the first job
- t = checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-
+ t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000002_0 on tt1");
+
// complete tasks
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
-
+
// II. Check multiple assignments with running tasks across jobs
// ask for a task from first job
- t = checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
-
+ t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000003_0 on tt1");
+
// ask for a task from the second job
- t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-
+ t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000001_0 on tt1");
+
// complete tasks
taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000003_0", j1);
-
+
// III. Check multiple assignments with completed tasks across jobs
// ask for a task from the second job
- t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
-
+ t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000002_0 on tt1");
+
// complete task
taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000002_0", j2);
-
+
// IV. Check assignment with completed job
// finish first job
- scheduler.jobCompleted(j1);
-
+ scheduler.jobQueuesManager.getJobQueue(j1).jobCompleted(j1);
+
// ask for another task from the second job
// if tasks can be assigned then the structures are properly updated
- t = checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
-
+ t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000003_0 on tt1");
+
// complete task
taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
}
-
+
// basic tests, should be able to submit to queues
public void testSubmitToQueues() throws Exception {
// set up some queues
@@ -1123,63 +438,70 @@
// submit a job with no queue specified. It should be accepted
// and given to the default queue.
JobInProgress j = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-
+
// when we ask for a task, we should get one, from the job submitted
Task t;
- t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
// submit another job, to a different queue
j = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// now when we get a task, it should be from the second job
- t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+ t = checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0002_m_000001_0 on tt2");
}
-
+
public void testGetJobs() throws Exception {
// need only one queue
- String[] qs = { "default" };
+ String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
- HashMap<String, ArrayList<FakeJobInProgress>> subJobsList =
+ HashMap<String, ArrayList<FakeJobInProgress>> subJobsList =
submitJobs(1, 4, "default");
-
+
JobQueuesManager mgr = scheduler.jobQueuesManager;
-
- while(mgr.getWaitingJobs("default").size() < 4){
+
+ while (mgr.getJobQueue("default").getWaitingJobs().size() < 4) {
Thread.sleep(1);
}
//Raise status change events for jobs submitted.
raiseStatusChangeEvents(mgr);
Collection<JobInProgress> jobs = scheduler.getJobs("default");
-
- assertTrue("Number of jobs returned by scheduler is wrong"
- ,jobs.size() == 4);
-
- assertTrue("Submitted jobs and Returned jobs are not same",
- subJobsList.get("u1").containsAll(jobs));
+
+ assertTrue(
+ "Number of jobs returned by scheduler is wrong"
+ , jobs.size() == 4);
+
+ assertTrue(
+ "Submitted jobs and Returned jobs are not same",
+ subJobsList.get("u1").containsAll(jobs));
}
-
+
//Basic test to test capacity allocation across the queues which have no
//capacity configured.
-
+
public void testCapacityAllocationToQueues() throws Exception {
- String[] qs = {"default","q1","q2","q3","q4"};
+ String[] qs = {"default", "qAZ1", "qAZ2", "qAZ3", "qAZ4"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default",25.0f,true,25));
- queues.add(new FakeQueueInfo("q1",-1.0f,true,25));
- queues.add(new FakeQueueInfo("q2",-1.0f,true,25));
- queues.add(new FakeQueueInfo("q3",-1.0f,true,25));
- queues.add(new FakeQueueInfo("q4",-1.0f,true,25));
+ queues.add(new FakeQueueInfo("default", 25.0f, true, 25));
+ queues.add(new FakeQueueInfo("qAZ1", -1.0f, true, 25));
+ queues.add(new FakeQueueInfo("qAZ2", -1.0f, true, 25));
+ queues.add(new FakeQueueInfo("qAZ3", -1.0f, true, 25));
+ queues.add(new FakeQueueInfo("qAZ4", -1.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
- scheduler.start();
- assertEquals(18.75f, resConf.getCapacity("q1"));
- assertEquals(18.75f, resConf.getCapacity("q2"));
- assertEquals(18.75f, resConf.getCapacity("q3"));
- assertEquals(18.75f, resConf.getCapacity("q4"));
+ scheduler.start();
+ JobQueuesManager jqm = scheduler.jobQueuesManager;
+ assertEquals(18.75f, jqm.getJobQueue("qAZ1").qsc.getCapacityPercent());
+ assertEquals(18.75f, jqm.getJobQueue("qAZ2").qsc.getCapacityPercent());
+ assertEquals(18.75f, jqm.getJobQueue("qAZ3").qsc.getCapacityPercent());
+ assertEquals(18.75f, jqm.getJobQueue("qAZ4").qsc.getCapacityPercent());
}
// Tests how capacity is computed and assignment of tasks done
@@ -1196,47 +518,47 @@
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
-
+
// submit a job to the default queue
submitJobAndInit(JobStatus.PREP, 10, 0, "default", "u1");
-
+
// submit a job to the second queue
submitJobAndInit(JobStatus.PREP, 10, 0, "q2", "u1");
-
+
// job from q2 runs first because it has some non-zero capacity.
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- verifyCapacity("0", "default");
- verifyCapacity("3", "q2");
-
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000001_0 on tt1");
+ verifyCapacity(taskTrackerManager, "0", "default");
+ verifyCapacity(taskTrackerManager, "3", "q2");
+
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt3");
- checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
- verifyCapacity("0", "default");
- verifyCapacity("5", "q2");
-
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0002_m_000002_0 on tt2");
+ verifyCapacity(taskTrackerManager, "0", "default");
+ verifyCapacity(taskTrackerManager, "5", "q2");
+
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt4");
- checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
- verifyCapacity("0", "default");
- verifyCapacity("7", "q2");
-
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt3",
+ "attempt_test_0002_m_000003_0 on tt3");
+ verifyCapacity(taskTrackerManager, "0", "default");
+ verifyCapacity(taskTrackerManager, "7", "q2");
+
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt5");
// now job from default should run, as it is furthest away
// in terms of runningMaps / capacity.
- checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
- verifyCapacity("1", "default");
- verifyCapacity("9", "q2");
- }
-
- private void verifyCapacity(String expectedCapacity,
- String queue) throws IOException {
- String schedInfo = taskTrackerManager.getQueueManager().
- getSchedulerInfo(queue).toString();
- assertTrue(schedInfo.contains("Map tasks\nCapacity: "
- + expectedCapacity + " slots"));
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ "attempt_test_0001_m_000001_0 on tt4");
+ verifyCapacity(taskTrackerManager, "1", "default");
+ verifyCapacity(taskTrackerManager, "9", "q2");
}
-
+
// test capacity transfer
public void testCapacityTransfer() throws Exception {
// set up some queues
@@ -1253,14 +575,22 @@
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
// we should get a task
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
// I should get another map task.
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000002_0 on tt1");
// Now we're at full capacity for maps. If I ask for another map task,
// I should get a map task from the default queue's capacity.
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000003_0 on tt2");
// and another
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000004_0 on tt2");
}
/**
@@ -1271,29 +601,29 @@
* @throws IOException
*/
public void testHighMemoryBlockingWithMaxLimit()
- throws IOException {
+ throws IOException {
// 2 map and 1 reduce slots
taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
- taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
+ taskTrackerManager.addQueues(new String[]{"defaultXYZM"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("defaultXYZ", 100.0f, true, 25));
+ queues.add(new FakeQueueInfo("defaultXYZM", 100.0f, true, 25));
resConf.setFakeQueues(queues);
- resConf.setMaxMapCap("defaultXYZ",2);
+ resConf.setMaxMapCap("defaultXYZM", 2);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
// Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
- 2 * 1024);
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 2 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
- 1 * 1024);
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 1 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1301,40 +631,46 @@
//Set the max limit for queue to 2 ,
// try submitting more map tasks to the queue , it should not happen
- LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+ LOG.debug(
+ "Submit one high memory(2GB maps, 0MB reduces) job of "
+ "2 map tasks");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
jConf.setMemoryForReduceTask(0);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(0);
- jConf.setQueueName("defaultXYZ");
+ jConf.setQueueName("defaultXYZM");
jConf.setUser("u1");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
- LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
+ LOG.debug(
+ "Submit another regular memory(1GB vmem maps/reduces) job of "
+ "2 map/red tasks");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(2);
- jConf.setQueueName("defaultXYZ");
+ jConf.setQueueName("defaultXYZM");
jConf.setUser("u1");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
// first, a map from j1 will run this is a high memory job so it would
// occupy the 2 slots
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
- checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
+ checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
// at this point, the scheduler tries to schedule another map from j1.
// there isn't enough space. The second job's reduce should be scheduled.
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-
- checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_r_000001_0 on tt1");
+
+ checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
//at this point , the scheduler tries to schedule another map from j2 for
@@ -1342,11 +678,13 @@
// This should not happen as all the map slots are taken
//by the first task itself.hence reduce task from the second job is given
- checkAssignment("tt2","attempt_test_0002_r_000002_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0002_r_000002_0 on tt2");
}
/**
- * test if user limits automatically adjust to max map or reduce limit
+ * test if user limits automatically adjust to max map or reduce limit
*/
public void testUserLimitsWithMaxLimits() throws Exception {
setUp(4, 4, 4);
@@ -1370,18 +708,27 @@
// for queue 'default', the capacity for maps is 2.
// But the max map limit is 2
// hence user should be getting not more than 1 as it is the 50%.
- Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ Task t1 = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
//Now we should get the task from the other job. As the
//first user has reached his max map limit.
- checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0002_m_000001_0 on tt2");
//Now we are done with map limit , now if we ask for task we should
// get reduce from 1st job
- checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt3",
+ "attempt_test_0001_r_000001_0 on tt3");
// Now we're at full capacity for maps. 1 done with reduces for job 1 so
// now we should get 1 reduces for job 2
- Task t4 = checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
+ Task t4 = checkAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ "attempt_test_0002_r_000001_0 on tt4");
taskTrackerManager.finishTask(
"tt1", t1.getTaskID().toString(),
@@ -1389,14 +736,18 @@
//tt1 completed the task so we have 1 map slot for u1
// we are assigning the 2nd map task from fjob1
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000002_0 on tt1");
taskTrackerManager.finishTask(
"tt4", t4.getTaskID().toString(),
fjob2);
//tt4 completed the task , so we have 1 reduce slot for u2
//we are assigning the 2nd reduce from fjob2
- checkAssignment("tt4", "attempt_test_0002_r_000002_0 on tt4");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ "attempt_test_0002_r_000002_0 on tt4");
}
@@ -1417,16 +768,24 @@
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
// we should get a task
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
// Submit another job, from a different user
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// Now if I ask for a map task, it should come from the second job
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000001_0 on tt1");
// Now we're at full capacity for maps. If I ask for another map task,
// I should get a map task from the default queue's capacity.
- checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000002_0 on tt2");
// and another
- checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0002_m_000002_0 on tt2");
}
// test user limits when a 2nd job is submitted much after first job
@@ -1445,15 +804,23 @@
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
// we should get a task
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
// since we're the only job, we get another map
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000002_0 on tt1");
// Submit another job, from a different user
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// Now if I ask for a map task, it should come from the second job
- checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0002_m_000001_0 on tt2");
// and another
- checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0002_m_000002_0 on tt2");
}
// test user limits when a 2nd job is submitted much after first job
@@ -1473,27 +840,43 @@
FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
// we should get a task
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
// since we're the only job, we get another map
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000002_0 on tt1");
// we get two more maps from 'default queue'
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000003_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000004_0 on tt2");
// Submit another job, from a different user
FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// one of the task finishes
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
// Now if I ask for a map task, it should come from the second job
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000001_0 on tt1");
// another task from job1 finishes, another new task to job2
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
- checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000002_0 on tt1");
// now we have equal number of tasks from each job. Whichever job's
// task finishes, that job gets a new task
taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
- checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000005_0 on tt2");
taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
- checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000003_0 on tt1");
}
// test user limits with many users, more slots
@@ -1514,20 +897,40 @@
// u1 submits job
FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
// it gets the first 5 slots
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
- checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000002_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000003_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000004_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt3",
+ "attempt_test_0001_m_000005_0 on tt3");
// u2 submits job with 4 slots
FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
// u2 should get next 4 slots
- checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
- checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
- checkAssignment("tt4", "attempt_test_0002_m_000003_0 on tt4");
- checkAssignment("tt5", "attempt_test_0002_m_000004_0 on tt5");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt3",
+ "attempt_test_0002_m_000001_0 on tt3");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ "attempt_test_0002_m_000002_0 on tt4");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ "attempt_test_0002_m_000003_0 on tt4");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt5",
+ "attempt_test_0002_m_000004_0 on tt5");
// last slot should go to u1, since u2 has no more tasks
- checkAssignment("tt5", "attempt_test_0001_m_000006_0 on tt5");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt5",
+ "attempt_test_0001_m_000006_0 on tt5");
// u1 finishes a task
taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
// u1 submits a few more jobs
@@ -1543,28 +946,34 @@
submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
// next slot should go to u3, even though u2 has an earlier job, since
// user limits have changed and u1/u2 are over limits
- checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt5",
+ "attempt_test_0007_m_000001_0 on tt5");
// some other task finishes and u3 gets it
taskTrackerManager.finishTask("tt5", "attempt_test_0002_m_000004_0", j1);
- checkAssignment("tt5", "attempt_test_0007_m_000002_0 on tt5");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt5",
+ "attempt_test_0007_m_000002_0 on tt5");
// now, u2 finishes a task
taskTrackerManager.finishTask("tt4", "attempt_test_0002_m_000002_0", j1);
// next slot will go to u1, since u3 has nothing to run and u1's job is
// first in the queue
- checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ "attempt_test_0001_m_000007_0 on tt4");
}
/**
* Test to verify that high memory jobs hit user limits faster than any normal
* job.
- *
+ *
* @throws IOException
*/
public void testUserLimitsForHighMemoryJobs()
- throws IOException {
+ throws IOException {
taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10);
scheduler.setTaskTrackerManager(taskTrackerManager);
- String[] qs = { "default" };
+ String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
@@ -1572,13 +981,13 @@
// enabled memory-based scheduling
// Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1592,7 +1001,8 @@
jConf.setQueueName("default");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
- LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+ LOG.debug(
+ "Submit one high memory(2GB maps, 2GB reduces) job of "
+ "6 map and 6 reduce tasks");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
@@ -1604,23 +1014,51 @@
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
// Verify that normal job takes 3 task assignments to hit user limits
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000005_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000005_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000002_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_r_000002_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000003_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_r_000003_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000004_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_r_000004_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000005_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_r_000005_0 on tt1");
// u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
// are hit. So u2 should get slots
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
- checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_r_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000002_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_r_000002_0 on tt1");
// u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
// slots. Because of high memory tasks, giving u2 another task would
@@ -1679,12 +1117,12 @@
scheduler.assignTasks(tracker("tt2")); // heartbeat
int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks();
int totalReduces =
- taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+ taskTrackerManager.getClusterStatus().getMaxReduceTasks();
QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
String schedulingInfo =
- queueManager.getJobQueueInfo("default").getSchedulingInfo();
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
String schedulingInfo2 =
- queueManager.getJobQueueInfo("q2").getSchedulingInfo();
+ queueManager.getJobQueueInfo("q2").getSchedulingInfo();
String[] infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 18);
assertEquals(infoStrings[0], "Queue configuration");
@@ -1693,13 +1131,15 @@
assertEquals(infoStrings[3], "Priority Supported: YES");
assertEquals(infoStrings[4], "-------------");
assertEquals(infoStrings[5], "Map tasks");
- assertEquals(infoStrings[6], "Capacity: " + totalMaps * 50 / 100
+ assertEquals(
+ infoStrings[6], "Capacity: " + totalMaps * 50 / 100
+ " slots");
assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 0");
assertEquals(infoStrings[9], "-------------");
assertEquals(infoStrings[10], "Reduce tasks");
- assertEquals(infoStrings[11], "Capacity: " + totalReduces * 50 / 100
+ assertEquals(
+ infoStrings[11], "Capacity: " + totalReduces * 50 / 100
+ " slots");
assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[13], "Running tasks: 0");
@@ -1744,14 +1184,16 @@
raiseStatusChangeEvents(scheduler.jobQueuesManager);
raiseStatusChangeEvents(scheduler.jobQueuesManager, "q2");
//assign one job
- Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ Task t1 = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
//Initialize extra job.
controlledInitializationPoller.selectJobsToInitialize();
//Get scheduling information, now the number of waiting jobs should have
//changed to 4 as one is scheduled and has become running.
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1765,9 +1207,11 @@
assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
//assign a reduce task
- Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ Task t2 = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1789,7 +1233,7 @@
taskTrackerManager.finalizeJob(u1j1);
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1802,13 +1246,14 @@
//Fail a job which is initialized but not scheduled and check the count.
FakeJobInProgress u1j2 = userJobs.get(1);
- assertTrue("User1 job 2 not initalized ",
- u1j2.getStatus().getRunState() == JobStatus.RUNNING);
+ assertTrue(
+ "User1 job 2 not initalized ",
+ u1j2.getStatus().getRunState() == JobStatus.RUNNING);
taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
//Run initializer to clean up failed jobs
controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1823,14 +1268,15 @@
//Fail a job which is not initialized but is in the waiting queue.
FakeJobInProgress u1j5 = userJobs.get(4);
- assertFalse("User1 job 5 initalized ",
- u1j5.getStatus().getRunState() == JobStatus.RUNNING);
+ assertFalse(
+ "User1 job 5 initalized ",
+ u1j5.getStatus().getRunState() == JobStatus.RUNNING);
taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
//run initializer to clean up failed job
controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1851,16 +1297,19 @@
//Now schedule a map; it should be from job3 of the user, as job1 succeeded,
//job2 failed, and job3 is now running
- t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+ t1 = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0003_m_000001_0 on tt1");
FakeJobInProgress u1j3 = userJobs.get(2);
- assertTrue("User Job 3 not running ",
- u1j3.getStatus().getRunState() == JobStatus.RUNNING);
+ assertTrue(
+ "User Job 3 not running ",
+ u1j3.getStatus().getRunState() == JobStatus.RUNNING);
//now the running count of map should be one and waiting jobs should be
//one. run the poller as it is responsible for waiting count
controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1874,7 +1323,7 @@
//Fail the executing job
taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateContextInfoForTests();
//Now running counts should become zero
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
@@ -1888,15 +1337,16 @@
/**
* Test to verify that highMemoryJobs are scheduled like all other jobs when
* memory-based scheduling is not enabled.
+ *
* @throws IOException
*/
public void testDisabledMemoryBasedScheduling()
- throws IOException {
+ throws IOException {
LOG.debug("Starting the scheduler.");
taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
- taskTrackerManager.addQueues(new String[] { "default" });
+ taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
resConf.setFakeQueues(queues);
@@ -1905,7 +1355,8 @@
// memory-based scheduling disabled by default.
scheduler.start();
- LOG.debug("Submit one high memory job of 1 3GB map task "
+ LOG.debug(
+ "Submit one high memory job of 1 3GB map task "
+ "and 1 1GB reduce task.");
JobConf jConf = new JobConf();
jConf.setMemoryForMapTask(3 * 1024L); // 3GB
@@ -1919,23 +1370,27 @@
// assert that all tasks are launched even though they transgress the
// scheduling limits.
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
}
/**
* Test reverting HADOOP-4979. If there is a high-mem job, we should now look
* at reduce jobs (if map tasks are high-mem) or vice-versa.
- *
+ *
* @throws IOException
*/
public void testHighMemoryBlockingAcrossTaskTypes()
- throws IOException {
+ throws IOException {
// 2 map and 1 reduce slots
taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
- taskTrackerManager.addQueues(new String[] { "default" });
+ taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
resConf.setFakeQueues(queues);
@@ -1943,15 +1398,15 @@
// enabled memory-based scheduling
// Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
- 2 * 1024);
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 2 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
- 1 * 1024);
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 1 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1960,8 +1415,9 @@
// maps and reduces.
// First job cannot run for want of memory for maps. In this case, second
// job's reduces should run.
-
- LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+
+ LOG.debug(
+ "Submit one high memory(2GB maps, 0MB reduces) job of "
+ "2 map tasks");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
@@ -1972,7 +1428,8 @@
jConf.setUser("u1");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
- LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
+ LOG.debug(
+ "Submit another regular memory(1GB vmem maps/reduces) job of "
+ "2 map/red tasks");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
@@ -1982,53 +1439,60 @@
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
-
+
// first, a map from j1 will run
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
// Total 2 map slots should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
// at this point, the scheduler tries to schedule another map from j1.
// there isn't enough space. The second job's reduce should be scheduled.
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_r_000001_0 on tt1");
// Total 1 reduce slot should be accounted for.
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
- 100.0f);
+ checkOccupiedSlots(
+ "default", TaskType.REDUCE, 1, 1,
+ 100.0f);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
}
/**
* Test blocking of cluster for lack of memory.
+ *
* @throws IOException
*/
public void testClusterBlockingForLackOfMemory()
- throws IOException {
+ throws IOException {
LOG.debug("Starting the scheduler.");
taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
- taskTrackerManager.addQueues(new String[] { "default" });
+ taskTrackerManager.addQueues(new String[]{"default"});
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
// Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
- 2 * 1024);
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 2 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
- 2 * 1024);
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 2 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
- LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
+ LOG.debug(
+ "Submit one normal memory(1GB maps/reduces) job of "
+ "1 map, 1 reduce tasks.");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
@@ -2040,25 +1504,33 @@
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
// Fill the second tt with this job.
- checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000001_0 on tt2");
// Total 1 map slot should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
- assertEquals(String.format(
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
1, 1, 0, 0, 0, 0),
- (String) job1.getSchedulingInfo());
+ (String) job1.getSchedulingInfo());
checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
- checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_r_000001_0 on tt2");
// Total 1 reduce slot should be accounted for.
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
- 25.0f);
- assertEquals(String.format(
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ checkOccupiedSlots(
+ "default", TaskType.REDUCE, 1, 1,
+ 25.0f);
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
1, 1, 0, 1, 1, 0),
- (String) job1.getSchedulingInfo());
+ (String) job1.getSchedulingInfo());
checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
- LOG.debug("Submit one high memory(2GB maps/reduces) job of "
+ LOG.debug(
+ "Submit one high memory(2GB maps/reduces) job of "
+ "2 map, 2 reduce tasks.");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
@@ -2069,26 +1541,34 @@
jConf.setUser("u1");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000001_0 on tt1");
// Total 3 map slots should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
- assertEquals(String.format(
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
1, 2, 0, 0, 0, 0),
- (String) job2.getSchedulingInfo());
+ (String) job2.getSchedulingInfo());
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_r_000001_0 on tt1");
// Total 3 reduce slots should be accounted for.
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 3,
- 75.0f);
- assertEquals(String.format(
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ checkOccupiedSlots(
+ "default", TaskType.REDUCE, 1, 3,
+ 75.0f);
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
1, 2, 0, 1, 2, 0),
- (String) job2.getSchedulingInfo());
+ (String) job2.getSchedulingInfo());
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
- LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
+ LOG.debug(
+ "Submit one normal memory(1GB maps/reduces) job of "
+ "1 map, 0 reduce tasks.");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
@@ -2111,24 +1591,28 @@
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
LOG.info(job2.getSchedulingInfo());
- assertEquals(String.format(
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
1, 2, 2, 1, 2, 0),
- (String) job2.getSchedulingInfo());
- assertEquals(String.format(
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ (String) job2.getSchedulingInfo());
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
0, 0, 0, 0, 0, 0),
- (String) job3.getSchedulingInfo());
+ (String) job3.getSchedulingInfo());
// One reservation is already done for job2. So job3 should go ahead.
- checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0003_m_000001_0 on tt2");
}
/**
* Testcase to verify fix for a NPE (HADOOP-5641), when memory based
* scheduling is enabled and jobs are retired from memory when tasks
* are still active on some Tasktrackers.
- *
+ *
* @throws IOException
*/
public void testMemoryMatchingWithRetiredJobs() throws IOException {
@@ -2139,24 +1623,24 @@
// create scheduler
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
- taskTrackerManager.addQueues(new String[] { "default" });
+ taskTrackerManager.addQueues(new String[]{"default"});
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
LOG.debug("Assume TT has 2GB for maps and 2GB for reduces");
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
- 2 * 1024L);
[... 1290 lines stripped ...]