You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [6/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./
.eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/src...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Sat Nov 28 20:26:01 2009
@@ -18,759 +18,38 @@
package org.apache.hadoop.mapred;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
import junit.framework.TestCase;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobInProgress;
-import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import static org.apache.hadoop.mapred.CapacityTestUtils.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
public class TestCapacityScheduler extends TestCase {
static final Log LOG =
- LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
-
- private static int jobCounter;
+ LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
- /**
- * Test class that removes the asynchronous nature of job initialization.
- *
- * The run method is a dummy which just waits for completion. It is
- * expected that test code calls the main method, initializeJobs, directly
- * to trigger initialization.
- */
- class ControlledJobInitializer extends
- JobInitializationPoller.JobInitializationThread {
-
- boolean stopRunning;
-
- public ControlledJobInitializer(JobInitializationPoller p) {
- p.super();
- }
-
- @Override
- public void run() {
- while (!stopRunning) {
- try {
- synchronized(this) {
- this.wait();
- }
- } catch (InterruptedException ie) {
- break;
- }
- }
- }
-
- void stopRunning() {
- stopRunning = true;
- }
- }
-
- /**
- * Test class that removes the asynchronous nature of job initialization.
- *
- * The run method is a dummy which just waits for completion. It is
- * expected that test code calls the main method, selectJobsToInitialize,
- * directly to trigger initialization.
- *
- * The class also creates the test worker thread objects of type
- * ControlledJobInitializer instead of the objects of the actual class
- */
- class ControlledInitializationPoller extends JobInitializationPoller {
-
- private boolean stopRunning;
- private ArrayList<ControlledJobInitializer> workers;
-
- public ControlledInitializationPoller(JobQueuesManager mgr,
- CapacitySchedulerConf rmConf,
- Set<String> queues,
- TaskTrackerManager ttm) {
- super(mgr, rmConf, queues, ttm);
- }
-
- @Override
- public void run() {
- // don't do anything here.
- while (!stopRunning) {
- try {
- synchronized (this) {
- this.wait();
- }
- } catch (InterruptedException ie) {
- break;
- }
- }
- }
-
- @Override
- JobInitializationThread createJobInitializationThread() {
- ControlledJobInitializer t = new ControlledJobInitializer(this);
- if (workers == null) {
- workers = new ArrayList<ControlledJobInitializer>();
- }
- workers.add(t);
- return t;
- }
+ String queueConfigPath =
+ System.getProperty("test.build.extraconf", "build/test/extraconf");
+ File queueConfigFile =
+ new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
- @Override
- void selectJobsToInitialize() {
- super.cleanUpInitializedJobsList();
- super.selectJobsToInitialize();
- for (ControlledJobInitializer t : workers) {
- t.initializeJobs();
- }
- }
-
- void stopRunning() {
- stopRunning = true;
- for (ControlledJobInitializer t : workers) {
- t.stopRunning();
- t.interrupt();
- }
- }
- }
+ private static int jobCounter;
private ControlledInitializationPoller controlledInitializationPoller;
- /*
- * Fake job in progress object used for testing the schedulers scheduling
- * decisions. The JobInProgress objects returns out FakeTaskInProgress
- * objects when assignTasks is called. If speculative maps and reduces
- * are configured then JobInProgress returns exactly one Speculative
- * map and reduce task.
- */
- static class FakeJobInProgress extends JobInProgress {
-
- protected FakeTaskTrackerManager taskTrackerManager;
- private int mapTaskCtr;
- private int redTaskCtr;
- private Set<TaskInProgress> mapTips =
- new HashSet<TaskInProgress>();
- private Set<TaskInProgress> reduceTips =
- new HashSet<TaskInProgress>();
- private int speculativeMapTaskCounter = 0;
- private int speculativeReduceTaskCounter = 0;
- public FakeJobInProgress(JobID jId, JobConf jobConf,
- FakeTaskTrackerManager taskTrackerManager, String user) {
- super(jId, jobConf, null);
- this.taskTrackerManager = taskTrackerManager;
- this.startTime = System.currentTimeMillis();
- this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP,
- jobConf.getUser(),
- jobConf.getJobName(), "", "");
- this.status.setJobPriority(JobPriority.NORMAL);
- this.status.setStartTime(startTime);
- if (null == jobConf.getQueueName()) {
- this.profile = new JobProfile(user, jId,
- null, null, null);
- }
- else {
- this.profile = new JobProfile(user, jId,
- null, null, null, jobConf.getQueueName());
- }
- mapTaskCtr = 0;
- redTaskCtr = 0;
- }
-
- @Override
- public synchronized void initTasks() throws IOException {
- getStatus().setRunState(JobStatus.RUNNING);
- }
-
- @Override
- public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
- int ignored) throws IOException {
- boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
- if (areAllMapsRunning){
- if(!getJobConf().getMapSpeculativeExecution() ||
- speculativeMapTasks > 0) {
- return null;
- }
- }
- TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
- Task task = new MapTask("", attemptId, 0, "", new BytesWritable(),
- super.numSlotsPerMap) {
- @Override
- public String toString() {
- return String.format("%s on %s", getTaskID(), tts.getTrackerName());
- }
- };
- taskTrackerManager.startTask(tts.getTrackerName(), task);
- runningMapTasks++;
- // create a fake TIP and keep track of it
- FakeTaskInProgress mapTip = new FakeTaskInProgress(getJobID(),
- getJobConf(), task, true, this);
- mapTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
- if(areAllMapsRunning) {
- speculativeMapTasks++;
- //you have scheduled a speculative map. Now set all tips in the
- //map tips not to have speculative task.
- for(TaskInProgress t : mapTips) {
- if (t instanceof FakeTaskInProgress) {
- FakeTaskInProgress mt = (FakeTaskInProgress) t;
- mt.hasSpeculativeMap = false;
- }
- }
- } else {
- //add only non-speculative tips.
- mapTips.add(mapTip);
- //add the tips to the JobInProgress TIPS
- maps = mapTips.toArray(new TaskInProgress[mapTips.size()]);
- }
- return task;
- }
-
- @Override
- public Task obtainNewReduceTask(final TaskTrackerStatus tts,
- int clusterSize, int ignored) throws IOException {
- boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
- if (areAllReducesRunning){
- if(!getJobConf().getReduceSpeculativeExecution() ||
- speculativeReduceTasks > 0) {
- return null;
- }
- }
- TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
- Task task = new ReduceTask("", attemptId, 0, 10,
- super.numSlotsPerReduce) {
- @Override
- public String toString() {
- return String.format("%s on %s", getTaskID(), tts.getTrackerName());
- }
- };
- taskTrackerManager.startTask(tts.getTrackerName(), task);
- runningReduceTasks++;
- // create a fake TIP and keep track of it
- FakeTaskInProgress reduceTip = new FakeTaskInProgress(getJobID(),
- getJobConf(), task, false, this);
- reduceTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
- if(areAllReducesRunning) {
- speculativeReduceTasks++;
- //you have scheduled a speculative map. Now set all tips in the
- //map tips not to have speculative task.
- for(TaskInProgress t : reduceTips) {
- if (t instanceof FakeTaskInProgress) {
- FakeTaskInProgress rt = (FakeTaskInProgress) t;
- rt.hasSpeculativeReduce = false;
- }
- }
- } else {
- //add only non-speculative tips.
- reduceTips.add(reduceTip);
- //add the tips to the JobInProgress TIPS
- reduces = reduceTips.toArray(new TaskInProgress[reduceTips.size()]);
- }
- return task;
- }
-
- public void mapTaskFinished() {
- runningMapTasks--;
- finishedMapTasks++;
- }
-
- public void reduceTaskFinished() {
- runningReduceTasks--;
- finishedReduceTasks++;
- }
-
- private TaskAttemptID getTaskAttemptID(boolean isMap, boolean isSpeculative) {
- JobID jobId = getJobID();
- TaskType t = TaskType.REDUCE;
- if (isMap) {
- t = TaskType.MAP;
- }
- if (!isSpeculative) {
- return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
- (isMap) ? ++mapTaskCtr : ++redTaskCtr, 0);
- } else {
- return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
- (isMap) ? mapTaskCtr : redTaskCtr, 1);
- }
- }
-
- @Override
- Set<TaskInProgress> getNonLocalRunningMaps() {
- return (Set<TaskInProgress>)mapTips;
- }
- @Override
- Set<TaskInProgress> getRunningReduces() {
- return (Set<TaskInProgress>)reduceTips;
- }
-
- }
-
- static class FakeFailingJobInProgress extends FakeJobInProgress {
-
- public FakeFailingJobInProgress(JobID id, JobConf jobConf,
- FakeTaskTrackerManager taskTrackerManager, String user) {
- super(id, jobConf, taskTrackerManager, user);
- }
-
- @Override
- public synchronized void initTasks() throws IOException {
- throw new IOException("Failed Initalization");
- }
-
- @Override
- synchronized void fail() {
- this.status.setRunState(JobStatus.FAILED);
- }
- }
-
- static class FakeTaskInProgress extends TaskInProgress {
- private boolean isMap;
- private FakeJobInProgress fakeJob;
- private TreeMap<TaskAttemptID, String> activeTasks;
- private TaskStatus taskStatus;
- boolean hasSpeculativeMap;
- boolean hasSpeculativeReduce;
-
- FakeTaskInProgress(JobID jId, JobConf jobConf, Task t,
- boolean isMap, FakeJobInProgress job) {
- super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0, 1);
- this.isMap = isMap;
- this.fakeJob = job;
- activeTasks = new TreeMap<TaskAttemptID, String>();
- activeTasks.put(t.getTaskID(), "tt");
- // create a fake status for a task that is running for a bit
- this.taskStatus = TaskStatus.createTaskStatus(isMap);
- taskStatus.setProgress(0.5f);
- taskStatus.setRunState(TaskStatus.State.RUNNING);
- if (jobConf.getMapSpeculativeExecution()) {
- //resetting of the hasSpeculativeMap is done
- //when speculative map is scheduled by the job.
- hasSpeculativeMap = true;
- }
- if (jobConf.getReduceSpeculativeExecution()) {
- //resetting of the hasSpeculativeReduce is done
- //when speculative reduce is scheduled by the job.
- hasSpeculativeReduce = true;
- }
- }
-
- @Override
- TreeMap<TaskAttemptID, String> getActiveTasks() {
- return activeTasks;
- }
- @Override
- public TaskStatus getTaskStatus(TaskAttemptID taskid) {
- // return a status for a task that has run a bit
- return taskStatus;
- }
- @Override
- boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
- if (isMap) {
- fakeJob.mapTaskFinished();
- }
- else {
- fakeJob.reduceTaskFinished();
- }
- return true;
- }
-
- @Override
- /*
- *hasSpeculativeMap and hasSpeculativeReduce is reset by FakeJobInProgress
- *after the speculative tip has been scheduled.
- */
- boolean canBeSpeculated(long currentTime) {
- if(isMap && hasSpeculativeMap) {
- return fakeJob.getJobConf().getMapSpeculativeExecution();
- }
- if (!isMap && hasSpeculativeReduce) {
- return fakeJob.getJobConf().getReduceSpeculativeExecution();
- }
- return false;
- }
-
- @Override
- public boolean isRunning() {
- return !activeTasks.isEmpty();
- }
-
- }
-
- static class FakeQueueManager extends QueueManager {
- private Set<String> queueNames = null;
- private static final AccessControlList allEnabledAcl = new AccessControlList("*");
-
- FakeQueueManager() {
- super(new Configuration());
- }
-
- void setQueues(Set<String> queueNames) {
- this.queueNames = queueNames;
-
- // sync up queues with the parent class.
- Queue[] queues = new Queue[queueNames.size()];
- int i = 0;
- for (String queueName : queueNames) {
- HashMap<String, AccessControlList> aclsMap
- = new HashMap<String, AccessControlList>();
- for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
- String key = QueueManager.toFullPropertyName(queueName,
- oper.getAclName());
- aclsMap.put(key, allEnabledAcl);
- }
- queues[i++] = new Queue(queueName, aclsMap, Queue.QueueState.RUNNING);
- }
- super.setQueues(queues);
- }
-
- public synchronized Set<String> getQueues() {
- return queueNames;
- }
- }
-
- static class FakeTaskTrackerManager implements TaskTrackerManager {
- int maps = 0;
- int reduces = 0;
- int maxMapTasksPerTracker = 2;
- int maxReduceTasksPerTracker = 1;
- long ttExpiryInterval = 10 * 60 * 1000L; // default interval
- List<JobInProgressListener> listeners =
- new ArrayList<JobInProgressListener>();
- FakeQueueManager qm = new FakeQueueManager();
-
- private Map<String, TaskTracker> trackers =
- new HashMap<String, TaskTracker>();
- private Map<String, TaskStatus> taskStatuses =
- new HashMap<String, TaskStatus>();
- private Map<JobID, JobInProgress> jobs =
- new HashMap<JobID, JobInProgress>();
-
- public FakeTaskTrackerManager() {
- this(2, 2, 1);
- }
-
- public FakeTaskTrackerManager(int numTaskTrackers,
- int maxMapTasksPerTracker, int maxReduceTasksPerTracker) {
- this.maxMapTasksPerTracker = maxMapTasksPerTracker;
- this.maxReduceTasksPerTracker = maxReduceTasksPerTracker;
- for (int i = 1; i < numTaskTrackers + 1; i++) {
- String ttName = "tt" + i;
- TaskTracker tt = new TaskTracker(ttName);
- tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", i,
- new ArrayList<TaskStatus>(), 0,
- maxMapTasksPerTracker,
- maxReduceTasksPerTracker));
- trackers.put(ttName, tt);
- }
- }
-
- public void addTaskTracker(String ttName) {
- TaskTracker tt = new TaskTracker(ttName);
- tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", 1,
- new ArrayList<TaskStatus>(), 0,
- maxMapTasksPerTracker,
- maxReduceTasksPerTracker));
- trackers.put(ttName, tt);
- }
-
- public ClusterStatus getClusterStatus() {
- int numTrackers = trackers.size();
- return new ClusterStatus(numTrackers, 0,
- ttExpiryInterval, maps, reduces,
- numTrackers * maxMapTasksPerTracker,
- numTrackers * maxReduceTasksPerTracker,
- JobTracker.State.RUNNING);
- }
-
- public int getNumberOfUniqueHosts() {
- return 0;
- }
-
- public int getNextHeartbeatInterval() {
- return MRConstants.HEARTBEAT_INTERVAL_MIN;
- }
-
- @Override
- public void killJob(JobID jobid) throws IOException {
- JobInProgress job = jobs.get(jobid);
- finalizeJob(job, JobStatus.KILLED);
- job.kill();
- }
-
- public void initJob(JobInProgress jip) {
- try {
- JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
- jip.initTasks();
- if (jip.isJobEmpty()) {
- completeEmptyJob(jip);
- } else if (!jip.isSetupCleanupRequired()) {
- jip.completeSetup();
- }
- JobStatus newStatus = (JobStatus)jip.getStatus().clone();
- JobStatusChangeEvent event = new JobStatusChangeEvent(jip,
- EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
- for (JobInProgressListener listener : listeners) {
- listener.jobUpdated(event);
- }
- } catch (Exception ioe) {
- failJob(jip);
- }
- }
-
- private synchronized void completeEmptyJob(JobInProgress jip) {
- jip.completeEmptyJob();
- }
-
- public synchronized void failJob(JobInProgress jip) {
- JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
- jip.fail();
- JobStatus newStatus = (JobStatus)jip.getStatus().clone();
- JobStatusChangeEvent event = new JobStatusChangeEvent(jip,
- EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
- for (JobInProgressListener listener : listeners) {
- listener.jobUpdated(event);
- }
- }
-
- public void removeJob(JobID jobid) {
- jobs.remove(jobid);
- }
-
- @Override
- public JobInProgress getJob(JobID jobid) {
- return jobs.get(jobid);
- }
-
- Collection<JobInProgress> getJobs() {
- return jobs.values();
- }
-
- public Collection<TaskTrackerStatus> taskTrackers() {
- List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
- for (TaskTracker tt : trackers.values()) {
- statuses.add(tt.getStatus());
- }
- return statuses;
- }
-
-
- public void addJobInProgressListener(JobInProgressListener listener) {
- listeners.add(listener);
- }
-
- public void removeJobInProgressListener(JobInProgressListener listener) {
- listeners.remove(listener);
- }
-
- public void submitJob(JobInProgress job) throws IOException {
- jobs.put(job.getJobID(), job);
- for (JobInProgressListener listener : listeners) {
- listener.jobAdded(job);
- }
- }
-
- public TaskTracker getTaskTracker(String trackerID) {
- return trackers.get(trackerID);
- }
-
- public void startTask(String taskTrackerName, final Task t) {
- if (t.isMapTask()) {
- maps++;
- } else {
- reduces++;
- }
- TaskStatus status = new TaskStatus() {
- @Override
- public TaskAttemptID getTaskID() {
- return t.getTaskID();
- }
-
- @Override
- public boolean getIsMap() {
- return t.isMapTask();
- }
-
- @Override
- public int getNumSlots() {
- return t.getNumSlotsRequired();
- }
- };
- taskStatuses.put(t.getTaskID().toString(), status);
- status.setRunState(TaskStatus.State.RUNNING);
- trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
- }
-
- public void finishTask(String taskTrackerName, String tipId,
- FakeJobInProgress j) {
- TaskStatus status = taskStatuses.get(tipId);
- if (status.getIsMap()) {
- maps--;
- j.mapTaskFinished();
- } else {
- reduces--;
- j.reduceTaskFinished();
- }
- status.setRunState(TaskStatus.State.SUCCEEDED);
- }
-
- void finalizeJob(FakeJobInProgress fjob) {
- finalizeJob(fjob, JobStatus.SUCCEEDED);
- }
-
- void finalizeJob(JobInProgress fjob, int state) {
- // take a snapshot of the status before changing it
- JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
- fjob.getStatus().setRunState(state);
- JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
- JobStatusChangeEvent event =
- new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus,
- newStatus);
- for (JobInProgressListener listener : listeners) {
- listener.jobUpdated(event);
- }
- }
-
- public void setPriority(FakeJobInProgress fjob, JobPriority priority) {
- // take a snapshot of the status before changing it
- JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
- fjob.setPriority(priority);
- JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
- JobStatusChangeEvent event =
- new JobStatusChangeEvent (fjob, EventType.PRIORITY_CHANGED, oldStatus,
- newStatus);
- for (JobInProgressListener listener : listeners) {
- listener.jobUpdated(event);
- }
- }
-
- public void setStartTime(FakeJobInProgress fjob, long start) {
- // take a snapshot of the status before changing it
- JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
-
- fjob.startTime = start; // change the start time of the job
- fjob.status.setStartTime(start); // change the start time of the jobstatus
-
- JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
-
- JobStatusChangeEvent event =
- new JobStatusChangeEvent (fjob, EventType.START_TIME_CHANGED, oldStatus,
- newStatus);
- for (JobInProgressListener listener : listeners) {
- listener.jobUpdated(event);
- }
- }
-
- void addQueues(String[] arr) {
- Set<String> queues = new HashSet<String>();
- for (String s: arr) {
- queues.add(s);
- }
- qm.setQueues(queues);
- }
-
- public QueueManager getQueueManager() {
- return qm;
- }
-
- @Override
- public boolean killTask(TaskAttemptID taskid, boolean shouldFail) {
- return true;
- }
- }
-
- // represents a fake queue configuration info
- static class FakeQueueInfo {
- String queueName;
- float capacity;
- boolean supportsPrio;
- int ulMin;
-
- public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) {
- this.queueName = queueName;
- this.capacity = capacity;
- this.supportsPrio = supportsPrio;
- this.ulMin = ulMin;
- }
- }
-
- static class FakeResourceManagerConf extends CapacitySchedulerConf {
-
- // map of queue names to queue info
- private Map<String, FakeQueueInfo> queueMap =
- new LinkedHashMap<String, FakeQueueInfo>();
- String firstQueue;
-
-
- void setFakeQueues(List<FakeQueueInfo> queues) {
- for (FakeQueueInfo q: queues) {
- queueMap.put(q.queueName, q);
- }
- firstQueue = new String(queues.get(0).queueName);
- }
-
- public synchronized Set<String> getQueues() {
- return queueMap.keySet();
- }
-
- /*public synchronized String getFirstQueue() {
- return firstQueue;
- }*/
-
- public float getCapacity(String queue) {
- if(queueMap.get(queue).capacity == -1) {
- return super.getCapacity(queue);
- }
- return queueMap.get(queue).capacity;
- }
-
- public int getMinimumUserLimitPercent(String queue) {
- return queueMap.get(queue).ulMin;
- }
-
- public boolean isPrioritySupported(String queue) {
- return queueMap.get(queue).supportsPrio;
- }
-
- @Override
- public long getSleepInterval() {
- return 1;
- }
-
- @Override
- public int getMaxWorkerThreads() {
- return 1;
- }
- }
-
- protected class FakeClock extends CapacityTaskScheduler.Clock {
- private long time = 0;
-
- public void advance(long millis) {
- time += millis;
- }
- @Override
- long getTime() {
- return time;
- }
- }
-
protected JobConf conf;
protected CapacityTaskScheduler scheduler;
private FakeTaskTrackerManager taskTrackerManager;
- private FakeResourceManagerConf resConf;
private FakeClock clock;
@Override
@@ -778,30 +57,31 @@
setUp(2, 2, 1);
}
- private void setUp(int numTaskTrackers, int numMapTasksPerTracker,
- int numReduceTasksPerTracker) {
+ private void setUp(
+ int numTaskTrackers, int numMapTasksPerTracker,
+ int numReduceTasksPerTracker) {
jobCounter = 0;
taskTrackerManager =
- new FakeTaskTrackerManager(numTaskTrackers, numMapTasksPerTracker,
- numReduceTasksPerTracker);
+ new FakeTaskTrackerManager(
+ numTaskTrackers, numMapTasksPerTracker,
+ numReduceTasksPerTracker);
clock = new FakeClock();
scheduler = new CapacityTaskScheduler(clock);
scheduler.setTaskTrackerManager(taskTrackerManager);
conf = new JobConf();
// Don't let the JobInitializationPoller come in our way.
- resConf = new FakeResourceManagerConf();
- controlledInitializationPoller = new ControlledInitializationPoller(
- scheduler.jobQueuesManager,
- resConf,
- resConf.getQueues(), taskTrackerManager);
+ conf.set("mapred.queue.names","default");
+ controlledInitializationPoller =
+ new ControlledInitializationPoller(scheduler.jobQueuesManager,
+ taskTrackerManager);
scheduler.setInitializationPoller(controlledInitializationPoller);
scheduler.setConf(conf);
//by default disable speculative execution.
conf.setMapSpeculativeExecution(false);
conf.setReduceSpeculativeExecution(false);
}
-
+
@Override
protected void tearDown() throws Exception {
if (scheduler != null) {
@@ -809,377 +89,366 @@
}
}
- private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException {
- FakeJobInProgress job =
- new FakeJobInProgress(new JobID("test", ++jobCounter),
- (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
- jobConf.getUser());
- job.getStatus().setRunState(state);
- taskTrackerManager.submitJob(job);
- return job;
- }
-
- private FakeJobInProgress submitJobAndInit(int state, JobConf jobConf)
- throws IOException {
- FakeJobInProgress j = submitJob(state, jobConf);
- taskTrackerManager.initJob(j);
- return j;
- }
-
- private FakeJobInProgress submitJob(int state, int maps, int reduces,
- String queue, String user) throws IOException {
- JobConf jobConf = new JobConf(conf);
- jobConf.setNumMapTasks(maps);
- jobConf.setNumReduceTasks(reduces);
- if (queue != null)
- jobConf.setQueueName(queue);
- jobConf.setUser(user);
- return submitJob(state, jobConf);
- }
-
- // Submit a job and update the listeners
- private FakeJobInProgress submitJobAndInit(int state, int maps, int reduces,
- String queue, String user)
- throws IOException {
- FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
- taskTrackerManager.initJob(j);
- return j;
- }
-
- /**
- * Test the max map limit.
- * @throws IOException
- */
- public void testMaxMapCap() throws IOException {
- this.setUp(4,1,1);
- taskTrackerManager.addQueues(new String[] {"default"});
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
- resConf.setFakeQueues(queues);
- resConf.setMaxMapCap("default",2);
- resConf.setMaxReduceCap("default",-1);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
-
- //submit the Job
- FakeJobInProgress fjob1 =
- submitJob(JobStatus.PREP,3,1,"default","user");
-
- taskTrackerManager.initJob(fjob1);
-
- List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
- List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
-
- //Once the 2 tasks are running the third assigment should be reduce.
- checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
- //This should fail.
- List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
- assertNull(task4);
- //Now complete the task 1.
- // complete the job
- taskTrackerManager.finishTask("tt1", task1.get(0).getTaskID().toString(),
- fjob1);
- //We have completed the tt1 task which was a map task so we expect one map
- //task to be picked up
- checkAssignment("tt4","attempt_test_0001_m_000003_0 on tt4");
- }
-
/**
- * Test max reduce limit
+ * Test max capacity
* @throws IOException
*/
- public void testMaxReduceCap() throws IOException {
+ public void testMaxCapacity() throws IOException {
this.setUp(4, 1, 1);
taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
- resConf.setFakeQueues(queues);
- resConf.setMaxMapCap("default", -1);
- resConf.setMaxReduceCap("default", 2);
- scheduler.setResourceManagerConf(resConf);
+ queues.add(new FakeQueueInfo("default", 25.0f, false, 1));
+
+
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
+ scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+ .setMaxCapacityPercent(50.0f);
//submit the Job
- FakeJobInProgress fjob1 =
- submitJob(JobStatus.PREP, 1, 3, "default", "user");
+ FakeJobInProgress fjob1 = taskTrackerManager.submitJob(
+ JobStatus.PREP, 4, 4, "default", "user");
taskTrackerManager.initJob(fjob1);
+ HashMap<String, String> expectedStrings = new HashMap<String, String>();
- List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
- List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
- List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
+ expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ List<Task> task1 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1", expectedStrings);
- //This should fail. 1 map, 2 reduces , we have reached the limit.
- List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
- assertNull(task4);
- //Now complete the task 1 i.e map task.
- // complete the job
- taskTrackerManager.finishTask(
- "tt1", task1.get(0).getTaskID().toString(),
- fjob1);
- //This should still fail as only map task is done
- task4 = scheduler.assignTasks(tracker("tt4"));
- assertNull(task4);
+ expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+ List<Task> task2 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2", expectedStrings);
- //Complete the reduce task
- taskTrackerManager.finishTask(
- "tt2", task2.get(0).getTaskID().toString(), fjob1);
+ //we have already reached the limit
+ //this call would return null
+ List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
+ assertNull(task3);
- //One reduce is done hence assign the new reduce.
- checkAssignment("tt4","attempt_test_0001_r_000003_0 on tt4");
+ //Now complete every task in task1, i.e. all the tasks that were assigned to tt1.
+ for (Task task : task1) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(), fjob1);
+ }
+
+ expectedStrings.put(MAP, "attempt_test_0001_m_000003_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000003_0 on tt1");
+ task2 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1", expectedStrings);
}
-
+
// test job run-state change
public void testJobRunStateChange() throws IOException {
// start the scheduler
- taskTrackerManager.addQueues(new String[] {"default"});
+ taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 1));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
+
+
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
-
+
// submit the job
- FakeJobInProgress fjob1 =
- submitJob(JobStatus.PREP, 1, 0, "default", "user");
-
- FakeJobInProgress fjob2 =
- submitJob(JobStatus.PREP, 1, 0, "default", "user");
-
+ FakeJobInProgress fjob1 =
+ taskTrackerManager.submitJob(JobStatus.PREP, 1, 0, "default", "user");
+
+ FakeJobInProgress fjob2 =
+ taskTrackerManager.submitJob(JobStatus.PREP, 1, 0, "default", "user");
+
// test if changing the job priority/start-time works as expected in the
// waiting queue
testJobOrderChange(fjob1, fjob2, true);
-
+
// Init the jobs
// simulate the case where the job with a lower priority becomes running
// first (may be because of the setup tasks).
-
+
// init the lower ranked job first
taskTrackerManager.initJob(fjob2);
-
+
// init the higher ordered job later
taskTrackerManager.initJob(fjob1);
-
+
// check if the jobs are missing from the waiting queue
// The jobs are not removed from waiting queue until they are scheduled
- assertEquals("Waiting queue is garbled on job init", 2,
- scheduler.jobQueuesManager.getWaitingJobs("default")
- .size());
-
+ assertEquals(
+ "Waiting queue is garbled on job init", 2,
+ scheduler.jobQueuesManager.getJobQueue("default").getWaitingJobs()
+ .size());
+
// test if changing the job priority/start-time works as expected in the
// running queue
testJobOrderChange(fjob1, fjob2, false);
-
+
// schedule a task
List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
-
+
// complete the job
- taskTrackerManager.finishTask("tt1", tasks.get(0).getTaskID().toString(),
- fjob1);
-
+ taskTrackerManager.finishTask(
+ tasks.get(0).getTaskID().toString(),
+ fjob1);
+
// mark the job as complete
taskTrackerManager.finalizeJob(fjob1);
-
- Collection<JobInProgress> rqueue =
- scheduler.jobQueuesManager.getRunningJobQueue("default");
-
+
+ Collection<JobInProgress> rqueue =
+ scheduler.jobQueuesManager.getJobQueue("default").getRunningJobs();
+
// check if the job is removed from the scheduler
- assertFalse("Scheduler contains completed job",
- rqueue.contains(fjob1));
-
+ assertFalse(
+ "Scheduler contains completed job",
+ rqueue.contains(fjob1));
+
// check if the running queue size is correct
- assertEquals("Job finish garbles the queue",
- 1, rqueue.size());
+ assertEquals(
+ "Job finish garbles the queue",
+ 1, rqueue.size());
}
-
+
// test if the queue reflects the changes
- private void testJobOrderChange(FakeJobInProgress fjob1,
- FakeJobInProgress fjob2,
- boolean waiting) {
+ private void testJobOrderChange(
+ FakeJobInProgress fjob1,
+ FakeJobInProgress fjob2,
+ boolean waiting) {
String queueName = waiting ? "waiting" : "running";
-
+
// check if the jobs in the queue are the right order
JobInProgress[] jobs = getJobsInQueue(waiting);
- assertTrue(queueName + " queue doesnt contain job #1 in right order",
- jobs[0].getJobID().equals(fjob1.getJobID()));
- assertTrue(queueName + " queue doesnt contain job #2 in right order",
- jobs[1].getJobID().equals(fjob2.getJobID()));
-
+ assertTrue(
+ queueName + " queue doesnt contain job #1 in right order",
+ jobs[0].getJobID().equals(fjob1.getJobID()));
+ assertTrue(
+ queueName + " queue doesnt contain job #2 in right order",
+ jobs[1].getJobID().equals(fjob2.getJobID()));
+
// I. Check the start-time change
// Change job2 start-time and check if job2 bumps up in the queue
taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
-
+
jobs = getJobsInQueue(waiting);
- assertTrue("Start time change didnt not work as expected for job #2 in "
- + queueName + " queue",
- jobs[0].getJobID().equals(fjob2.getJobID()));
- assertTrue("Start time change didnt not work as expected for job #1 in"
- + queueName + " queue",
- jobs[1].getJobID().equals(fjob1.getJobID()));
-
+ assertTrue(
+ "Start time change didnt not work as expected for job #2 in "
+ + queueName + " queue",
+ jobs[0].getJobID().equals(fjob2.getJobID()));
+ assertTrue(
+ "Start time change didnt not work as expected for job #1 in"
+ + queueName + " queue",
+ jobs[1].getJobID().equals(fjob1.getJobID()));
+
// check if the queue is fine
- assertEquals("Start-time change garbled the " + queueName + " queue",
- 2, jobs.length);
-
+ assertEquals(
+ "Start-time change garbled the " + queueName + " queue",
+ 2, jobs.length);
+
// II. Change job priority change
// Bump up job1's priority and make sure job1 bumps up in the queue
taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
-
+
// Check if the priority changes are reflected
jobs = getJobsInQueue(waiting);
- assertTrue("Priority change didnt not work as expected for job #1 in "
- + queueName + " queue",
- jobs[0].getJobID().equals(fjob1.getJobID()));
- assertTrue("Priority change didnt not work as expected for job #2 in "
- + queueName + " queue",
- jobs[1].getJobID().equals(fjob2.getJobID()));
-
+ assertTrue(
+ "Priority change didnt not work as expected for job #1 in "
+ + queueName + " queue",
+ jobs[0].getJobID().equals(fjob1.getJobID()));
+ assertTrue(
+ "Priority change didnt not work as expected for job #2 in "
+ + queueName + " queue",
+ jobs[1].getJobID().equals(fjob2.getJobID()));
+
// check if the queue is fine
- assertEquals("Priority change has garbled the " + queueName + " queue",
- 2, jobs.length);
-
+ assertEquals(
+ "Priority change has garbled the " + queueName + " queue",
+ 2, jobs.length);
+
// reset the queue state back to normal
taskTrackerManager.setStartTime(fjob1, fjob2.startTime - 1);
taskTrackerManager.setPriority(fjob1, JobPriority.NORMAL);
}
-
+
private JobInProgress[] getJobsInQueue(boolean waiting) {
- Collection<JobInProgress> queue =
- waiting
- ? scheduler.jobQueuesManager.getWaitingJobs("default")
- : scheduler.jobQueuesManager.getRunningJobQueue("default");
+ Collection<JobInProgress> queue =
+ waiting
+ ? scheduler.jobQueuesManager.getJobQueue("default").getWaitingJobs()
+ : scheduler.jobQueuesManager.getJobQueue("default").getRunningJobs();
return queue.toArray(new JobInProgress[0]);
}
-
+
// tests if tasks can be assinged when there are multiple jobs from a same
// user
public void testJobFinished() throws Exception {
- taskTrackerManager.addQueues(new String[] {"default"});
-
+ taskTrackerManager.addQueues(new String[]{"default"});
+
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
+
+
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// submit 2 jobs
- FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
- FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
-
+ FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 3, 0, "default", "u1");
+ FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 3, 0, "default", "u1");
+
// I. Check multiple assignments with running tasks within job
// ask for a task from first job
- Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ Task t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000001_0 on tt1");
// ask for another task from the first job
- t = checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-
+ t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000002_0 on tt1");
+
// complete tasks
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
-
+ taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", j1);
+ taskTrackerManager.finishTask("attempt_test_0001_m_000002_0", j1);
+
// II. Check multiple assignments with running tasks across jobs
// ask for a task from first job
- t = checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
-
+ t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000003_0 on tt1");
+
// ask for a task from the second job
- t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-
+ t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000001_0 on tt1");
+
// complete tasks
- taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000003_0", j1);
-
+ taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", j2);
+ taskTrackerManager.finishTask("attempt_test_0001_m_000003_0", j1);
+
// III. Check multiple assignments with completed tasks across jobs
// ask for a task from the second job
- t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
-
+ t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000002_0 on tt1");
+
// complete task
- taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000002_0", j2);
-
+ taskTrackerManager.finishTask("attempt_test_0002_m_000002_0", j2);
+
// IV. Check assignment with completed job
// finish first job
- scheduler.jobCompleted(j1);
-
+ scheduler.jobQueuesManager.getJobQueue(j1).jobCompleted(j1);
+
// ask for another task from the second job
// if tasks can be assigned then the structures are properly updated
- t = checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
-
+ t = checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000003_0 on tt1");
+
// complete task
- taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
+ taskTrackerManager.finishTask("attempt_test_0002_m_000003_0", j2);
}
-
- // basic tests, should be able to submit to queues
- public void testSubmitToQueues() throws Exception {
- // set up some queues
- String[] qs = {"default", "q2"};
- taskTrackerManager.addQueues(qs);
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
- queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
- // submit a job with no queue specified. It should be accepted
- // and given to the default queue.
- JobInProgress j = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-
- // when we ask for a task, we should get one, from the job submitted
- Task t;
- t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- // submit another job, to a different queue
- j = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // now when we get a task, it should be from the second job
- t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
- }
-
- public void testGetJobs() throws Exception {
- // need only one queue
- String[] qs = { "default" };
- taskTrackerManager.addQueues(qs);
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
- HashMap<String, ArrayList<FakeJobInProgress>> subJobsList =
- submitJobs(1, 4, "default");
-
+ /**
+ * tests the submission of jobs to container and job queues
+ * @throws Exception
+ */
+ public void testJobSubmission() throws Exception {
+ JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
+
+ queues[0].getProperties().setProperty(
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
+ queues[1].getProperties().setProperty(
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+ queues[2].getProperties().setProperty(
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+
+ // write the configuration file
+ QueueManagerTestUtils.writeQueueConfigurationFile(
+ queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ setUp(1, 4, 4);
+ // use the queues from the config file.
+ taskTrackerManager.setQueueManager(new QueueManager());
+ scheduler.start();
+
+ // submit a job to the container queue
+ try {
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0,
+ queues[0].getQueueName(), "user");
+ fail("Jobs are being able to be submitted to the container queue");
+ } catch (Exception e) {
+ assertTrue(scheduler.getJobs(queues[0].getQueueName()).isEmpty());
+ }
+
+ FakeJobInProgress job = taskTrackerManager.submitJobAndInit(JobStatus.PREP,
+ 1, 0, queues[1].getQueueName(), "user");
+ assertEquals(1, scheduler.getJobs(queues[1].getQueueName()).size());
+ assertTrue(scheduler.getJobs(queues[1].getQueueName()).contains(job));
+
+ // check if the job is submitted
+ checkAssignment(taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000001_0 on tt1");
+
+ // test for getJobs
+ HashMap<String, ArrayList<FakeJobInProgress>> subJobsList =
+ taskTrackerManager.submitJobs(1, 4, queues[2].getQueueName());
+
JobQueuesManager mgr = scheduler.jobQueuesManager;
-
- while(mgr.getWaitingJobs("default").size() < 4){
- Thread.sleep(1);
- }
//Raise status change events for jobs submitted.
- raiseStatusChangeEvents(mgr);
- Collection<JobInProgress> jobs = scheduler.getJobs("default");
-
- assertTrue("Number of jobs returned by scheduler is wrong"
- ,jobs.size() == 4);
-
- assertTrue("Submitted jobs and Returned jobs are not same",
- subJobsList.get("u1").containsAll(jobs));
+ raiseStatusChangeEvents(mgr, queues[2].getQueueName());
+ Collection<JobInProgress> jobs =
+ scheduler.getJobs(queues[2].getQueueName());
+
+ assertTrue(
+ "Number of jobs returned by scheduler is wrong"
+ , jobs.size() == 4);
+
+ assertTrue(
+ "Submitted jobs and Returned jobs are not same",
+ subJobsList.get("u1").containsAll(jobs));
}
-
+
//Basic test to test capacity allocation across the queues which have no
//capacity configured.
-
+
public void testCapacityAllocationToQueues() throws Exception {
- String[] qs = {"default","q1","q2","q3","q4"};
+ String[] qs = {"default", "qAZ1", "qAZ2", "qAZ3", "qAZ4"};
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 25.0f, true, 25));
+ queues.add(new FakeQueueInfo("qAZ1", -1.0f, true, 25));
+ queues.add(new FakeQueueInfo("qAZ2", -1.0f, true, 25));
+ queues.add(new FakeQueueInfo("qAZ3", -1.0f, true, 25));
+ queues.add(new FakeQueueInfo("qAZ4", -1.0f, true, 25));
+
+
+ taskTrackerManager.setFakeQueues(queues);
+ scheduler.start();
+ JobQueuesManager jqm = scheduler.jobQueuesManager;
+ assertEquals(18.75f, jqm.getJobQueue("qAZ1").qsc.getCapacityPercent());
+ assertEquals(18.75f, jqm.getJobQueue("qAZ2").qsc.getCapacityPercent());
+ assertEquals(18.75f, jqm.getJobQueue("qAZ3").qsc.getCapacityPercent());
+ assertEquals(18.75f, jqm.getJobQueue("qAZ4").qsc.getCapacityPercent());
+ }
+
+ public void testCapacityAllocFailureWithLowerMaxCapacity() throws Exception {
+ String[] qs = {"default", "qAZ1"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default",25.0f,true,25));
- queues.add(new FakeQueueInfo("q1",-1.0f,true,25));
- queues.add(new FakeQueueInfo("q2",-1.0f,true,25));
- queues.add(new FakeQueueInfo("q3",-1.0f,true,25));
- queues.add(new FakeQueueInfo("q4",-1.0f,true,25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
- assertEquals(18.75f, resConf.getCapacity("q1"));
- assertEquals(18.75f, resConf.getCapacity("q2"));
- assertEquals(18.75f, resConf.getCapacity("q3"));
- assertEquals(18.75f, resConf.getCapacity("q4"));
+ queues.add(new FakeQueueInfo("default", 25.0f, true, 25));
+ FakeQueueInfo qi = new FakeQueueInfo("qAZ1", -1.0f, true, 25);
+ qi.maxCapacity = 40.0f;
+ queues.add(qi);
+ taskTrackerManager.setFakeQueues(queues);
+ try {
+ scheduler.start();
+ fail("scheduler start should fail ");
+ }catch(IOException ise) {
+ Throwable e = ise.getCause();
+ assertTrue(e instanceof IllegalStateException);
+ assertEquals(
+ e.getMessage(),
+ " Capacity share (" + 75.0f + ")for unconfigured queue " + "qAZ1" +
+ " is greater than its maximum-capacity percentage " + 40.0f);
+ }
}
// Tests how capacity is computed and assignment of tasks done
@@ -1193,50 +462,51 @@
// the cluster capacity increase slowly.
queues.add(new FakeQueueInfo("default", 10.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 90.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
+
+
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
-
+
// submit a job to the default queue
- submitJobAndInit(JobStatus.PREP, 10, 0, "default", "u1");
-
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 0, "default", "u1");
+
// submit a job to the second queue
- submitJobAndInit(JobStatus.PREP, 10, 0, "q2", "u1");
-
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 0, "q2", "u1");
+
// job from q2 runs first because it has some non-zero capacity.
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- verifyCapacity("0", "default");
- verifyCapacity("3", "q2");
-
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000001_0 on tt1");
+ verifyCapacity(taskTrackerManager, "0", "default");
+ verifyCapacity(taskTrackerManager, "3", "q2");
+
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt3");
- checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
- verifyCapacity("0", "default");
- verifyCapacity("5", "q2");
-
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0002_m_000002_0 on tt2");
+ verifyCapacity(taskTrackerManager, "0", "default");
+ verifyCapacity(taskTrackerManager, "5", "q2");
+
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt4");
- checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
- verifyCapacity("0", "default");
- verifyCapacity("7", "q2");
-
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt3",
+ "attempt_test_0002_m_000003_0 on tt3");
+ verifyCapacity(taskTrackerManager, "0", "default");
+ verifyCapacity(taskTrackerManager, "7", "q2");
+
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt5");
// now job from default should run, as it is furthest away
// in terms of runningMaps / capacity.
- checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
- verifyCapacity("1", "default");
- verifyCapacity("9", "q2");
- }
-
- private void verifyCapacity(String expectedCapacity,
- String queue) throws IOException {
- String schedInfo = taskTrackerManager.getQueueManager().
- getSchedulerInfo(queue).toString();
- assertTrue(schedInfo.contains("Map tasks\nCapacity: "
- + expectedCapacity + " slots"));
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ "attempt_test_0001_m_000001_0 on tt4");
+ verifyCapacity(taskTrackerManager, "1", "default");
+ verifyCapacity(taskTrackerManager, "9", "q2");
}
-
+
// test capacity transfer
public void testCapacityTransfer() throws Exception {
// set up some queues
@@ -1245,159 +515,202 @@
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
+
+
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// submit a job
- submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- // I should get another map task.
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ // we should get a task
+ Map<String,String> expectedStrings = new HashMap<String,String>();
+ expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+
+ // I should get another map task.
+ //No reduces, as there is only 1 reduce slot on the TT
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000002_0 on tt1");
+
// Now we're at full capacity for maps. If I ask for another map task,
- // I should get a map task from the default queue's capacity.
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+ // I should get a map task from the default queue's capacity.
+ //same with reduce
+ expectedStrings.put(MAP,"attempt_test_0001_m_000003_0 on tt2");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+
// and another
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000004_0 on tt2");
}
/**
- * Creates a queue with max task limit of 2
- * submit 1 job in the queue which is high ram(2 slots) . As 2 slots are
- * given to high ram job and are reserved , no other tasks are accepted .
- *
+ * test the high memory blocking with max capacity.
* @throws IOException
*/
- public void testHighMemoryBlockingWithMaxLimit()
- throws IOException {
-
- // 2 map and 1 reduce slots
- taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
+ public void testHighMemoryBlockingWithMaxCapacity()
+ throws IOException {
+ taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
- taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
+ taskTrackerManager.addQueues(new String[]{"defaultXYZM"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("defaultXYZ", 100.0f, true, 25));
- resConf.setFakeQueues(queues);
- resConf.setMaxMapCap("defaultXYZ",2);
+ queues.add(new FakeQueueInfo("defaultXYZM", 25.0f, true, 50));
+
+
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
// Normal job in the cluster would be 1GB maps/reduces
- scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
- 2 * 1024);
- scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
- scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
- 1 * 1024);
- scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
- scheduler.setResourceManagerConf(resConf);
+ scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
+ scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
+ scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
+ scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
+ scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+ .setMaxCapacityPercent(50);
- // The situation : Submit 2 jobs with high memory map task
- //Set the max limit for queue to 2 ,
- // try submitting more map tasks to the queue , it should not happen
-
- LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
- + "2 map tasks");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
- jConf.setMemoryForReduceTask(0);
+ jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(2);
- jConf.setNumReduceTasks(0);
- jConf.setQueueName("defaultXYZ");
+ jConf.setNumReduceTasks(1);
+ jConf.setQueueName("defaultXYZM");
jConf.setUser("u1");
- FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
- + "2 map/red tasks");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
- jConf.setMemoryForReduceTask(1 * 1024);
- jConf.setNumMapTasks(2);
+ jConf.setMemoryForReduceTask(2 * 1024);
+ jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(2);
- jConf.setQueueName("defaultXYZ");
+ jConf.setQueueName("defaultXYZM");
jConf.setUser("u1");
- FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- // first, a map from j1 will run this is a high memory job so it would
- // occupy the 2 slots
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-
- checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
-
- // at this point, the scheduler tries to schedule another map from j1.
- // there isn't enough space. The second job's reduce should be scheduled.
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-
- checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
+ //high ram map from job 1 and normal reduce task from job 1
+ HashMap<String,String> expectedStrings = new HashMap<String,String>();
+ expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+
+ List<Task> tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt1", expectedStrings);
+
+ checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 200.0f,1,0);
+ checkOccupiedSlots("defaultXYZM", TaskType.REDUCE, 1, 1, 100.0f,0,2);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
- //at this point , the scheduler tries to schedule another map from j2 for
- //another task tracker.
- // This should not happen as all the map slots are taken
- //by the first task itself.hence reduce task from the second job is given
+ //we have reached the maximum limit for maps, so no more map tasks.
+ //we have used 1 reduce slot already and 1 more reduce slot is left
+ //before we reach max capacity for reduces.
+ //But the current 1 slot + 2 slots for a high ram reduce would
+ //mean crossing the maximum capacity. Hence nothing would be assigned
+ //in this call.
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+
+ //complete the high ram job on tt1.
+ for (Task task : tasks) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(),
+ job1);
+ }
+
+ expectedStrings.put(MAP,"attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE,"attempt_test_0002_r_000001_0 on tt2");
+
+ tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt2", expectedStrings);
+
+ checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 200.0f,1,0);
+ checkOccupiedSlots("defaultXYZM", TaskType.REDUCE, 1, 2, 200.0f,0,2);
+ checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
+
+ //complete the tasks that were just assigned on tt2.
+ for (Task task : tasks) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(),
+ job2);
+ }
- checkAssignment("tt2","attempt_test_0002_r_000002_0 on tt2");
+
+ expectedStrings.put(MAP,"attempt_test_0002_m_000001_0 on tt2");
+ expectedStrings.put(REDUCE,"attempt_test_0002_r_000002_0 on tt2");
+
+ tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt2", expectedStrings);
}
/**
- * test if user limits automatically adjust to max map or reduce limit
+ * test if user limits automatically adjust to max map or reduce limit
*/
- public void testUserLimitsWithMaxLimits() throws Exception {
- setUp(4, 4, 4);
+ public void testUserLimitsWithMaxCapacity() throws Exception {
+ setUp(2, 2, 2);
// set up some queues
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
- resConf.setFakeQueues(queues);
- resConf.setMaxMapCap("default", 2);
- resConf.setMaxReduceCap("default", 2);
- scheduler.setResourceManagerConf(resConf);
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
+
+
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
+ scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+ .setMaxCapacityPercent(75);
// submit a job
FakeJobInProgress fjob1 =
- submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
FakeJobInProgress fjob2 =
- submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
- // for queue 'default', the capacity for maps is 2.
- // But the max map limit is 2
- // hence user should be getting not more than 1 as it is the 50%.
- Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-
- //Now we should get the task from the other job. As the
- //first user has reached his max map limit.
- checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
-
- //Now we are done with map limit , now if we ask for task we should
- // get reduce from 1st job
- checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
- // Now we're at full capacity for maps. 1 done with reduces for job 1 so
- // now we should get 1 reduces for job 2
- Task t4 = checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
+ // for queue 'default', maxCapacity for map and reduce is 3.
+ // the initial user limit at 50%, assuming there are 2 users per queue,
+ // is 1 map and 1 reduce.
+ // after max capacity it is 1.5 each.
- taskTrackerManager.finishTask(
- "tt1", t1.getTaskID().toString(),
- fjob1);
+ //first job would be given 1 task each (1 map and 1 reduce).
+ HashMap<String,String> expectedStrings = new HashMap<String,String>();
+ expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
- //tt1 completed the task so we have 1 map slot for u1
- // we are assigning the 2nd map task from fjob1
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ List<Task> tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt1", expectedStrings);
- taskTrackerManager.finishTask(
- "tt4", t4.getTaskID().toString(),
- fjob2);
- //tt4 completed the task , so we have 1 reduce slot for u2
- //we are assigning the 2nd reduce from fjob2
- checkAssignment("tt4", "attempt_test_0002_r_000002_0 on tt4");
+ //user u1 has reached the limit, that is 1 task of each type.
+ //so the next map and reduce tasks come from the second job (user u2).
+ expectedStrings.put(MAP,"attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE,"attempt_test_0002_r_000001_0 on tt1");
+
+ tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt1", expectedStrings);
+
+ expectedStrings.put(MAP,"attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000002_0 on tt2");
+
+ tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt2", expectedStrings);
+
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+ }
+
+ // Utility method to construct a map of expected strings
+ // with exactly one map task and one reduce task.
+ private void populateExpectedStrings(Map<String, String> expectedTaskStrings,
+ String mapTask, String reduceTask) {
+ expectedTaskStrings.clear();
+ expectedTaskStrings.put(CapacityTestUtils.MAP, mapTask);
+ expectedTaskStrings.put(CapacityTestUtils.REDUCE, reduceTask);
}
@@ -1409,24 +722,38 @@
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
+
+
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// submit a job
- submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
+ // for queue 'q2', the capacity is 2 for maps and 1 for reduce.
+ // Since we're the only user, we should get tasks
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
+
// Submit another job, from a different user
- submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- // Now if I ask for a map task, it should come from the second job
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- // Now we're at full capacity for maps. If I ask for another map task,
- // I should get a map task from the default queue's capacity.
- checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
+ // Now if I ask for a task, it should come from the second job
+ checkAssignment(taskTrackerManager, scheduler,
+ "tt1", "attempt_test_0002_m_000001_0 on tt1");
+
+ // Now we're at full capacity. If I ask for another task,
+ // I should get tasks from the default queue's capacity.
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000002_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
// and another
- checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+ checkAssignment(taskTrackerManager, scheduler,
+ "tt2", "attempt_test_0002_m_000002_0 on tt2");
}
// test user limits when a 2nd job is submitted much after first job
@@ -1437,23 +764,39 @@
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
+
+
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// submit a job
- submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
+ // for queue 'q2', the capacity for maps is 2 and reduce is 1.
+ // Since we're the only user, we should get tasks
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
+
// since we're the only job, we get another map
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000002_0 on tt1");
+
// Submit another job, from a different user
- submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- // Now if I ask for a map task, it should come from the second job
- checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
+ // Now if I ask for a task, it should come from the second job
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0002_m_000001_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
// and another
- checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0002_m_000002_0 on tt2");
}
// test user limits when a 2nd job is submitted much after first job
@@ -1465,46 +808,79 @@
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
+
+
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// submit a job
- FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
+ // for queue 'q2', the capacity for maps is 2 and reduces is 1.
+ // Since we're the only user, we should get tasks
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
// since we're the only job, we get another map
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
- // we get two more maps from 'default queue'
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0001_m_000002_0 on tt1");
+ // we get more tasks from 'default queue'
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000003_0 on tt2",
+ "attempt_test_0001_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000004_0 on tt2");
+
// Submit another job, from a different user
- FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- // one of the task finishes
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
- // Now if I ask for a map task, it should come from the second job
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
+ // one task of each type finishes
+ taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", j1);
+ taskTrackerManager.finishTask("attempt_test_0001_r_000001_0", j1);
+
+ // Now if I ask for a task, it should come from the second job
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0002_m_000001_0 on tt1",
+ "attempt_test_0002_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
+
// another task from job1 finishes, another new task to job2
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
- checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+ taskTrackerManager.finishTask("attempt_test_0001_m_000002_0", j1);
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000002_0 on tt1");
+
// now we have equal number of tasks from each job. Whichever job's
// task finishes, that job gets a new task
- taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
- checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
- taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
- checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+ taskTrackerManager.finishTask("attempt_test_0001_m_000003_0", j1);
+ taskTrackerManager.finishTask("attempt_test_0001_r_000002_0", j1);
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000005_0 on tt2",
+ "attempt_test_0001_r_000003_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
+ taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", j2);
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000003_0 on tt1");
}
// test user limits with many users, more slots
public void testUserLimits4() throws Exception {
- // set up one queue, with 10 slots
+ // set up one queue, with 10 map slots and 5 reduce slots
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
+
+
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// add some more TTs
taskTrackerManager.addTaskTracker("tt3");
@@ -1512,74 +888,93 @@
taskTrackerManager.addTaskTracker("tt5");
// u1 submits job
- FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+ FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
// it gets the first 5 slots
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
- checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ for (int i=0; i<5; i++) {
+ String ttName = "tt"+(i+1);
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_00000"+(i+1)+"_0 on " + ttName,
+ "attempt_test_0001_r_00000"+(i+1)+"_0 on " + ttName);
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ ttName, expectedTaskStrings);
+ }
+
// u2 submits job with 4 slots
- FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
+ FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
// u2 should get next 4 slots
- checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
- checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
- checkAssignment("tt4", "attempt_test_0002_m_000003_0 on tt4");
- checkAssignment("tt5", "attempt_test_0002_m_000004_0 on tt5");
+ for (int i=0; i<4; i++) {
+ String ttName = "tt"+(i+1);
+ checkAssignment(taskTrackerManager, scheduler, ttName,
+ "attempt_test_0002_m_00000"+(i+1)+"_0 on " + ttName);
+ }
// last slot should go to u1, since u2 has no more tasks
- checkAssignment("tt5", "attempt_test_0001_m_000006_0 on tt5");
- // u1 finishes a task
- taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt5",
+ "attempt_test_0001_m_000006_0 on tt5");
+ // u1 finishes tasks
+ taskTrackerManager.finishTask("attempt_test_0001_m_000006_0", j1);
+ taskTrackerManager.finishTask("attempt_test_0001_r_000005_0", j1);
// u1 submits a few more jobs
// All the jobs are inited when submitted
// because of addition of Eager Job Initializer all jobs in this
//case would be initialised.
- submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
- submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
- submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
// u2 also submits a job
- submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2");
// now u3 submits a job
- submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
- // next slot should go to u3, even though u2 has an earlier job, since
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
+ // next map slot should go to u3, even though u2 has an earlier job, since
// user limits have changed and u1/u2 are over limits
- checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
+ // reduce slot will go to job 2, as it is still under limit.
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0007_m_000001_0 on tt5",
+ "attempt_test_0002_r_000001_0 on tt5");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt5", expectedTaskStrings);
// some other task finishes and u3 gets it
- taskTrackerManager.finishTask("tt5", "attempt_test_0002_m_000004_0", j1);
- checkAssignment("tt5", "attempt_test_0007_m_000002_0 on tt5");
+ taskTrackerManager.finishTask("attempt_test_0002_m_000004_0", j1);
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt4",
+ "attempt_test_0007_m_000002_0 on tt4");
// now, u2 finishes a task
- taskTrackerManager.finishTask("tt4", "attempt_test_0002_m_000002_0", j1);
+ taskTrackerManager.finishTask("attempt_test_0002_m_000002_0", j1);
// next slot will go to u1, since u3 has nothing to run and u1's job is
// first in the queue
- checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
+ checkAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000007_0 on tt2");
}
/**
* Test to verify that high memory jobs hit user limits faster than any normal
* job.
- *
+ *
* @throws IOException
*/
public void testUserLimitsForHighMemoryJobs()
- throws IOException {
+ throws IOException {
taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10);
scheduler.setTaskTrackerManager(taskTrackerManager);
- String[] qs = { "default" };
+ String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
- resConf.setFakeQueues(queues);
+
+
// enabled memory-based scheduling
// Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+ JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ MRConfig.MAPMEMORY_MB, 1 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
+ JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
- scheduler.setResourceManagerConf(resConf);
+ MRConfig.REDUCEMEMORY_MB, 1 * 1024);
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
// Submit one normal job to the other queue.
@@ -1590,9 +985,11 @@
jConf.setNumReduceTasks(6);
jConf.setUser("u1");
jConf.setQueueName("default");
- FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+ LOG.debug(
+ "Submit one high memory(2GB maps, 2GB reduces) job of "
+ "6 map and 6 reduce tasks");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
@@ -1601,32 +998,41 @@
jConf.setNumReduceTasks(6);
jConf.setQueueName("default");
jConf.setUser("u2");
- FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- // Verify that normal job takes 3 task assignments to hit user limits
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000005_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000005_0 on tt1");
+ // Verify that normal job takes 5 task assignments to hit user limits
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ for (int i = 0; i < 5; i++) {
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP,
+ "attempt_test_0001_m_00000" + (i + 1) + "_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE,
+ "attempt_test_0001_r_00000" + (i + 1) + "_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+ }
// u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
// are hit. So u2 should get slots
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
- checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
-
- // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
+ for (int i = 0; i < 2; i++) {
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP,
+ "attempt_test_0002_m_00000" + (i + 1) + "_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE,
+ "attempt_test_0002_r_00000" + (i + 1) + "_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+ } // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
// slots. Because of high memory tasks, giving u2 another task would
// overflow limits. So, no more tasks should be given to anyone.
assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt1")));
}
/*
@@ -1639,9 +1045,7 @@
* - Then run initializationPoller()
* - Check once again the waiting queue, it should be 5 jobs again.
* - Then raise status change events.
- * - Assign one task to a task tracker. (Map)
- * - Check waiting job count, it should be 4 now and used map (%) = 100
- * - Assign another one task (Reduce)
+ * - Assign tasks to a task tracker.
* - Check waiting job count, it should be 4 now and used map (%) = 100
* and used reduce (%) = 100
* - finish the job and then check the used percentage it should go
@@ -1654,9 +1058,9 @@
* - Check the count, the waiting job count should be 2.
* - Now raise status change events to move the initialized jobs which
* should be two in count to running queue.
- * - Then schedule a map of the job in running queue.
+ * - Then schedule a map and reduce of the job in running queue.
* - Run the poller because the poller is responsible for waiting
- * jobs count. Check the count, it should be using 100% map and one
+ * jobs count. Check the count, it should be using 100% map, reduce and one
* waiting job
* - fail the running job.
* - Check the count, it should be now one waiting job and zero running
@@ -1671,20 +1075,22 @@
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
+
+
+ taskTrackerManager.setFakeQueues(queues);
scheduler.start();
scheduler.assignTasks(tracker("tt1")); // heartbeat
scheduler.assignTasks(tracker("tt2")); // heartbeat
[... 2198 lines stripped ...]